In this article, we will build a program that produces data with a Kafka producer, consumes it with a Kafka consumer, saves it into Elasticsearch, and finally plots that JSON data on a Grafana dashboard. Let's start by configuring the required software and tools.
Requirements
- Java 8
- Spring Boot 2.7.0
- Kafka 3.2.0
- Elasticsearch 7.17.3
Note: Why do we mention the versions?
Version compatibility is of paramount importance with Elasticsearch and Spring Boot. If your Elasticsearch version does not match your Spring Boot version (or vice versa), you will run into problems configuring the two together. Below is the version compatibility matrix (a version-override sketch follows it):
| Spring Data Elasticsearch | Elasticsearch | Spring Framework | Spring Boot |
|---|---|---|---|
| 4.4.X | 7.17.3 | 5.3.X | 2.7.X |
| 4.3.X | 7.15.2 | 5.3.X | 2.6.X |
| 4.2.X | 7.12.0 | 5.3.X | 2.5.X |
| 4.1.X | 7.9.3 | 5.3.2 | 2.4.X |
| 4.0.X | 7.6.2 | 5.2.12 | 2.3.X |
| 3.2.X | 6.8.12 | 5.2.12 | 2.2.X |
| 3.1.X | 6.2.2 | 5.1.19 | 2.1.X |
| 3.0.X | 5.5.0 | 5.0.13 | 2.0.X |
| 2.1.X | 2.4.0 | 4.3.25 | 1.5.X |
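If you ever need to pin a client version different from the one your Spring Boot release manages, the Boot parent exposes it as a Maven property. A minimal sketch, assuming you stay inside the compatibility ranges above (`elasticsearch.version` is the property name used by spring-boot-dependencies):

```xml
<properties>
    <java.version>1.8</java.version>
    <!-- Override the Elasticsearch client version managed by Spring Boot.
         Keep it within the compatibility ranges shown in the table above. -->
    <elasticsearch.version>7.17.3</elasticsearch.version>
</properties>
```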
Download links:
- Elasticsearch – https://www.elastic.co/downloads/elasticsearch
- Kafka – https://kafka.apache.org/downloads
Download and extract both archives on your system.
Create Kafka Producer Application
Create a Spring Boot project named Producer.
ProducerApplication.java
```java
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}
```
KafkaProducerConfig.java
This class provides the Kafka producer configuration.
```java
package com.example.demo.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.example.demo.model.User;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, User> userProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, User> userKafkaTemplate() {
        return new KafkaTemplate<>(userProducerFactory());
    }
}
```
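If you prefer properties over Java config, Spring Boot's Kafka auto-configuration can express the same producer settings; a sketch equivalent to the bean definitions above:

```properties
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
```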
User.java
The model class that holds the user information.
```java
package com.example.demo.model;

public class User {

    int id;
    String name;
    String pdate;

    public User() {
        super();
    }

    public User(int id, String name, String pdate) {
        super();
        this.id = id;
        this.name = name;
        this.pdate = pdate;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getPdate() { return pdate; }
    public void setPdate(String pdate) { this.pdate = pdate; }
}
```
KafkaService.java
This service class uses KafkaTemplate to send the data to the topic, from which the consumer picks it up.
```java
package com.example.demo.service;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.example.demo.model.User;

@Service
public class KafkaService {

    private final Logger LOG = LoggerFactory.getLogger(KafkaService.class);

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    String kafkaTopic = "gfg";

    public void send(User user) {
        LOG.info("Sending User Json Serializer : {}", user);
        kafkaTemplate.send(kafkaTopic, user);
    }

    public void sendList(List<User> userList) {
        LOG.info("Sending UserList Json Serializer : {}", userList);
        for (User user : userList) {
            kafkaTemplate.send(kafkaTopic, user);
        }
    }
}
```
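Note that send() here is fire-and-forget. With the spring-kafka version pulled in by Boot 2.7, KafkaTemplate.send() returns a ListenableFuture, so you can log the delivery outcome. A sketch you could add to KafkaService (sendWithCallback is a hypothetical name, not part of the original app):

```java
// Hypothetical variant of send() that logs the delivery result.
public void sendWithCallback(User user) {
    kafkaTemplate.send(kafkaTopic, user).addCallback(
        result -> LOG.info("Delivered to partition {} at offset {}",
                result.getRecordMetadata().partition(),
                result.getRecordMetadata().offset()),
        ex -> LOG.error("Failed to deliver user {}", user, ex));
}
```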
ProducerController.java
```java
package com.example.demo.controller;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import com.example.demo.model.User;
import com.example.demo.service.KafkaService;

@RestController
public class ProducerController {

    @Autowired
    KafkaService kafkaProducer;

    @PostMapping("/producer")
    public String sendMessage(@RequestBody User user) {
        kafkaProducer.send(user);
        return "Message sent successfully to the Kafka topic gfg";
    }

    @PostMapping("/producerlist")
    public String sendMessage(@RequestBody List<User> user) {
        kafkaProducer.sendList(user);
        return "Message sent successfully to the Kafka topic gfg";
    }
}
```
pom.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>Producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>Producer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```
application.properties
```properties
server.port=1234
```
Create Kafka Consumer and Configure the Elasticsearch Application
Create another Spring Boot application named ElasticConsumer.
ConsumerApplication.java
```java
package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.example.demo.model.User;
import com.example.demo.service.KafkaUserService;

@SpringBootApplication
@RestController
public class ConsumerApplication {

    @Autowired
    KafkaUserService kafkaUserService;

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @KafkaListener(topics = "gfg", groupId = "gfg-group")
    public void listen(User user) {
        System.out.println("Received User information : " + user.toString());
        try {
            kafkaUserService.saveUser(user);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @GetMapping("/getElasticUserFromKafka")
    public Iterable<User> findAllUser() {
        return kafkaUserService.findAllUsers();
    }
}
```
KafkaConsumerConfig.java
```java
package com.example.demo.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.example.demo.model.User;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, User> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "gfg-group");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(User.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, User> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```
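One caveat with this setup: the producer's JsonSerializer adds type headers, and JsonDeserializer only trusts the target class's package by default. That works here because both applications declare User in com.example.demo.model; if your packages ever differ you will see a "not in the trusted packages" error. A sketch of one common fix inside consumerFactory():

```java
// Ignore the producer's type headers and always deserialize into User
// (useful when producer and consumer keep User in different packages).
JsonDeserializer<User> deserializer = new JsonDeserializer<>(User.class, false);
deserializer.addTrustedPackages("*");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
```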
User.java
@Document – specifies our index.
@Id – represents the field _id of our document and it is unique for each message.
@Field – declares the field's type and name in the index mapping.
```java
package com.example.demo.model;

import java.util.Date;

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import com.google.gson.Gson;

@Document(indexName = "kafkauser")
public class User {

    @Id
    int id;

    @Field(type = FieldType.Text, name = "name")
    String name;

    @Field(type = FieldType.Date, name = "pdate")
    Date pdate;

    public User() {
        super();
    }

    public User(int id, String name, Date pdate) {
        super();
        this.id = id;
        this.name = name;
        this.pdate = pdate;
    }

    public Date getPdate() { return pdate; }
    public void setPdate(Date pdate) { this.pdate = pdate; }
    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    @Override
    public String toString() {
        return new Gson().toJson(this);
    }
}
```
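For reference, a document indexed from this model should look roughly like this in Elasticsearch (the values are hypothetical):

```json
{
  "_index": "kafkauser",
  "_id": "1",
  "_source": {
    "id": 1,
    "name": "shubham",
    "pdate": "2022-06-01T00:00:00.000Z"
  }
}
```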
KafkaUserRepository.java
```java
package com.example.demo.repository;

import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;

import com.example.demo.model.User;

@Repository
public interface KafkaUserRepository extends ElasticsearchRepository<User, String> {
}
```
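ElasticsearchRepository already supplies save(), findById(), findAll(), and delete(). If you need more targeted lookups, Spring Data can derive queries from method names; a sketch of a hypothetical finder you could add to the interface body (requires a java.util.List import):

```java
// Hypothetical derived query: Spring Data builds the Elasticsearch
// query from the method name, no implementation needed.
List<User> findByName(String name);
```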
KafkaUserService.java
```java
package com.example.demo.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.example.demo.model.User;
import com.example.demo.repository.KafkaUserRepository;

@Service
public class KafkaUserService {

    @Autowired
    private KafkaUserRepository edao;

    public void saveUser(User user) {
        edao.save(user);
    }

    public Iterable<User> findAllUsers() {
        return edao.findAll();
    }
}
```
How to Run?
Elasticsearch:
- Run elasticsearch.bat from cmd -> E:\elasticsearch-7.17.3-windows-x86_64\elasticsearch-7.17.3\bin> elasticsearch.bat
- Open a browser and go to http://localhost:9200
The output looks like this:
```json
{
  "name": "DESKTOP-S6UTE8M",
  "cluster_name": "elasticsearch",
  "cluster_uuid": "VDlwyl2WQhCX7_lLwWm9Kg",
  "version": {
    "number": "7.17.3",
    "build_flavor": "default",
    "build_type": "zip",
    "build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff",
    "build_date": "2022-04-19T08:11:19.070913226Z",
    "build_snapshot": false,
    "lucene_version": "8.11.1",
    "minimum_wire_compatibility_version": "6.8.0",
    "minimum_index_compatibility_version": "6.0.0-beta1"
  },
  "tagline": "You Know, for Search"
}
```
Kafka:
For Kafka, first start ZooKeeper and then the Kafka server. On Windows run the .bat scripts; on Linux run the corresponding .sh scripts (optional verification commands follow the list):
- Open cmd
- Navigate to the Kafka folder
- E:\kafka_2.12-3.2.0\kafka_2.12-3.2.0> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
- E:\kafka_2.12-3.2.0\kafka_2.12-3.2.0> .\bin\windows\kafka-server-start.bat .\config\server.properties
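Topic auto-creation is enabled by default, so the gfg topic is created on first use; you can also create it explicitly and inspect the raw messages with the stock CLI tools (Kafka 3.x uses the --bootstrap-server flag):
- E:\kafka_2.12-3.2.0\kafka_2.12-3.2.0> .\bin\windows\kafka-topics.bat --create --topic gfg --bootstrap-server localhost:9092
- E:\kafka_2.12-3.2.0\kafka_2.12-3.2.0> .\bin\windows\kafka-console-consumer.bat --topic gfg --from-beginning --bootstrap-server localhost:9092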
pom.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>Consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>Consumer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```
Run the Producer and ElasticConsumer Spring Applications
Send JSON data using Postman or curl; sample payloads are shown after the API list below. The data is sent by the Producer app to ElasticConsumer, which prints each record to its console and saves it into Elasticsearch as a JSON document.
Producer App APIs:
- Send single object -> http://localhost:1234/producer
- Send list of objects -> http://localhost:1234/producerlist
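For example, with curl (quoting shown for a Unix-style shell; the values are made up, and pdate should be a date string the consumer can parse, such as an ISO-8601 date):

```
curl -X POST http://localhost:1234/producer -H "Content-Type: application/json" -d '{"id": 1, "name": "shubham", "pdate": "2022-06-01"}'

curl -X POST http://localhost:1234/producerlist -H "Content-Type: application/json" -d '[{"id": 2, "name": "amit", "pdate": "2022-06-02"}, {"id": 3, "name": "neha", "pdate": "2022-06-03"}]'
```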
ElasticConsumer App APIs:
- Fetch all records from Elasticsearch -> http://localhost:8080/getElasticUserFromKafka
Grafana Dashboard
Grafana runs on http://localhost:3000 by default.
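To visualize the data, add an Elasticsearch data source in Grafana pointing at http://localhost:9200 with index kafkauser and time field pdate, then build panels on top of it. A provisioning-file sketch (the exact field names vary across Grafana versions, so treat this as an assumption to verify against your installation):

```yaml
apiVersion: 1
datasources:
  - name: Elasticsearch-kafkauser
    type: elasticsearch
    access: proxy
    url: http://localhost:9200
    database: kafkauser      # index to query (assumed field name)
    jsonData:
      timeField: pdate       # the User model's date field
```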
Some Elasticsearch APIs
- Show the documents of an index (GET) -> http://localhost:9200/<index_name>/_search
- Delete an index (DELETE) -> http://localhost:9200/<index_name>
- List all indices (GET) -> http://localhost:9200/_cat/indices
- Show the mapping (schema) of an index (GET) -> http://localhost:9200/<index_name>/_mapping
Note that deleting and inspecting an index share the same URL and differ only in the HTTP verb; curl examples follow.
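```
# List indices
curl http://localhost:9200/_cat/indices
# Fetch the documents of an index
curl "http://localhost:9200/kafkauser/_search?pretty"
# Show the mapping (schema) of an index
curl "http://localhost:9200/kafkauser/_mapping?pretty"
# Delete an index (destructive!)
curl -X DELETE http://localhost:9200/kafkauser
```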