Kafka Primer

This is a short article about how to get a Kafka application up and running.

Kafka is a very fast messaging service. It revolves around topics of messages. Each message has a key and a payload. The server typically listens on TCP port 9092.

Maven

The following packages are needed. The clients package covers the producer and the consumer; the streams package covers Kafka Streams. Check Maven Central for the latest version.

<dependencies>
<!-- core client library: covers both the producer and the consumer APIs -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
<!-- Kafka Streams DSL (StreamsBuilder, KStream, ...) -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.6.0</version>
</dependency>
</dependencies>

Producer

The producer puts messages into a topic and can safely be accessed from multiple threads. Here is the code. The key type can be changed, and so can the payload type — with, of course, the matching serializer and deserializer.

package org.example;


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.Properties;

public class ProdStarter {
    private static final Logger log = LoggerFactory.getLogger(ProdStarter.class);

    public static void main(String[] args) {
        try {
            new ProdStarter().start();
        } catch (Exception e) {
            // use the declared logger instead of printStackTrace()
            log.error("Producer failed", e);
        }
    }

    /** Sends ten sample records with Long keys 0..9 to the "tester1" topic. */
    private void start() throws Exception {
        // try-with-resources guarantees the producer is closed; close() also flushes
        try (KafkaProducer<Long, String> producer = createProducer()) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<Long, String> producerRecord =
                        new ProducerRecord<>("tester1", (long) i, "this is the amount for the value - " + i + " - here");

                // send data - asynchronous; records are batched in the background
                producer.send(producerRecord);
            }

            // flush data - synchronous; forces delivery of anything still buffered
            producer.flush();
        }
    }

    /**
     * Builds a producer for Long keys / String values against the local broker.
     * One producer instance is thread safe and enough for the whole application;
     * that performs better than opening one per thread or per message.
     */
    private static KafkaProducer<Long, String> createProducer() {
        String bootstrapServers = "127.0.0.1:9092";

        // setup the producer
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // note: group.id is a consumer-only setting; the original set it here, but a
        // producer ignores it (and warns about the unknown config), so it was removed

        return new KafkaProducer<>(properties);
    }
}

Consumer

The example below of course is a proof of concept.

package org.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ConsumerStarter {
    // bug fix: the logger was created with ProdStarter.class, mislabeling every log line
    private static final Logger log = LoggerFactory.getLogger(ConsumerStarter.class);

    public static void main(String[] args) {
        try {
            new ConsumerStarter().start();
        } catch (Exception e) {
            // use the declared logger instead of printStackTrace()
            log.error("Consumer failed", e);
        }
    }

    /**
     * Connects to the local broker, lists the available topics, then polls the
     * "tester1" topic forever and prints every record received.
     */
    private void start() throws Exception {
        String bootstrapServers = "127.0.0.1:9092";

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // bug fix: the producer example writes Long keys with LongSerializer, so the
        // key deserializer must be LongDeserializer — decoding those 8 raw bytes
        // with StringDeserializer would produce garbage keys
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // very important if for a new group id you want to start from very beginning, as
        // the default value is "latest" that means that even for a new group id config you still
        // get messages starting "now", not "earliest"
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "a");

        // try-with-resources closes the consumer (and leaves the group) on any exit
        try (KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(properties)) {
            // just list the topics to see what's there
            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
            topics.forEach((key, partitions) -> {
                System.out.println("key: " + key);
                partitions.forEach((part) -> {
                    String top = part.topic();
                    System.out.println("topic: " + top);
                });
            });

            // start reading messages
            consumer.subscribe(List.of("tester1"));
            // poll for new data
            while (true) {
                ConsumerRecords<Long, String> records =
                        consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<Long, String> record : records) {
                    System.out.println("Key: " + record.key() + ", Value: " + record.value());
                    System.out.println("Partition: " + record.partition() + ", Offset:" + record.offset());
                }
            }
        }
    }
}

Streams

Streams are a kind of consumers of topic messages with a lot of functionality associated with them:

  • You can handle message structure and content including changing the key and the payload
  • Filtering
  • Grouping
  • At the end you can post to another topic the updated messages
  • Use temporary storage provided by Kafka for groups, counts and other operations

Here is sample code showing how to register a stream against a topic and then consume its contents:

package org.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class StreamerTest {
    public static void main(String[] args) {
        try {
            new StreamerTest().start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds a small topology against the "tester1" topic (Long key / String
     * value, matching the producer example) and starts it. Each record is
     * suffixed, fanned out into two values, filtered to even keys, re-keyed,
     * and finally printed.
     */
    private void start() throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // default serdes must match what the producer wrote: Long keys, String values
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<Long, String> textLines = builder.stream("tester1");
        textLines
                .mapValues(value -> value + " suffix")            // append to every payload
                .flatMapValues(value -> List.of(value, value + "(a)")) // fan out 1 -> 2 records
                .filter((key, value) -> key % 2 == 0)             // keep even keys only
                .map((key, val) -> new KeyValue<>(key + 10, "prefix - " + val)) // re-key + re-value
                .foreach((key, value) -> {
                    String message = String.format("only value: %d >>%s<<", key, value);

                    System.out.println(message);
                });

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // bug fix: without this hook the JVM exits without closing the streams
        // instance, leaving state stores and the consumer-group membership dangling
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

}

Various Commands

Here are a number of useful commands. It is easy to figure out what each one does, but keep these samples around as templates.

kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic tester1  --partitions 20 --replication-factor 3
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
kafka-topics.sh --bootstrap-server localhost:9092 --list
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tester1