Monday, November 18, 2024
Google search engine
HomeLanguagesJavaApache Kafka Streams – Simple Word Count Example

Apache Kafka Streams – Simple Word Count Example

Kafka Streams is used to create apps and microservices with input and output data stored in an Apache Kafka cluster. It combines the advantages of Kafka’s server-side cluster technology with the ease of creating and deploying regular Java and Scala apps on the client side.

Approach

In this article, we are going to use Kafka streams for counting words in a given sentence. We will use the maven project to manage the flow and Kafka topics defined in Kafka. Following that, a number of functions will be used, and processing will result in a word count. This outcome will once more be posted to a Kafka topic. For this, it is assumed that basic knowledge of Apache Kafka and Kafka Streams is there.

Concepts

KStream

The abstraction of a record stream known as a KStream is one in which every data record represents a self-contained item in the unbounded data set. Using the table analogy, data records in a record stream are always treated as “INSERTs” because no record replaces an existing row with the same key. Instead, think of it as adding more entries to an append-only ledger. Examples include a server log entry, a page view event, or a credit card transaction. For example, two data are being sent to the stream.

("Seeta", 1) --> ("Seeta", 3)

If the stream processing application added the values for each user, Seeta would have a result of 4. due to the fact that the second data record would not be seen as an update to the first.

KTable

Each data record in a KTable represents an update and is an abstraction of a changelog stream. More specifically, the value in a data record is understood as an “UPDATE” of the previous value for the same record key, if any (the update will be regarded as an INSERT if the corresponding key does not yet exist). A data record in a changelog stream is understood as an UPSERT, also known as an INSERT/UPDATE, because any existing row with the same key is rewritten, to use the table analogy. A record with a null value denotes a “DELETE” or tombstone for the record’s key. Additionally, null values are processed differently. For example, two data are being sent to the stream.

("Seeta", 1) --> ("Seeta", 3)

If the stream processing application added the values for each user, ‘Seeta’ would have a result of 3. as the second data record would be seen as an update to the first.

GlobalKTable

A GlobalKTable and a KTable are different from one another in terms of the data that is being read into each table from the underlying Kafka topic. Imagine, in a simplified manner, that your input topic has n partitions. You wish to read this subject into a table in your application. Additionally, for optimum parallelism, you should run your application across n application instances. For example,

  • Each application instance’s “local” KTable instance will only have data from one of the subject’s five partitions if the input topic is read into a KTable.
  • Each application instance’s local GlobalKTable instance will be filled with information from all of the subject’s partitions if the input topic is read into a GlobalKTable.

Implementation

One java file will be used to demonstrate the use case.

WordCountApp.java – in the main method the whole flow is written. 

Java




import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
  
public class WordCountApp {
    public static void main(String[] args)
    {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG,
                   "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                   "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                   "earliest");
        config.put(
            StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.String().getClass());
        config.put(
            StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
            Serdes.String().getClass());
  
        StreamsBuilder builder = new StreamsBuilder();
  
        KStream<String, String> textLines
            = builder.stream("word-count-input");
        KTable<String, Long> wordCounts
            = textLines
                  .mapValues(
                      textLine -> textLine.toLowerCase())
                  .flatMapValues(
                      textLine
                      -> Arrays.asList(
                          textLine.split("\\W+")))
                  .selectKey((key, word) -> word)
                  .groupByKey()
                  .count(Named.as("Counts"));
  
        KafkaStreams streams
            = new KafkaStreams(builder.build(), config);
        streams.start();
        Runtime.getRuntime().addShutdownHook(
            new Thread(streams::close));
  
        while (true) {
            System.out.println(streams.toString());
            try {
                Thread.sleep(5000);
            }
            catch (InterruptedException e) {
                break;
            }
        }
    }
}


First, we need to create two Kafka topics, one for taking the input and one for the output. The Kafka stream will connect these topics and run the logic written in the above java file.

Step 1: Create input and output topics on Kafka

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic word-count-input 

 

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic word-count-output 

 

Step 2: Launch the Kafka consumer

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \         
    --topic word-count-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

 

It will be in waiting state.

Step 3: Run the java application

Intellij console after running the java app

Intellij console after running the java app

Step 4: Launch the Kafka producer and give the input in a new tab as the Kafka consumer is running

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic word-count-input
>Geeks love geeks for geeks

 

In the Kafka consumer window, you will see word count is getting printed in the below format,

word count output

word count output

Visualize the Flow

For the above implementation, the input is “Geeks love geeks for geeks“.

mapValues: Takes one record and produces one record, while retaining the key of the original record.

 

flatMapValues: Takes one record and produces zero, one, or more records, while retaining the key of the original record.

 

selectKey: Assigns a new key – possibly of a new key type – to each record.

 

groupByKey: Groups the records by the existing key.

 

count: Counts the number of records by the grouped key. 

 

Conclusion

Currently, Kafka Streams is a very well-liked interface. It is advantageous for aspiring software developers and technophiles to delve deeply into this vast subject.

RELATED ARTICLES

Most Popular

Recent Comments