Apache Kafka is a stream processing system which lets you send messages between processes, applications, and servers. In this article, we will see how to publish JSON messages on the console of a Spring boot application using Apache Kafka.
In order to learn how to create a Spring boot project, refer to this article.
Working Steps:
- Go to Spring initializer and create a starter project with following dependency:
- Spring for Apache Kafka
- Open the project in an IDE and sync the dependencies. In this article, we would be creating a student model where we would be posting the student details. Therefore, create a model class Student. Add data members and create constructor and override the toString method to see the messages in JSON format. The following is the implementation of the student class:
Student Model
// Java program to implement a // student class // Creating a student class public class Student { // Data members of the class int id; String firstName; String lastName; // Constructor of the student // class public Student() { } // Parameterized constructor of // the student class public Student( int id, String firstName, String lastName) { this .id = id; this .firstName = firstName; this .lastName = lastName; } @Override public String toString() { return "Student{" + "id = " + id + ", firstName = '" + firstName + "'" + ", lastName = '" + lastName + "'" + "}" ; } } |
- Create a new class Config and add annotations @Configuration and @EnableKafka. Now create beans ConsumerFactory and ConcurrentKafkaListenerContainerFactory with Student class object.
Config clas
@EnableKafka @Configuration public class Config { // Function to establish a connection // between Spring application // and Kafka server @Bean public ConsumerFactory<String, Student> studentConsumer() { // HashMap to store the configurations Map<String, Object> map = new HashMap<>(); // put the host IP in the map map.put(ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG, "" ); // put the group ID of consumer in the map map.put(ConsumerConfig .GROUP_ID_CONFIG, "id" ); map.put(ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer. class ); map.put(ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer. class ); // return message in JSON formate return new DefaultKafkaConsumerFactory<>( map, new StringDeserializer(), new JsonDeserializer<>(Student. class )); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Student> studentListner() { ConcurrentKafkaListenerContainerFactory<String, Student> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(studentConsumer()); return factory; } } |
- Create a class KafkaService with @Service annotation. This class will contain the listener method to publish the message on the console.
KafkaService Class
@Service public class KafkaService { // Annotation required to listen // the message from Kafka server @KafkaListener (topics = "JsonTopic" , groupId = "id" , containerFactory = "studentListner" ) public void publish(Student student) { System.out.println( "New Entry: " + student); } } |
- Start zookeeper and Kafka server. Now we need to create a new topic with the name JsonTopic. To do so, open a new command prompt window and change directory to the Kafka directory.
- Now create a new topic using the command given below:
bin/Kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // for mac and linux
.\bin\windows\Kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // for windows
- Now to run Kafka producer console, use the command below:
bin/Kafka-console-producer.sh –broker-list localhost:9092 –topic Kafka_Example // for mac and linux
.\bin\windows\Kafka-console-producer.bat –broker-list localhost:9092 –topic Kafka_Example // for windows
- Run the application and and type message on Kafka producer and press enter.