Monday, November 18, 2024

Spring Boot – Start/Stop a Kafka Listener Dynamically

When a Spring Boot application starts, a Kafka listener by default begins consuming from its configured topics immediately. However, there are situations where we don’t want it to start as soon as the application launches. This article shows how to start or stop a Kafka listener dynamically.

Implementation

Make a class whose objects will be consumed by the Kafka listener.

File: Message.java

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
  
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {
    private String message;
}


Configure the consumer that will be used by Kafka Listener.

File: KafkaConsumerConfig.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
  
import java.util.HashMap;
import java.util.Map;
  
@Configuration
public class KafkaConsumerConfig {
  
    private String kafkaUrl = "localhost:9092";
  
    @Bean
    public ConsumerFactory<String, Message> messageConsumerFactory() {
        JsonDeserializer<Message> deserializer = new JsonDeserializer<>(Message.class, false);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);
  
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }
  
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message> messageListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(messageConsumerFactory());
        return containerFactory;
    }
}


Create a Kafka listener with the necessary parameters.

  • id = The container’s unique identifier for this listener. If none is specified, an auto-generated ID is used.
  • groupId = Override the group.id property for the consumer factory with this value for this listener only.
  • topics = The topics for this listener. The entries can be topic names, property placeholder keys, or expressions; an expression must resolve to a topic name. This uses group management, and Kafka will assign partitions to group members.
  • containerFactory = The bean name of the KafkaListenerContainerFactory that will be used to create the message listener container that will serve this endpoint.
  • autoStartup = Set to true or false to override the container factory’s default setting. The factory default is true, so unless it is overridden the listener starts consuming messages as soon as the application starts. The listener below sets it to false.

File: KafkaMessageListener.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
  
@Component
public class KafkaMessageListener {
  
    Logger logger = LoggerFactory.getLogger(KafkaMessageListener.class);
  
    @KafkaListener(id = "id-1", groupId = "group-1", topics = "Message-topic", containerFactory = "messageListenerFactory", autoStartup = "false")
    public void consumeMessage(Message message) {
        logger.info("Message received : -> {}", message);
    }
}


The KafkaListenerEndpointRegistry class can be used to get a Kafka Listener Container by listenerId. The Kafka Listener can now be started or stopped using this container.

File: KafkaListenerAutomation.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
  
@Component
public class KafkaListenerAutomation {
  
    private final Logger logger = LoggerFactory.getLogger(KafkaListenerAutomation.class);
  
    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
  
    public boolean startListener(String listenerId) {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);
        // assert is disabled by default at runtime, so check for an unknown ID explicitly
        if (listenerContainer == null) {
            logger.warn("No Kafka Listener found with id {}", listenerId);
            return false;
        }
        listenerContainer.start();
        logger.info("{} Kafka Listener Started", listenerId);
        return true;
    }
  
    public boolean stopListener(String listenerId) {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);
        // getListenerContainer returns null when no container has this ID
        if (listenerContainer == null) {
            logger.warn("No Kafka Listener found with id {}", listenerId);
            return false;
        }
        listenerContainer.stop();
        logger.info("{} Kafka Listener Stopped.", listenerId);
        return true;
    }
}
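
Since getListenerContainer returns null for an ID that is not registered, the caller has to decide what an unknown ID means. The following is a minimal sketch of that lookup-then-start flow using a plain-Java stand-in, so it can be tried without Spring or a broker. The names ListenerRegistrySketch and FakeContainer are hypothetical and are not Spring classes; the stand-in only tracks a running flag.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java stand-in for the lookup-by-id pattern; none of these types are Spring classes.
class ListenerRegistrySketch {

    /** Hypothetical stand-in for a listener container: it only tracks a running flag. */
    static final class FakeContainer {
        private volatile boolean running;

        void start() { running = true; }

        void stop() { running = false; }

        boolean isRunning() { return running; }
    }

    private final Map<String, FakeContainer> containers = new ConcurrentHashMap<>();

    /** Registers a stopped container under the given id (mirrors autoStartup = "false"). */
    void register(String listenerId) {
        containers.put(listenerId, new FakeContainer());
    }

    /** Starts the container; returns false instead of throwing when the id is unknown. */
    boolean start(String listenerId) {
        FakeContainer container = containers.get(listenerId);
        if (container == null) {
            return false;
        }
        container.start();
        return true;
    }

    /** Stops the container; returns false when the id is unknown. */
    boolean stop(String listenerId) {
        FakeContainer container = containers.get(listenerId);
        if (container == null) {
            return false;
        }
        container.stop();
        return true;
    }

    boolean isRunning(String listenerId) {
        FakeContainer container = containers.get(listenerId);
        return container != null && container.isRunning();
    }

    public static void main(String[] args) {
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        registry.register("id-1");
        System.out.println(registry.start("id-1"));      // true
        System.out.println(registry.isRunning("id-1"));  // true
        System.out.println(registry.start("unknown"));   // false
    }
}
```

Returning a boolean rather than throwing lets a caller, such as a REST controller, report an unknown listener ID without a stack trace.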


Using API endpoints, we can start or stop a specific Kafka listener by passing its listener ID as the id request parameter.

File: StartOrStopListenerController.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
  
@RestController
public class StartOrStopListenerController {
  
    @Autowired
    KafkaListenerAutomation kafkaListenerAutomation;
  
    @GetMapping("/start")
    public void start(@RequestParam("id") String listenerId) {
        kafkaListenerAutomation.startListener(listenerId);
    }
  
    @GetMapping("/stop")
    public void stop(@RequestParam("id") String listenerId) {
        kafkaListenerAutomation.stopListener(listenerId);
    }
}
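
To exercise these endpoints from code, a small client built on the JDK's java.net.http package may be enough. This is a sketch under assumptions: the class name ListenerControlClient is hypothetical, and the base URL http://localhost:8080 assumes the application runs locally on Spring Boot's default port.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for calling the /start and /stop endpoints defined above.
class ListenerControlClient {

    /** Builds the request URI, e.g. http://localhost:8080/start?id=id-1 for action "start". */
    static URI buildUri(String baseUrl, String action, String listenerId) {
        String encodedId = URLEncoder.encode(listenerId, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/" + action + "?id=" + encodedId);
    }

    /** Sends the GET request and returns the HTTP status code. */
    static int call(String baseUrl, String action, String listenerId)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(buildUri(baseUrl, action, listenerId))
                .GET()
                .build();
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the application is running locally on port 8080.
        String baseUrl = "http://localhost:8080";
        System.out.println("start -> HTTP " + call(baseUrl, "start", "id-1"));
        System.out.println("stop  -> HTTP " + call(baseUrl, "stop", "id-1"));
    }
}
```

The listener ID is URL-encoded so that IDs containing spaces or other reserved characters still reach the controller intact.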


Output:

Kafka Listener started

Kafka Listener received message

Kafka Listener stopped

Conclusion

Ideally, a listener should be started only when Kafka messages need to be processed and stopped as soon as that processing is complete. Constraining your Kafka listeners this way is good practice, because a stopped listener does not poll the broker.
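
One possible way to decide when a listener should run is a fixed daily processing window. The sketch below is only an illustration of that idea: the ListenerWindow class and the time-window policy are assumptions, not part of the original implementation. A scheduled task could call shouldRun and then invoke startListener or stopListener accordingly.

```java
import java.time.LocalTime;

// Hypothetical policy helper: decides whether a listener should be running right now.
class ListenerWindow {

    /**
     * Returns true when 'now' falls in the half-open window [start, end).
     * Windows that cross midnight (for example 22:00 to 02:00) are supported.
     * An empty window (start equals end) never matches.
     */
    static boolean shouldRun(LocalTime now, LocalTime start, LocalTime end) {
        if (start.equals(end)) {
            return false;
        }
        if (start.isBefore(end)) {
            // Same-day window: start <= now < end
            return !now.isBefore(start) && now.isBefore(end);
        }
        // Window wraps past midnight: now >= start OR now < end
        return !now.isBefore(start) || now.isBefore(end);
    }

    public static void main(String[] args) {
        LocalTime start = LocalTime.of(9, 0);
        LocalTime end = LocalTime.of(17, 0);
        System.out.println(shouldRun(LocalTime.of(10, 0), start, end)); // true
        System.out.println(shouldRun(LocalTime.of(17, 0), start, end)); // false
    }
}
```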
