Sunday, November 17, 2024
Google search engine
HomeLanguagesJavaSpring Boot – Consume JSON Object From Kafka Topics

Spring Boot – Consume JSON Object From Kafka Topics

Apache Kafka is a publish-subscribe messaging system. A messaging system lets someone is sending messages between processes, applications, and servers. Broadly Speaking, Apache Kafka is software where topics (A topic might be a category) can be defined and further processed. Applications may connect to this system and transfer a message onto the topic. A message can include any kind of information, from any event on your Personal blog or can be a very simple text message that would trigger any other event. Read more about Kafka here. In this article, Spring Boot Kafka Consumer Example we have discussed how we can consume messages from Kafka topics with Spring Boot. But in a complex program, we need to consume JSON objects from Kafka topics. 

Prerequisite: Make sure you have installed Apache Kafka in your local machine. Refer to this article How to Install and Run Apache Kafka on Windows?

Implementation:

Step 1: Go to this link https://start.spring.io/ and create a Spring Boot project. Add the “Spring for Apache Kafka” dependency to your Spring Boot project. 

Step 2: Create a simple POJO class named Book inside the Model package. Below is the code for the Book.java file.

Java




// Java Program to Illustrate Book Class
  
package com.amiya.kafka.apachekafkaconsumer.Model;
  
// Class
public class Book {
  
    // Class data members
    private String bookName;
    private String isbn;
  
    // Constructor 1
    public Book() {}
  
    // Constructor 2
    public Book(String bookName, String isbn)
    {
        // This keyword refers to
        // current instance itself
        this.bookName = bookName;
        this.isbn = isbn;
    }
  
    // Setter
    public String getBookName() { return bookName; }
  
    // Setter
    public void setBookName(String bookName)
    {
        this.bookName = bookName;
    }
  
    // Setter
    public String getIsbn() { return isbn; }
  
    // Setter
    public void setIsbn(String isbn) { this.isbn = isbn; }
}


Step 3: Create a Configuration file named KafkaConfig. Below is the code for the KafkaConfig.java file. Comments are added inside the code to understand the code in more detail.

Example

Java




// Java Program to Illustrate Configuration Class
  
package com.amiya.kafka.apachekafkaconsumer.config;
  
// Importing required classes
import com.amiya.kafka.apachekafkaconsumer.Model.Book;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
  
// Annotation
@EnableKafka
@Configuration
  
// Class
public class KafkaConfig {
  
    @Bean
    public ConsumerFactory<String, Book> consumerFactory()
    {
  
        // Creating a map of string-object type
        Map<String, Object> config = new HashMap<>();
  
        // Adding the Configuration
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                   "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG,
                   "group_id");
        config.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        config.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);
  
        // Returning message in JSON format
        return new DefaultKafkaConsumerFactory<>(
            config, new StringDeserializer(),
            new JsonDeserializer<>(Book.class));
    }
  
    // Creating a Listener
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,
                                                   Book>
    bookListener()
    {
        ConcurrentKafkaListenerContainerFactory<
            String, Book> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
                                                       
        return factory;
    }
}


Step 4: Create a Consumer file named KafkaConsumer.

File: KafkaConsumer.java 

Java




// Java Program to Illustrate kafka Consumer Class
  
package com.amiya.kafka.apachekafkaconsumer.consumer;
  
// Importing required classes
import com.amiya.kafka.apachekafkaconsumer.Model.Book;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
  
// Annotation
@Component
  
// Class
public class KafkaConsumer {
  
    @KafkaListener(topics = "NewTopic",
                   groupId = "group_id",
                   containerFactory = "bookListener")
  
    // Method
    public void
    consume(Book book)
    {
        // Print statement
        System.out.println("message = " + book);
    }
}


Step 5: Now we have to do the following things in order to consume messages from Kafka topics with Spring Boot

  1. Run the Apache Zookeeper server
  2. Run the Apache Kafka  server
  3. Send the JSON object from Kafka Topics

Run your Apache Zookeeper server by using this command:

C:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

Similarly, run your Apache Kafka server by using this command

C:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

Run the following command to send the JSON object from Kafka Topics

C:\kafka>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic NewTopic

Step 6: Now run your spring boot application. Make sure you have changed the port number in the application.properties file.

server.port=8081

Let’s run the Spring boot application inside the ApacheKafkaConsumerApplication file.

Output:

In the output, one can see when you are sending the JSON object from Kafka Topics it is displayed on the console in real-time. 

Output

RELATED ARTICLES

Most Popular

Recent Comments