Saturday, December 28, 2024

Spring Boot Kafka Consumer Example

Spring Boot is one of the most popular and widely used frameworks for the Java programming language. It is commonly used to build microservices, and it greatly reduces the time needed to produce a production-ready application. Spring Boot makes it easy to create stand-alone, production-grade Spring-based applications that you can “just run”. Some of the main features of Spring Boot are listed below.

  • Create stand-alone Spring applications.
  • Embed Tomcat, Jetty, or Undertow directly.
  • Provide ‘starter’ dependencies to simplify the build configuration.
  • Configure Spring and third-party libraries automatically whenever possible.
  • Provide production-ready features like health checks, metrics, and externalized configuration.
  • Almost no code generation and no requirement for XML configuration.

Apache Kafka is a publish-subscribe messaging system. A messaging system lets you send messages between processes, applications, and servers. Broadly speaking, Apache Kafka is software in which topics (a topic can be thought of as a category of messages) are defined, and applications connect to the system to write messages to a topic or read messages from it. A message can carry any kind of information, from an event on your personal blog to a simple text message that triggers another event. Here we will discuss how to consume messages from a Kafka topic and display them in the console with Spring Boot. A working Kafka installation is a prerequisite.
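To make the topic idea concrete, here is a toy in-memory sketch that uses only the JDK. It is not how Kafka works internally (Kafka stores messages in a durable log on the broker and consumers poll for them); the class name ToyBroker and its methods are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-memory broker: illustrates the topic idea only, not Kafka itself.
final class ToyBroker {

    // Topic name -> callbacks subscribed to that topic
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Register a callback that receives every message later published to the topic.
    void subscribe(String topic, Consumer<String> callback) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(callback);
    }

    // Deliver the message to every subscriber of the topic and
    // return how many subscribers received it.
    int publish(String topic, String message) {
        List<Consumer<String>> callbacks = subscribers.getOrDefault(topic, List.of());
        callbacks.forEach(callback -> callback.accept(message));
        return callbacks.size();
    }
}
```

A subscriber registered on "NewTopic" receives only messages published to "NewTopic"; messages sent to any other topic never reach it. The same separation by topic name is what the consumer in this article relies on.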

Example:

Prerequisite: Make sure Apache Kafka is installed on your local machine. If it is not, refer to How to Install and Run Apache Kafka on Windows?

Step 1: Create a Spring Boot project (for example, with Spring Initializr at start.spring.io) and add the “Spring for Apache Kafka” dependency to it.

Step 2: Create a Configuration file named KafkaConfig. Below is the code for the KafkaConfig.java file.

Java




// Java Program to Illustrate Kafka Configuration
  
package com.amiya.kafka.apachekafkaconsumer.config;
  
// Importing required classes
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  
// Annotations
@EnableKafka
@Configuration
  
// Class
public class KafkaConfig {
  
    @Bean
    public ConsumerFactory<String, String> consumerFactory()
    {
  
        // Creating a Map of string-object pairs
        Map<String, Object> config = new HashMap<>();
  
        // Adding the Configuration
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                   "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG,
                   "group_id");
        config.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        config.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
  
        return new DefaultKafkaConsumerFactory<>(config);
    }
  
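    // Creating the listener container factory. It is registered as a
    // bean named "kafkaListenerContainerFactory", the name that
    // @KafkaListener looks up by default.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory()
    {
        ConcurrentKafkaListenerContainerFactory<
            String, String> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }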
}
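The broker address and group ID are hard-coded in KafkaConfig. One possible variation, sketched here with only the JDK, reads them from environment variables and falls back to the values used in this article. The class name ConsumerSettings and the variable names KAFKA_BOOTSTRAP_SERVERS and KAFKA_GROUP_ID are assumptions made for this example; the string keys are the property names behind the ConsumerConfig constants.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that builds the consumer properties from the environment.
final class ConsumerSettings {

    // Environment variable names chosen for this example only.
    static final String ENV_BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
    static final String ENV_GROUP_ID = "KAFKA_GROUP_ID";

    // Build the consumer properties, falling back to the article's values
    // when a variable is not set.
    static Map<String, Object> fromEnvironment(Map<String, String> env) {
        Map<String, Object> config = new HashMap<>();
        // Same keys as ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG and GROUP_ID_CONFIG
        config.put("bootstrap.servers",
                   env.getOrDefault(ENV_BOOTSTRAP, "127.0.0.1:9092"));
        config.put("group.id",
                   env.getOrDefault(ENV_GROUP_ID, "group_id"));
        // Kafka accepts a deserializer as a fully qualified class name
        // as well as a Class object.
        config.put("key.deserializer",
                   "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer",
                   "org.apache.kafka.common.serialization.StringDeserializer");
        return config;
    }
}
```

The resulting map could be passed to new DefaultKafkaConsumerFactory<>(...) in place of the hand-built one, for example as ConsumerSettings.fromEnvironment(System.getenv()).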


Step 3: Create a Consumer file named KafkaConsumer. Below is the code for the KafkaConsumer.java file.

Java




// Java Program to Illustrate Kafka Consumer
  
package com.amiya.kafka.apachekafkaconsumer.consumer;
  
// Importing required classes
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
  
@Component
  
// Class
public class KafkaConsumer {
  
    @KafkaListener(topics = "NewTopic",
                   groupId = "group_id")
  
    // Method
    public void
    consume(String message)
    {
        // Print statement
        System.out.println("message = " + message);
    }
}
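The consume method above prints the message directly. If the handling grows beyond a print statement, one option is to move it into a plain class that can be unit-tested without a running broker. The following is a minimal sketch under that assumption; MessageHandler and its methods are hypothetical names, and the class is not thread-safe, so it suits a single listener thread only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: holds the message-handling logic so that it can be
// tested without Kafka. Not thread-safe.
final class MessageHandler {

    private final List<String> seen = new ArrayList<>();

    // Trim the payload and skip blank messages. Returns the line to print,
    // or null when the message was skipped.
    String handle(String message) {
        if (message == null || message.isBlank()) {
            return null;
        }
        String trimmed = message.trim();
        seen.add(trimmed);
        return "message = " + trimmed;
    }

    // Read-only view of the messages accepted so far.
    List<String> seen() {
        return Collections.unmodifiableList(seen);
    }
}
```

With such a class in place, consume(String message) would only call handle and print the returned line when it is not null.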


Step 4: To consume messages from a Kafka topic with Spring Boot, we now have to do the following:

  • Run the Apache Zookeeper server
  • Run the Apache Kafka server
  • Send messages to the Kafka topic

Run your Apache Zookeeper server by using this command

C:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

Similarly, run your Apache Kafka server by using this command

C:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

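Run the following command to start a console producer, then type messages to send them to the NewTopic topic (recent Kafka releases use --bootstrap-server in place of the deprecated --broker-list option)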

C:\kafka>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic NewTopic

Step 5: Now run your Spring Boot application. If it includes an embedded web server and the default port 8080 is already in use, set a different port number in the application.properties file

server.port=8081

Let’s run the Spring Boot application from the ApacheKafkaConsumerApplication file

Output: Each message you send to the Kafka topic from the console producer is displayed on the application console in real time.
