Friday, September 19, 2025

Apache Kafka – Real World Project with Twitter using Java

Apache Kafka is a publish-subscribe messaging system. A messaging system lets you send messages between processes, applications, and servers. In Kafka, messages are organized into topics (a topic can be thought of as a category): producers publish messages to a topic, and consumers subscribe to it and process them. To know more about this, please refer to the article What is Apache Kafka and How Does it Work?

In this article, we will build a real-world Apache Kafka project with Twitter using Java. We will develop a Kafka producer application in Java that fetches data from the Twitter API and publishes it to a Kafka topic, from which a consumer application can subscribe and consume the messages.

Step-by-Step Implementation

Step 1: Create a New Apache Kafka Project in IntelliJ

To create a new Apache Kafka project in IntelliJ using Java and Maven, please refer to How to Create an Apache Kafka Project in IntelliJ using Java and Maven.

Step 2: Install and Run Apache Kafka

To install and run Apache Kafka on your local system, please refer to How to Install and Run Apache Kafka.

Step 3: Create a Twitter Developer Account & Generate Keys and Token

Go to this link and apply for a Twitter Developer Account, then generate the Consumer Keys and Authentication Tokens. Please refer to the image below.

 

Step 4: Add Dependencies to the pom.xml file

Add the following dependencies to your pom.xml file:

<dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>

        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j -->
            <version>2.2.0</version> <!-- or whatever the latest version is -->
        </dependency>

</dependencies>

Below is the complete code for the pom.xml file:

XML




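<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">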
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.kafkademo</groupId>
        <artifactId>kafka-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
  
    <artifactId>kafka-twitter</artifactId>
  
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
  
    <dependencies>
  
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>
  
        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j -->
            <version>2.2.0</version> <!-- or whatever the latest version is -->
        </dependency>
  
    </dependencies>
  
</project>


Step 5: Writing a Twitter Client

Declare all the generated keys and tokens as fields of the class (refer to Step 3).

private String consumerKey = "put your consumerKey here";
private String consumerSecret = "put your consumerSecret here";
private String token = "put your token here";
private String secret = "put your secret here";
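
The Twitter client code in this step notes that these secrets should be read from a config file rather than hardcoded. A minimal sketch of that idea, using only java.util.Properties, might look like the following. The class name TwitterCredentials and the property keys (twitter.consumerKey and so on) are assumptions made for illustration; they are not part of the Twitter or Hosebird APIs.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical holder for the four values declared above
public class TwitterCredentials {
    final String consumerKey;
    final String consumerSecret;
    final String token;
    final String secret;

    TwitterCredentials(Properties props) {
        // The property names are illustrative; any naming scheme works
        this.consumerKey = require(props, "twitter.consumerKey");
        this.consumerSecret = require(props, "twitter.consumerSecret");
        this.token = require(props, "twitter.token");
        this.secret = require(props, "twitter.secret");
    }

    // Fails fast with a clear message if a value is missing or blank
    static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required property: " + key);
        }
        return value.trim();
    }

    // Reads key=value pairs from any Reader (a file, a test string, ...)
    static TwitterCredentials load(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new TwitterCredentials(props);
    }

    public static void main(String[] args) {
        // In a real project the Reader would wrap a file kept out of version control
        String sample = "twitter.consumerKey=ck\n"
                + "twitter.consumerSecret=cs\n"
                + "twitter.token=t\n"
                + "twitter.secret=s\n";
        TwitterCredentials credentials = load(new StringReader(sample));
        System.out.println("Loaded consumer key of length " + credentials.consumerKey.length());
    }
}
```

The loaded values could then be passed to the OAuth1 constructor in place of the hardcoded strings.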

Declare the terms for which you want to fetch data from the Twitter API.

List<String> topics = Lists.newArrayList("neveropen", "java", "kafka");

Set up the blocking queue that buffers incoming messages. Its capacity is bounded to 1000 entries.

BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(1000);
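
The capacity argument caps how many unprocessed messages are held in memory at once. The sketch below illustrates the general BlockingQueue behavior this relies on: a non-blocking offer is rejected when the queue is full, and a timed poll returns null when nothing arrives in time. The class and method names (BoundedQueueDemo, offerAll, pollOrNull) are hypothetical, and the sketch says nothing about how the Hosebird client itself reacts to a full queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {

    // Tries to enqueue every message without blocking;
    // returns how many were accepted
    static int offerAll(BlockingQueue<String> queue, String... messages) {
        int accepted = 0;
        for (String message : messages) {
            if (queue.offer(message)) {
                accepted++;
            }
        }
        return accepted;
    }

    // Waits up to timeoutMillis for a message; returns null on timeout
    static String pollOrNull(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        // A tiny capacity makes the "queue full" case easy to see
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        System.out.println("accepted: " + offerAll(queue, "a", "b", "c"));
        System.out.println("first: " + pollOrNull(queue, 50));
        System.out.println("second: " + pollOrNull(queue, 50));
        System.out.println("third: " + pollOrNull(queue, 50));
    }
}
```

With a capacity of 2, only two of the three offered messages are accepted, and the third poll times out and yields null.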

Below is the function for creating a Twitter client.

public Client createTwitterClient(BlockingQueue<String> msgQueue) {
        // Declare the host you want to connect to,
        // the endpoint, and authentication
        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();

        hosebirdEndpoint.trackTerms(topics);

        // These secrets should be read from a config file
        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);

        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01")
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));

        Client hosebirdClient = builder.build();

        return hosebirdClient;
}

Step 6: Establishing a Connection

Below is the code that attempts to establish a connection to the Twitter stream:

twitterClient.connect();
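
A connection opened with connect() should eventually be released again. One common approach is a JVM shutdown hook that runs cleanup steps in order when the application exits. The following is only a sketch under that assumption: GracefulShutdown and buildHook are hypothetical names, and the cleanup steps are plain Runnables standing in for calls such as twitterClient.stop() and producer.close().

```java
public class GracefulShutdown {

    // Builds a thread that runs every cleanup step in order,
    // continuing with the next step even if one of them throws
    static Thread buildHook(Runnable... cleanupSteps) {
        return new Thread(() -> {
            for (Runnable step : cleanupSteps) {
                try {
                    step.run();
                } catch (RuntimeException e) {
                    System.err.println("Cleanup step failed: " + e.getMessage());
                }
            }
        });
    }

    public static void main(String[] args) {
        // In the real application these steps would be
        // twitterClient::stop and producer::close
        Thread hook = buildHook(
                () -> System.out.println("Stopping Twitter client"),
                () -> System.out.println("Closing Kafka producer"));

        // The JVM starts the hook thread when it begins shutting down
        Runtime.getRuntime().addShutdownHook(hook);
        System.out.println("Shutdown hook registered");
    }
}
```

Running the cleanup steps independently means a failure while stopping the client does not prevent the producer from being closed.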

Step 7: Writing the Kafka Producer

Create Producer Properties.

Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

Create the Producer.

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

Below is the function for creating the Kafka Producer.

public KafkaProducer<String, String> createKafkaProducer() {

        String bootstrapServer = "127.0.0.1:9092";

        // Create Producer Properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Create the Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Return the Producer
        return producer;
}
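
The three properties above are the minimum a producer needs. For a stream of tweets it is often worth considering a few more settings for delivery guarantees and throughput. The sketch below builds such a Properties object using the standard Kafka producer configuration keys as plain strings; the class name SafeProducerConfig and the chosen values (snappy, 20 ms) are illustrative assumptions, not recommendations from this article.

```java
import java.util.Properties;

public class SafeProducerConfig {

    // Returns producer properties with the basics plus a few
    // optional reliability and batching settings
    static Properties build(String bootstrapServer) {
        Properties properties = new Properties();

        // Same three settings as in createKafkaProducer()
        properties.setProperty("bootstrap.servers", bootstrapServer);
        properties.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // Wait for all in-sync replicas to acknowledge each record
        properties.setProperty("acks", "all");
        // Avoid duplicate records when the producer retries
        properties.setProperty("enable.idempotence", "true");

        // Illustrative throughput settings: compress batches and wait
        // briefly so that more records fit into each batch
        properties.setProperty("compression.type", "snappy");
        properties.setProperty("linger.ms", "20");

        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build("127.0.0.1:9092");
        properties.list(System.out);
    }
}
```

The resulting object can be passed to the KafkaProducer constructor in the same way as the properties created in createKafkaProducer().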

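Step 8: Sending Tweets to Kafka in a Loop

Below is the code for sending tweets to Kafka. The loop polls the queue with a 5-second timeout and publishes each message to the twitter_tweets topic until the Twitter client is done. As written, it runs on the calling thread; the same method can also be run on a different thread, or on multiple different threads.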

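private static void sendTweetsToKafka(BlockingQueue<String> msgQueue, Client twitterClient,
                                          KafkaProducer<String, String> producer) {
        // Keep polling until the Twitter client has stopped
        while (!twitterClient.isDone()) {
            String msg = null;
            try {
                // Wait up to 5 seconds for the next message
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Stop the client, restore the interrupt flag, and leave the loop
                twitterClient.stop();
                Thread.currentThread().interrupt();
                return;
            }
            if (msg != null) {
                // A null key lets Kafka choose the partition
                producer.send(new ProducerRecord<>("twitter_tweets", null, msg), (recordMetadata, e) -> {
                    if (e != null) {
                        // Report records that could not be delivered
                        System.out.println(e.getMessage());
                    }
                });
            }
        }
}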
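
To actually move such a loop off the calling thread, one option is to submit it to an ExecutorService. The following is a minimal sketch of that pattern using only the JDK: BackgroundForwarder is a hypothetical class, and the Consumer<String> sink stands in for the producer.send(...) call so that the example runs without a Kafka broker.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class BackgroundForwarder {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean running = true;

    // Starts draining the queue on a worker thread and returns immediately
    void start(BlockingQueue<String> queue, Consumer<String> sink) {
        executor.submit(() -> {
            // Keep going until stop() was called and the queue is empty
            while (running || !queue.isEmpty()) {
                try {
                    String msg = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        sink.accept(msg);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    // Signals the worker to finish; returns true if it ended within the timeout
    boolean stop(long timeoutMillis) {
        running = false;
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
        List<String> sent = new CopyOnWriteArrayList<>();

        BackgroundForwarder forwarder = new BackgroundForwarder();
        // In the real application the sink would call producer.send(...)
        forwarder.start(queue, sent::add);

        queue.offer("tweet-1");
        queue.offer("tweet-2");

        boolean finished = forwarder.stop(2000);
        System.out.println("finished=" + finished + ", sent=" + sent);
    }
}
```

A single worker thread keeps messages in queue order. Using several workers would raise throughput but give up that ordering, which is a trade-off to weigh for the application at hand.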

Below is the complete code for Apache Kafka Real-World Project with Twitter using Java.

Java




package twitter;
  
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
  
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
  
public class TwitterProducer {
  
    private String consumerKey = "put your consumerKey here";
    private String consumerSecret = "put your consumerSecret here";
    private String token = "put your token here";
    private String secret = "put your secret here";
    List<String> topics = Lists.newArrayList("neveropen", "java", "kafka");
  
    public static void main(String[] args) {
        new TwitterProducer().run();
    }
  
    public TwitterProducer() {
    }
  
    public void run() {
        // Set up your blocking queues
        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(1000);
  
        // Create Twitter Client
        Client twitterClient = createTwitterClient(msgQueue);
  
        // Attempts to establish a connection
        twitterClient.connect();
  
        // Create Kafka Producer
        KafkaProducer<String, String> producer = createKafkaProducer();
  
        // Loop to send tweets to Kafka; this call blocks the
        // current thread until the Twitter client is done
        sendTweetsToKafka(msgQueue, twitterClient, producer);

        // Flush any buffered records and release resources
        producer.close();
    }
  
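    private static void sendTweetsToKafka(BlockingQueue<String> msgQueue, Client twitterClient,
                                          KafkaProducer<String, String> producer) {
        // Keep polling until the Twitter client has stopped
        while (!twitterClient.isDone()) {
            String msg = null;
            try {
                // Wait up to 5 seconds for the next message
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Stop the client, restore the interrupt flag, and leave the loop
                twitterClient.stop();
                Thread.currentThread().interrupt();
                return;
            }
            if (msg != null) {
                // A null key lets Kafka choose the partition
                producer.send(new ProducerRecord<>("twitter_tweets", null, msg), (recordMetadata, e) -> {
                    if (e != null) {
                        // Report records that could not be delivered
                        System.out.println(e.getMessage());
                    }
                });
            }
        }
    }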
  
    public Client createTwitterClient(BlockingQueue<String> msgQueue) {
        // Declare the host you want to connect to,
        // the endpoint, and authentication
        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
  
        hosebirdEndpoint.trackTerms(topics);
  
        // These secrets should be read from a config file
        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
  
        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01")
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));
  
        Client hosebirdClient = builder.build();
  
        return hosebirdClient;
    }
  
    public KafkaProducer<String, String> createKafkaProducer() {
  
        String bootstrapServer = "127.0.0.1:9092";
  
        // Create Producer Properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  
        // Create the Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
  
        // Return the Producer
        return producer;
    }
  
}


Output:

 
