Saturday, October 18, 2025

Spring Boot | How to consume JSON messages using Apache Kafka

Apache Kafka is a distributed event streaming platform that lets you send messages between processes, applications, and servers. In this article, we will see how to consume JSON messages and print them on the console of a Spring Boot application using Apache Kafka.

To learn how to create a Spring Boot project, refer to this article.

Working Steps: 
 

  1. Go to Spring Initializr and create a starter project with the following dependency: 
    • Spring for Apache Kafka
  2. Open the project in an IDE and sync the dependencies. In this article, we create a Student model that carries the student details sent in each message. Therefore, create a model class Student. Add the data members and constructors, and override the toString method so that received messages are printed in a readable form. The following is the implementation of the Student class:
     

Student Model




// Java program to implement a
// student class
 
// Creating a student class
public class Student {
 
    // Data members of the class
    int id;
    String firstName;
    String lastName;
 
    // Constructor of the student
    // class
    public Student()
    {
    }
 
    // Parameterized constructor of
    // the student class
    public Student(int id, String firstName,
                   String lastName)
    {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
    }
 
    // Getters and setters; the JSON deserializer
    // uses them to populate the fields
    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }
 
    @Override
    public String toString()
    {
        return "Student{"
            + "id = " + id
            + ", firstName = '" + firstName + "'"
            + ", lastName = '" + lastName + "'"
            + "}";
    }
}


  3. Create a new class Config and add the annotations @Configuration and @EnableKafka. Then define two beans, a ConsumerFactory and a ConcurrentKafkaListenerContainerFactory, both typed with the Student class.
     

Config Class




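import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@EnableKafka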
@Configuration
public class Config {
 
    // Function to establish a connection
    // between Spring application
    // and Kafka server
    @Bean
    public ConsumerFactory<String, Student>
    studentConsumer()
    {
 
        // HashMap to store the configurations
        Map<String, Object> map
            = new HashMap<>();
 
        // put the host IP in the map
        map.put(ConsumerConfig
                    .BOOTSTRAP_SERVERS_CONFIG,
                "127.0.0.1:9092");
 
        // put the group ID of consumer in the map
        map.put(ConsumerConfig
                    .GROUP_ID_CONFIG,
                "id");
        map.put(ConsumerConfig
                    .KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        map.put(ConsumerConfig
                    .VALUE_DESERIALIZER_CLASS_CONFIG,
                JsonDeserializer.class);
 
        // create the factory; message values are
        // deserialized from JSON into Student objects
        return new DefaultKafkaConsumerFactory<>(
            map, new StringDeserializer(),
            new JsonDeserializer<>(Student.class));
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,
                                                   Student>
    studentListner()
    {
        ConcurrentKafkaListenerContainerFactory<String,
                                                Student>
            factory
            = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(studentConsumer());
        return factory;
    }
}
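
The ConsumerConfig constants used in the Config class are shorthand for plain Kafka property names. As a minimal sketch, the same settings can be written with the raw keys, which is handy when comparing against a properties file. The class name RawConsumerProperties is hypothetical and not part of the project; the host, port, and group ID are the values used in the Config class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class, not part of the original project.
public class RawConsumerProperties {

    // Builds the same settings as the Config class, using the plain
    // Kafka property names that the ConsumerConfig constants stand for.
    public static Map<String, Object> build() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "id");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.springframework.kafka.support.serializer.JsonDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each setting as "key = value"
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

This sketch only builds and prints the map; it does not connect to Kafka. In the Config class the deserializer instances are also passed to DefaultKafkaConsumerFactory directly, and instances supplied that way are the ones the factory uses.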


  4. Create a class KafkaService with the @Service annotation. This class contains the listener method that prints each received message on the console.
     

KafkaService Class




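import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {
 
    // Annotation required to listen for
    // messages from the Kafka server
    @KafkaListener(topics = "JsonTopic",
                   groupId = "id",
                   containerFactory = "studentListner")
    public void publish(Student student)
    {
        // Print the deserialized student
        // on the application console
        System.out.println("New Entry: "
                           + student);
    }
}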


  5. Start ZooKeeper and the Kafka server. Next, create a new topic with the name JsonTopic. To do so, open a new command prompt window and change directory to the Kafka directory.
  6. Now create the topic using the command given below:
     

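For Mac and Linux:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic JsonTopic
For Windows:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic JsonTopic
On Kafka 3.0 and later, where the --zookeeper option has been removed from this tool, pass --bootstrap-server localhost:9092 instead.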
 

  7. Now, to run the Kafka console producer, use the command below:
     

For Mac and Linux:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic JsonTopic
For Windows:
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic JsonTopic
 

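  8. Run the application, type a JSON message in the Kafka producer console, for example {"id": 1, "firstName": "John", "lastName": "Doe"}, and press Enter. The listener then prints the received student on the application console.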
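
The console producer sends each line as one message, so the JSON for a student has to fit on a single line and use the field names of the Student class. As a rough sketch, a small helper can build such a line for pasting into the producer. The class StudentJsonSample and its methods are hypothetical and not part of the project, and the escaping only covers backslashes and double quotes, assuming names contain no control characters.

```java
// Hypothetical helper, not part of the original article: builds the
// one-line JSON payload to paste into the Kafka console producer.
public class StudentJsonSample {

    // Escapes backslashes and double quotes so the value stays valid JSON.
    // Assumption: the value contains no control characters such as newlines.
    public static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Field names match the data members of the Student class.
    public static String toJson(int id, String firstName, String lastName) {
        return "{\"id\":" + id
            + ",\"firstName\":\"" + escape(firstName) + "\""
            + ",\"lastName\":\"" + escape(lastName) + "\"}";
    }

    public static void main(String[] args) {
        // prints {"id":1,"firstName":"John","lastName":"Doe"}
        System.out.println(toJson(1, "John", "Doe"));
    }
}
```

In a real producer application a JSON library would normally do this work; string concatenation is used here only to keep the sketch free of dependencies.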
Dominic