Kafka Streams is a client library for building applications and microservices whose input and output data are stored in an Apache Kafka cluster. It combines the advantages of Kafka’s server-side cluster technology with the ease of writing and deploying standard Java and Scala applications on the client side.
Approach
In this article, we are going to use Kafka Streams to count the words in a given sentence. We will use a Maven project to manage the build, read input from a Kafka topic, apply a series of stream operations that produce a word count, and publish the result back to another Kafka topic. Basic knowledge of Apache Kafka and Kafka Streams is assumed.
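Assuming a standard Maven setup, the Kafka Streams dependency looks like the snippet below (the version shown is only illustrative; use one that matches your Kafka installation):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.6.0</version>
</dependency>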
Concepts
KStream
A KStream is an abstraction of a record stream, where each data record represents a self-contained item in the unbounded data set. Using the table analogy, data records in a record stream are always interpreted as “INSERTs”: no record replaces an existing row with the same key. Instead, think of it as adding entries to an append-only ledger. Examples include a server log entry, a page view event, or a credit card transaction. For example, suppose two data records are sent to the stream:
("Seeta", 1) --> ("Seeta", 3)
If the stream processing application summed the values per user, ‘Seeta’ would have a result of 4, because the second data record is not treated as an update to the first.
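A minimal sketch of this behavior (the topic names “purchases” and “user-totals” are hypothetical, and records are assumed to carry Long values): summing a KStream per key aggregates every record.

Java

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

public class KStreamInsertSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Read the stream: every record is an independent INSERT
        KStream<String, Long> purchases = builder.stream(
                "purchases", Consumed.with(Serdes.String(), Serdes.Long()));
        // Summing per key sees both records: ("Seeta", 1) + ("Seeta", 3) -> ("Seeta", 4)
        KTable<String, Long> totals = purchases
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .reduce(Long::sum);
        totals.toStream().to("user-totals", Produced.with(Serdes.String(), Serdes.Long()));
    }
}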
KTable
A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if the key does not exist yet, the update is treated as an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT, also known as an INSERT/UPDATE, because any existing row with the same key is overwritten. A record with a null value denotes a “DELETE”, or tombstone, for the record’s key, and such records are therefore processed differently. For example, suppose the same two data records are sent to the stream:
("Seeta", 1) --> ("Seeta", 3)
If the stream processing application summed the values per user, ‘Seeta’ would have a result of 3, because the second data record is treated as an update to the first.
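A minimal sketch of the upsert semantics (the topic names are hypothetical): reading the same records through builder.table() keeps only the latest value per key.

Java

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

public class KTableUpsertSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Read the topic as a changelog: each record UPSERTs the previous value for its key,
        // so after ("Seeta", 1) and ("Seeta", 3) the table holds ("Seeta", 3), not 4
        KTable<String, Long> latest = builder.table(
                "user-values", Consumed.with(Serdes.String(), Serdes.Long()));
        latest.toStream().to("latest-user-values", Produced.with(Serdes.String(), Serdes.Long()));
    }
}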
GlobalKTable
A GlobalKTable and a KTable differ in which data is read into each table from the underlying Kafka topic. As a simplified example, imagine that your input topic has five partitions, you want to read this topic into a table in your application, and, for maximum parallelism, you run five instances of the application. In that case:
- If the input topic is read into a KTable, each application instance’s “local” KTable will be populated with data from only one of the topic’s five partitions.
- If the input topic is read into a GlobalKTable, each application instance’s local GlobalKTable will be populated with data from all of the topic’s partitions, as the sketch below illustrates.
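A minimal sketch of the difference (the topic names are hypothetical; a topology can consume each topic only once, hence the two distinct topics):

Java

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KTable;

public class TableVsGlobalTableSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // KTable: each application instance holds only the data of its assigned partitions
        KTable<String, String> localTable = builder.table("user-profiles");
        // GlobalKTable: each application instance holds the data of ALL partitions
        GlobalKTable<String, String> globalTable = builder.globalTable("country-codes");
    }
}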
Implementation
One Java file will be used to demonstrate the use case.
WordCountApp.java – the whole flow is written in the main method.
Java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

public class WordCountApp {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // 1. Stream from the input topic
        KStream<String, String> textLines = builder.stream("word-count-input");

        KTable<String, Long> wordCounts = textLines
            // 2. Lowercase each text line
            .mapValues(textLine -> textLine.toLowerCase())
            // 3. Split each line into words
            .flatMapValues(textLine -> Arrays.asList(textLine.split("\\W+")))
            // 4. Use the word as the record key
            .selectKey((key, word) -> word)
            // 5. Group the records by key (word)
            .groupByKey()
            // 6. Count the occurrences of each word
            .count(Named.as("Counts"));

        // 7. Write the results back to the output topic
        wordCounts.toStream()
            .to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();

        // Close the streams application cleanly on shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        // Print the topology state every 5 seconds for debugging
        while (true) {
            System.out.println(streams.toString());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}
First, we need to create two Kafka topics: one for the input and one for the output. The Kafka Streams application connects these topics and runs the logic written in the Java file above.
Step 1: Create input and output topics on Kafka
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic word-count-input
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic word-count-output
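Optionally, verify that both topics were created:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092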
Step 2: Launch the Kafka consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic word-count-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
The consumer will now wait for messages.
Step 3: Run the Java application
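Assuming the exec-maven-plugin is available, one way to run the class from the project directory is:

mvn compile exec:java -Dexec.mainClass=WordCountApp

Alternatively, run the WordCountApp class directly from your IDE.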
Step 4: With the Kafka consumer still running, launch the Kafka producer in a new tab and type the input
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic word-count-input
>Geeks love geeks for geeks
In the Kafka consumer window, you will see the word counts printed as key/value pairs, in the format shown below.
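For the input above, the final counts would look something like this (key and value separated by a tab; the order may vary, and depending on cache and commit settings, intermediate counts such as geeks 1 and geeks 2 may also appear):

geeks   3
love    1
for     1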
Visualize the Flow
For the above implementation, the input is “Geeks love geeks for geeks”.
- mapValues: Takes one record and produces one record, while retaining the key of the original record.
- flatMapValues: Takes one record and produces zero, one, or more records, while retaining the key of the original record.
- selectKey: Assigns a new key – possibly of a new key type – to each record.
- groupByKey: Groups the records by the existing key.
- count: Counts the number of records by the grouped key.
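Tracing the input through these operations (the console producer sends records with null keys):

(null, "Geeks love geeks for geeks")                                               // input record
(null, "geeks love geeks for geeks")                                               // after mapValues
(null, "geeks"), (null, "love"), (null, "geeks"), (null, "for"), (null, "geeks")   // after flatMapValues
("geeks", "geeks"), ("love", "love"), ("geeks", "geeks"),
("for", "for"), ("geeks", "geeks")                                                 // after selectKey
("geeks", 3), ("love", 1), ("for", 1)                                              // after groupByKey and count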
Conclusion
Kafka Streams is currently a very popular library. It is worthwhile for aspiring software developers and technology enthusiasts to explore this vast subject in depth.