This is a sample application built around stateless Kafka Streams operations.
- Two source topics are populated from separate threads
- A topology is created that shows how to filter, branch, and merge streams, and ultimately forwards the messages to three other topics depending on simple key predicates (a topic-creation sketch follows this list).
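The code assumes the five topics involved (first, second, zero, one, three) already exist on a broker at localhost:9092. If your broker does not auto-create topics, a small AdminClient sketch along these lines can set them up first; the single partition and replication factor of 1 are assumptions for a local one-broker setup, not part of the original example:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition / replication factor 1: assumptions for a single local broker
            List<NewTopic> topics = Stream.of("first", "second", "zero", "one", "three")
                    .map(name -> new NewTopic(name, 1, (short) 1))
                    .collect(Collectors.toList());
            admin.createTopics(topics).all().get(); // blocks until the broker confirms
        }
    }
}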
This is the code:
package org.example.stateless;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class Context {
    private static final Logger logger = LoggerFactory.getLogger(Context.class);
    // volatile: the flag is written by the shutdown hook and read by the worker threads
    private volatile boolean stopping;

    public Context() {
        this.stopping = false;
    }

    public boolean isStopping() {
        return stopping;
    }

    public void stop() {
        logger.info("application stop triggered");
        stopping = true;
    }
}

public class Stateless {
    private static final Logger logger = LoggerFactory.getLogger(Stateless.class);

    public static void main(String[] args) {
        try {
            new Stateless().start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void start() throws Exception {
        Context context = new Context();
        Runtime.getRuntime().addShutdownHook(new Thread(context::stop));
        ExecutorService threads = Executors.newFixedThreadPool(10);
        Future<?> futureFirst = threads.submit(getPushTopic(context, "first", 1000));
        Future<?> futureSecond = threads.submit(getPushTopic(context, "second", 1000));
        Future<?> futureStreams = threads.submit(registerStatelessStreams(context));
        threads.submit(getConsumer(context, "zero"));
        threads.submit(getConsumer(context, "one"));
        threads.submit(getConsumer(context, "three"));
        futureFirst.get();
        logger.info("first stopped");
        futureSecond.get();
        logger.info("second stopped");
        futureStreams.get();
        logger.info("streams stopped as well, exiting the application");
        // let the remaining consumer tasks finish and allow the JVM to exit
        threads.shutdown();
    }

    private Runnable getPushTopic(Context context, String topicName, int interval) {
        return () -> {
            logger.info("starting pushing topic: {}", topicName);
            // connect to kafka
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = new KafkaProducer<>(props);
            Random random = new Random();
            // send one record every `interval` milliseconds until a stop is requested
            while (!context.isStopping()) {
                String key = "" + random.nextInt(10);
                String value = "queue: " + topicName + ", with the key: " + key;
                ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
                producer.send(record);
                logger.info("ok, sent message: {}", value);
                try {
                    Thread.sleep(interval);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            producer.close();
            logger.info("all right, producer for topic: {} is being closed now, exiting", topicName);
        };
    }

    private Runnable registerStatelessStreams(Context context) {
        return () -> {
            logger.info("starting processing streams");
            Topology topology = getTopology();
            // put together the parameters to start the streams, then start the process
            Properties config = new Properties();
            config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            config.put(StreamsConfig.APPLICATION_ID_CONFIG, "id11");
            KafkaStreams streams = new KafkaStreams(topology, config);
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            streams.start();
            // keep this worker alive until a stop is requested, then close the streams,
            // so that futureStreams.get() in start() really waits for the streams to end
            while (!context.isStopping()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            streams.close();
        };
    }

    private Topology getTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // original streams
        KStream<String, String> firstStream = builder.stream("first");
        KStream<String, String> secondStream = builder.stream("second");
        // filter out the key "3" from the second stream
        KStream<String, String> secondFiltered = secondStream.filter((key, value) -> {
            boolean res = !key.equals("3");
            if (!res) {
                logger.info("filtering out message with key value of 3");
            }
            return res;
        });
        // merge firstStream and secondFiltered into a single stream
        KStream<String, String> mergedStreams = firstStream.merge(secondFiltered);
        // from the merged stream, three branches: one for key "0", one for key "1",
        // and a catch-all for the rest. The first branch goes to the topic "zero",
        // the second to the topic "one", and the remaining messages go to "three".
        KStream<String, String>[] branches = mergedStreams.branch(
                (key, value) -> "0".equals(key),
                (key, value) -> "1".equals(key),
                (key, value) -> true);
        branches[0].to("zero");
        branches[1].to("one");
        branches[2].to("three");
        return builder.build();
    }

    private Runnable getConsumer(Context context, String topicName) {
        return () -> {
            // connect to the topic
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // replace with your Kafka broker
            props.put("group.id", "current-consumer-group"); // consumer group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("enable.auto.commit", "true"); // automatically commit offsets
            props.put("auto.commit.interval.ms", "1000"); // commit interval
            props.put("auto.offset.reset", "earliest"); // start from the beginning if no offset is available
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(topicName));
            // poll for messages and write them to the console;
            // when a stop is requested, close the consumer and exit
            while (!context.isStopping()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach((record) -> {
                    String value = record.value();
                    System.out.println("consuming from topic: " + topicName + ", value: " + value);
                });
            }
            consumer.close();
        };
    }
}
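A side note on the branching call: branch() with varargs predicates, as used in getTopology() above, was deprecated in Kafka Streams 2.8 in favor of split(). It compiles and runs fine against the 2.7.0 dependency below; on a newer version, the same topology could be expressed roughly like this sketch (the method name getTopologyWithSplit is mine, not part of the original code):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;

// same topology as getTopology(), written with split() (Kafka Streams 2.8+)
private Topology getTopologyWithSplit() {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> firstStream = builder.stream("first");
    KStream<String, String> secondFiltered = builder.<String, String>stream("second")
            .filter((key, value) -> !key.equals("3")); // same filter as above, minus the logging
    firstStream.merge(secondFiltered).split()
            .branch((key, value) -> "0".equals(key), Branched.withConsumer(ks -> ks.to("zero")))
            .branch((key, value) -> "1".equals(key), Branched.withConsumer(ks -> ks.to("one")))
            .defaultBranch(Branched.withConsumer(ks -> ks.to("three")));
    return builder.build();
}

With split(), each branch is routed in place via Branched.withConsumer, so there is no predicate-ordered array to index into, and the catch-all predicate becomes an explicit defaultBranch.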
The pom.xml
Here is the pom.xml, just in case you need all the dependencies. In this particular situation there are very few: kafka-streams, which transitively pulls in the core Kafka client libraries, plus SLF4J for logging.
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>kafka</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.7.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>2.0.9</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>2.0.9</version> </dependency> </dependencies> <properties> <maven.compiler.source>23</maven.compiler.source> <maven.compiler.target>23</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> </project>
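If you want to verify the topology without a running broker, the kafka-streams-test-utils artifact (org.apache.kafka:kafka-streams-test-utils, same version, test scope; it is not in the pom above) provides TopologyTestDriver. Below is a minimal sketch under two assumptions of mine: the class lives in the same package, and getTopology() in Stateless has been relaxed from private to package-visible for testing.

package org.example.stateless;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

import java.util.Properties;

public class TopologyCheck {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-check");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the driver
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // assumes getTopology() has been made package-visible
        try (TopologyTestDriver driver = new TopologyTestDriver(new Stateless().getTopology(), config)) {
            TestInputTopic<String, String> second =
                    driver.createInputTopic("second", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> zero =
                    driver.createOutputTopic("zero", new StringDeserializer(), new StringDeserializer());
            TestOutputTopic<String, String> three =
                    driver.createOutputTopic("three", new StringDeserializer(), new StringDeserializer());

            second.pipeInput("0", "routed to zero");
            second.pipeInput("3", "dropped by the filter");
            second.pipeInput("7", "routed to the default branch");

            System.out.println(zero.readKeyValue());  // KeyValue(0, routed to zero)
            System.out.println(three.readKeyValue()); // KeyValue(7, routed to the default branch)
            System.out.println(zero.isEmpty() && three.isEmpty()); // true: key "3" never came through
        }
    }
}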