This is a short article about getting a Kafka application up and running.
Kafka is a fast, distributed messaging service. It revolves around topics of messages; each message has a key and a payload. The broker typically listens on TCP port 9092.
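One reason the key matters: Kafka uses it to pick the partition a message lands on, so equal keys always go to the same partition (and therefore keep their relative order). Kafka's real default partitioner hashes the serialized key with murmur2; the sketch below uses a plain polynomial hash just to illustrate the idea, it is not Kafka's actual code.

```java
import java.nio.charset.StandardCharsets;

public class PartitionSketch {

    // Simplified stand-in for Kafka's partitioner: hash the key bytes and take
    // the result modulo the partition count. Kafka itself uses murmur2, but the
    // property is the same: equal keys always land on the same partition.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = 0;
        for (byte b : bytes) {
            hash = 31 * hash + b; // plain polynomial hash, for illustration only
        }
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        // same key, same partition - every time
        System.out.println(partitionFor("order-42", 20));
        System.out.println(partitionFor("order-42", 20));
    }
}
```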
Maven
The following dependencies are needed: kafka-clients covers the producer and consumer APIs, and kafka-streams covers the Streams API. Check Maven Central for the latest versions.
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.6.0</version>
    </dependency>
</dependencies>
Producer
The producer puts messages on a topic. A single producer instance is thread safe and can be shared across the whole application. The key and payload types can be changed, together with, of course, the matching serializer on the producer side and deserializer on the consumer side. Here is the code.
package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class ProdStarter {

    private static final Logger log = LoggerFactory.getLogger(ProdStarter.class);

    public static void main(String[] args) {
        try {
            new ProdStarter().start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void start() throws Exception {
        KafkaProducer<Long, String> producer = getLongStringKafkaProducer();
        for (int i = 0; i < 10; i++) {
            ProducerRecord<Long, String> producerRecord =
                    new ProducerRecord<>("tester1", (long) i, "this is the amount for the value - " + i + " - here");
            // send data - asynchronous
            producer.send(producerRecord);
        }
        // flush data - synchronous
        producer.flush();
        // flush and close producer
        producer.close();
    }

    private static KafkaProducer<Long, String> getLongStringKafkaProducer() {
        String bootstrapServers = "127.0.0.1:9092";

        // set up the producer; note that group.id is a consumer-only property
        // and does not belong in a producer configuration
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // one producer instance is thread safe and enough for the whole application -
        // better performance than opening one per thread or per message
        return new KafkaProducer<>(properties);
    }
}
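The configured serializers just turn keys and values into bytes before they go on the wire: LongSerializer writes the long as 8 big-endian bytes, and StringSerializer UTF-8 encodes the string. The sketch below reproduces those conversions in plain JDK code so it runs without Kafka on the classpath; it illustrates what the serializers do, it is not Kafka's actual classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SerializerSketch {

    // What LongSerializer produces: the long as 8 big-endian bytes
    static byte[] serializeLong(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    // What StringSerializer produces: the UTF-8 encoding of the string
    static byte[] serializeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(serializeLong(7L).length);      // always 8 bytes
        System.out.println(serializeString("abc").length); // 3 bytes for ASCII
    }
}
```

This is also why the consumer must configure deserializers that match: it only ever sees these raw bytes and has no other way to know the original types.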
Consumer
The example below is, of course, a proof of concept: it lists the topics on the broker, subscribes to tester1 and then polls for messages forever.
package org.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ConsumerStarter {

    private static final Logger log = LoggerFactory.getLogger(ConsumerStarter.class);

    public static void main(String[] args) {
        try {
            new ConsumerStarter().start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void start() throws Exception {
        String bootstrapServers = "127.0.0.1:9092";

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // the deserializers must match the producer's serializers:
        // Long keys and String values in this example
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // very important if, for a new group id, you want to start from the very beginning:
        // the default is "latest", which means that even a brand-new group id only
        // gets messages starting "now", not from "earliest"
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "a");

        KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(properties);

        // just list the topics to see what's there
        Map<String, List<PartitionInfo>> topics = consumer.listTopics();
        topics.forEach((key, partitions) -> {
            System.out.println("key: " + key);
            partitions.forEach(part -> System.out.println("topic: " + part.topic()));
        });

        // start reading messages
        consumer.subscribe(List.of("tester1"));

        // poll for new data
        while (true) {
            ConsumerRecords<Long, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<Long, String> record : records) {
                System.out.println("Key: " + record.key() + ", Value: " + record.value());
                System.out.println("Partition: " + record.partition() + ", Offset: " + record.offset());
            }
        }
    }
}
Streams
Streams are a kind of consumer of topic messages, with a lot of functionality attached:
- You can transform the message structure and content, including changing the key and the payload
- Filtering
- Grouping
- At the end you can post the updated messages to another topic
- You can use state stores provided by Kafka for grouping, counting and other stateful operations
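The grouping and counting bullets correspond to the Streams DSL's groupBy(...).count(), which keeps running counts in a Kafka-managed state store. Stripped of the Kafka machinery, the aggregation it performs is essentially the following (a plain-collections sketch that runs without a broker; the class and method names are illustrative):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupCountSketch {

    // What a Streams groupBy(...).count() computes, minus the Kafka machinery:
    // bucket records by a derived key and keep a running count per bucket.
    static Map<String, Long> countByKey(List<String> values) {
        Map<String, Long> counts = new HashMap<>();
        for (String value : values) {
            // here the "group key" is the value itself, as in a word count
            counts.merge(value, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // "kafka" appears twice, "streams" once
        Map<String, Long> counts = countByKey(List.of("kafka", "streams", "kafka"));
        System.out.println(counts);
    }
}
```

The difference in real Kafka Streams is that the store is fault tolerant (backed by a changelog topic) and the counts update continuously as new messages arrive.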
Here is sample code that registers a stream against a topic and then consumes its contents.
package org.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.List;
import java.util.Properties;

public class StreamerTest {

    public static void main(String[] args) {
        try {
            new StreamerTest().start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void start() throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // the default serdes must match the types on the topic (Long keys, String values)
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<Long, String> textLines = builder.stream("tester1");
        textLines
                // change the payload only
                .mapValues(value -> value + " suffix")
                // fan one record out into two
                .flatMapValues(value -> List.of(value, value + "(a)"))
                // keep only records with even keys
                .filter((key, value) -> key % 2 == 0)
                // change both the key and the value
                .map((key, val) -> new KeyValue<>(key + 10, "prefix - " + val))
                // terminal operation: print every record that made it through
                .foreach((key, value) ->
                        System.out.println(String.format("only value: %d >>%s<<", key, value)));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
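To see what that pipeline computes without a running broker, the same chain of transformations can be mimicked with java.util.stream on key/value pairs. This is only an illustration of the record flow through mapValues, flatMapValues, filter and map; it is not Kafka Streams itself, and the class name is made up for the example.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream;

public class PipelineSketch {

    // Mimics the KStream chain above: mapValues -> flatMapValues -> filter -> map
    static List<Entry<Long, String>> run(List<Entry<Long, String>> records) {
        return records.stream()
                // mapValues: append a suffix to the payload, key untouched
                .map(e -> new SimpleEntry<>(e.getKey(), e.getValue() + " suffix"))
                // flatMapValues: each record becomes two
                .flatMap(e -> Stream.of(e, new SimpleEntry<>(e.getKey(), e.getValue() + "(a)")))
                // filter: keep even keys only
                .filter(e -> e.getKey() % 2 == 0)
                // map: change both key and value
                .map(e -> (Entry<Long, String>) new SimpleEntry<>(e.getKey() + 10, "prefix - " + e.getValue()))
                .toList();
    }

    public static void main(String[] args) {
        run(List.of(new SimpleEntry<>(1L, "odd"), new SimpleEntry<>(2L, "even")))
                .forEach(e -> System.out.println(e.getKey() + " >>" + e.getValue() + "<<"));
    }
}
```

With keys 1 and 2 as input, the odd-keyed record is dropped by the filter and the even-keyed one survives as two records with key 12, which matches what the Streams pipeline would print.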
Various Commands
Here are a number of commands. It is easy to figure out what each one does, but you need these samples to follow the pattern. The .sh scripts are for Linux/macOS and the .bat ones for Windows; both ship with the Kafka distribution.
# create a topic
kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
# describe a topic
kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
# write messages from the console
kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
# read messages from the beginning
kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
# create a topic with explicit partition count and replication factor
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic tester1 --partitions 20 --replication-factor 3
# list consumer groups (Windows)
kafka-consumer-groups.bat --list --bootstrap-server localhost:9092
# list topics (Windows)
kafka-topics.bat --bootstrap-server=localhost:9092 --list
# list consumer groups again, with the --option=value flag syntax (Windows)
kafka-consumer-groups.bat --bootstrap-server=localhost:9092 --list
# read a topic from "latest" (Windows)
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic tester1