When your spring boot application starts, Kafka Listener’s default behavior is to begin listening for a certain topic. However, there are situations when we don’t want to start it right away once our application launches. This article will go through how to dynamically start or stop a Kafka listener.
Implementation
Make a class whose objects will be consumed by the Kafka listener.
File: Message.java
Java
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @AllArgsConstructor @NoArgsConstructor public class Message { private String message; } |
Configure the consumer that will be used by Kafka Listener.
File: KakfaConsumerConfig.java
Java
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.support.serializer.JsonDeserializer; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaConsumerConfig { private String kafkaUrl = "localhost:9092" ; @Bean public ConsumerFactory<String, Message> messageConsumerFactory() { JsonDeserializer<Message> deserializer = new JsonDeserializer<>(Message. class , false ); deserializer.setRemoveTypeHeaders( false ); deserializer.addTrustedPackages( "*" ); deserializer.setUseTypeMapperForKey( true ); Map<String, Object> config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer. class ); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer. class ); config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1" ); return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Message> messageListenerFactory() { ConcurrentKafkaListenerContainerFactory<String, Message> containerFactory = new ConcurrentKafkaListenerContainerFactory(); containerFactory.setConsumerFactory(messageConsumerFactory()); return containerFactory; } } |
Create a Kafka listener with the necessary parameters.
- id = The container’s unique identifier for this listener. If none is specified, an auto-generated ID is used.
- groupId = Override the group.id property for the consumer factory with this value for this listener only.
- topics = The topics of this listener. The entries can be “topic names,” “property placeholder keys,” or “expressions.” The topic name must be resolved from an expression. This uses group management, and Kafka will assign partitions to group members.
- containerFactory = The bean name of the KafkaListenerContainerFactory that will be used to create the message listener container that will serve this endpoint.
- autoStartup = Set to true or false to override the container factory’s default setting. By default, the value is set to true, and because of this, it’ll start consuming messages as soon as our application starts.
File: KafkaMessageListener.java
Java
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListener; @Configuration public class KafkaMessageListener { Logger logger = LoggerFactory.getLogger(KafkaMessageListener. class ); @KafkaListener (id = "id-1" , groupId = "group-1" , topics = "Message-topic" , containerFactory = "messageListenerFactory" , autoStartup = "false" ) public void consumeMessage(Message message) { logger.info( "Message received : -> {}" , message); } } |
The KafkaListenerEndpointRegistry class can be used to get a Kafka Listener Container by listenerId. The Kafka Listener can now be started or stopped using this container.
File: KafkaListenerAutomation.java
Java
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.stereotype.Component; @Component public class KafkaListenerAutomation { private final Logger logger = LoggerFactory.getLogger(KafkaListenerAutomation. class ); @Autowired KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; public boolean startListener(String listenerId) { MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId); assert listenerContainer != null : false ; listenerContainer.start(); logger.info( "{} Kafka Listener Started" , listenerId); return true ; } public boolean stopListener(String listenerId) { MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId); assert listenerContainer != null : false ; listenerContainer.stop(); logger.info( "{} Kafka Listener Stopped." , listenerId); return true ; } } |
Using API endpoints, we can start or stop a specific Kafka listener by providing the listenerID.
File: StartOrStopListenerController.java
Java
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class StartOrStopListenerController { @Autowired KafkaListenerAutomation kafkaListenerAutomation; @GetMapping ( "/start" ) public void start( @RequestParam ( "id" ) String listenerId) { kafkaListenerAutomation.startListener(listenerId); } @GetMapping ( "/stop" ) public void stop( @RequestParam ( "id" ) String listenerId) { kafkaListenerAutomation.stopListener(listenerId); } } |
Output:
Conclusion
The app should ideally be started when the Kafka messages need to be processed and stopped as soon as that process is complete. It’s a good practice to constrain your Kafka listener to utilize it efficiently.