{"id":176,"date":"2024-12-10T12:30:31","date_gmt":"2024-12-10T17:30:31","guid":{"rendered":"https:\/\/coding101.xyz\/?p=176"},"modified":"2024-12-12T09:32:39","modified_gmt":"2024-12-12T14:32:39","slug":"kafka-stateless-streams","status":"publish","type":"post","link":"https:\/\/coding101.xyz\/?p=176","title":{"rendered":"Kafka Stateless Streams"},"content":{"rendered":"\n<p>This is a sample of an application with kafka streams stateless. <\/p>\n\n\n\n<!--more-->\n\n\n\n<ul class=\"wp-block-list\">\n<li>There are two starter queues populated in different threads<\/li>\n\n\n\n<li>A topology is created that shows how to do filters, branches, merges and ultimately forwards the messages to three other queues depending on some predicates. <\/li>\n<\/ul>\n\n\n\n<p>This is the code:<\/p>\n\n\n\n<!-- HTML generated using hilite.me --><div style=\"background: #f8f8f8; overflow:auto;width:auto;border:solid gray;border-width:.1em .1em .1em .8em;padding:.2em .6em;\"><pre style=\"margin: 0; line-height: 125%\"><span style=\"color: #204a87; font-weight: bold\">package<\/span> <span style=\"color: #000000\">org<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">example<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">stateless<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.clients.consumer.ConsumerRecords<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.clients.consumer.KafkaConsumer<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.clients.producer.KafkaProducer<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.clients.producer.Producer<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.clients.producer.ProducerRecord<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.common.serialization.Serdes<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.streams.KafkaStreams<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.streams.StreamsBuilder<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.streams.StreamsConfig<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.streams.Topology<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.streams.kstream.KStream<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.slf4j.Logger<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.slf4j.LoggerFactory<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">java.time.Duration<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">java.util.Collections<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">java.util.Properties<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">java.util.Random<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">java.util.concurrent.ExecutorService<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">java.util.concurrent.Executors<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">java.util.concurrent.Future<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n\n<span style=\"color: #204a87; font-weight: bold\">class<\/span> <span style=\"color: #000000\">Context<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n    <span style=\"color: #204a87; font-weight: bold\">private<\/span> <span style=\"color: #204a87; font-weight: bold\">static<\/span> <span style=\"color: #204a87; font-weight: bold\">final<\/span> <span style=\"color: #000000\">Logger<\/span> <span style=\"color: #000000\">logger<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #000000\">LoggerFactory<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">getLogger<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">Context<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">class<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n    <span style=\"color: #204a87; font-weight: bold\">private<\/span> <span style=\"color: #204a87; font-weight: bold\">boolean<\/span> <span style=\"color: #000000\">stopping<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n\n    <span style=\"color: #204a87; font-weight: bold\">public<\/span> <span style=\"color: #000000\">Context<\/span><span style=\"color: #ce5c00; font-weight: bold\">()<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n        <span style=\"color: #204a87; font-weight: bold\">this<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">stopping<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #204a87; font-weight: bold\">false<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n    <span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n\n    <span style=\"color: #204a87; font-weight: bold\">public<\/span> <span style=\"color: #204a87; font-weight: bold\">boolean<\/span> <span style=\"color: #000000\">isStopping<\/span><span style=\"color: #ce5c00; font-weight: bold\">()<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n        <span style=\"color: #204a87; font-weight: bold\">return<\/span> <span style=\"color: #000000\">stopping<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n    <span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n\n    <span style=\"color: #204a87; font-weight: bold\">public<\/span> <span style=\"color: #204a87; font-weight: bold\">void<\/span> <span style=\"color: #000000\">stop<\/span><span style=\"color: #ce5c00; font-weight: bold\">()<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n        <span style=\"color: #000000\">logger<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">info<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;application stop triggered&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n\n        <span style=\"color: #000000\">stopping<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #204a87; font-weight: bold\">true<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n    <span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n<span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n\n\n<span style=\"color: #204a87; font-weight: bold\">public<\/span> <span style=\"color: #204a87; font-weight: bold\">class<\/span> <span style=\"color: #000000\">Stateless<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n\n    <span style=\"color: #204a87; font-weight: bold\">private<\/span> <span style=\"color: #204a87; font-weight: bold\">static<\/span> <span style=\"color: #204a87; font-weight: bold\">final<\/span> <span style=\"color: #000000\">Logger<\/span> <span style=\"color: #000000\">logger<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #000000\">LoggerFactory<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">getLogger<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">Stateless<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">class<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n\n    <span style=\"color: #204a87; font-weight: bold\">public<\/span> <span style=\"color: #204a87; font-weight: bold\">static<\/span> <span style=\"color: #204a87; font-weight: bold\">void<\/span> <span style=\"color: #000000\">main<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">[]<\/span> <span style=\"color: #000000\">args<\/span><span style=\"color: #ce5c00; font-weight: bold\">)<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n        <span style=\"color: #204a87; font-weight: bold\">try<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n            <span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">Stateless<\/span><span style=\"color: #ce5c00; font-weight: bold\">().<\/span><span style=\"color: #c4a000\">start<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n        <span style=\"color: #ce5c00; font-weight: bold\">}<\/span> <span style=\"color: #204a87; font-weight: bold\">catch<\/span> <span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">Exception<\/span> <span style=\"color: #000000\">e<\/span><span style=\"color: #ce5c00; font-weight: bold\">)<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n            <span style=\"color: #000000\">e<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">printStackTrace<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n        <span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n    <span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n\n    <span style=\"color: #204a87; font-weight: bold\">private<\/span> <span style=\"color: #204a87; font-weight: bold\">void<\/span> <span style=\"color: #000000\">start<\/span><span style=\"color: #ce5c00; font-weight: bold\">()<\/span> <span style=\"color: #204a87; font-weight: bold\">throws<\/span> <span style=\"color: #000000\">Exception<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n        <span style=\"color: #000000\">Context<\/span> <span style=\"color: #000000\">context<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">Context<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n\n        <span style=\"color: #000000\">Runtime<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">getRuntime<\/span><span style=\"color: #ce5c00; font-weight: bold\">().<\/span><span style=\"color: #c4a000\">addShutdownHook<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">Thread<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #f57900\">context:<\/span><span style=\"color: #ce5c00; font-weight: bold\">:<\/span><span style=\"color: #000000\">stop<\/span><span style=\"color: #ce5c00; font-weight: bold\">));<\/span>\n\n        <span style=\"color: #000000\">ExecutorService<\/span> <span style=\"color: #000000\">threads<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span>  <span style=\"color: #000000\">Executors<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">newFixedThreadPool<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #0000cf; font-weight: bold\">10<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n        <span style=\"color: #000000\">Future<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;?&gt;<\/span> <span style=\"color: #000000\">futureFirst<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #000000\">threads<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">submit<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">getPushTopic<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">context<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;first&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #0000cf; font-weight: bold\">1000<\/span><span style=\"color: #ce5c00; font-weight: bold\">));<\/span>\n        <span style=\"color: #000000\">Future<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;?&gt;<\/span> <span style=\"color: #000000\">futureSecond<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #000000\">threads<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">submit<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">getPushTopic<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">context<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;second&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #0000cf; font-weight: bold\">1000<\/span><span style=\"color: #ce5c00; font-weight: bold\">));<\/span>\n\n        <span style=\"color: #000000\">Future<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;?&gt;<\/span> <span style=\"color: #000000\">futureStreams<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #000000\">threads<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">submit<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">registerStatelessStreams<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">context<\/span><span style=\"color: #ce5c00; font-weight: bold\">));<\/span>\n        <span style=\"color: #000000\">threads<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">submit<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">getConsumer<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">context<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;zero&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">));<\/span>\n        <span style=\"color: #000000\">threads<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">submit<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">getConsumer<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">context<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;one&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">));<\/span>\n        <span style=\"color: #000000\">threads<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">submit<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">getConsumer<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">context<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;three&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">));<\/span>\n\n        <span style=\"color: #000000\">futureFirst<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">get<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n        <span style=\"color: #000000\">logger<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">info<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;first stopped&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n        <span style=\"color: #000000\">futureSecond<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">get<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n        <span style=\"color: #000000\">logger<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">info<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;second stopped&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n        <span style=\"color: #000000\">futureStreams<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">get<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n        <span style=\"color: #000000\">logger<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">info<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;streamed stopped as well, exiting the application&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n\n    <span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n\n    <span style=\"color: #204a87; font-weight: bold\">private<\/span> <span style=\"color: #000000\">Runnable<\/span> <span style=\"color: #000000\">getPushTopic<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">Context<\/span> <span style=\"color: #000000\">context<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">String<\/span> <span style=\"color: #000000\">topicName<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #204a87; font-weight: bold\">int<\/span> <span style=\"color: #000000\">interval<\/span><span style=\"color: #ce5c00; font-weight: bold\">)<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n\n        <span style=\"color: #204a87; font-weight: bold\">return<\/span> <span style=\"color: #ce5c00; font-weight: bold\">()<\/span> <span style=\"color: #ce5c00; font-weight: bold\">-&gt;<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n            <span style=\"color: #000000\">logger<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">info<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;starting pushing topic: {}&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">topicName<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n\n            <span style=\"color: #8f5902; font-style: italic\">\/\/ connect to kafka<\/span>\n            <span style=\"color: #000000\">Properties<\/span> <span style=\"color: #000000\">props<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">Properties<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n            <span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;bootstrap.servers&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;localhost:9092&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n            <span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;key.serializer&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;org.apache.kafka.common.serialization.StringSerializer&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n            <span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;value.serializer&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;org.apache.kafka.common.serialization.StringSerializer&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n            <span style=\"color: #000000\">Producer<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;<\/span><span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">&gt;<\/span> <span style=\"color: #000000\">producer<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">KafkaProducer<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;&gt;(<\/span><span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n            <span style=\"color: #000000\">Random<\/span> <span style=\"color: #000000\">random<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">Random<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n\n            <span style=\"color: #8f5902; font-style: italic\">\/\/ every interval<\/span>\n            <span style=\"color: #204a87; font-weight: bold\">while<\/span> <span style=\"color: #ce5c00; font-weight: bold\">(!<\/span> <span style=\"color: #000000\">context<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">isStopping<\/span><span style=\"color: #ce5c00; font-weight: bold\">())<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n                <span style=\"color: #000000\">String<\/span> <span style=\"color: #000000\">key<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #4e9a06\">&quot;&quot;<\/span> <span style=\"color: #ce5c00; font-weight: bold\">+<\/span> <span style=\"color: #000000\">random<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">nextInt<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #0000cf; font-weight: bold\">10<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n                <span style=\"color: #000000\">String<\/span> <span style=\"color: #000000\">value<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #4e9a06\">&quot;queue: &quot;<\/span> <span style=\"color: #ce5c00; font-weight: bold\">+<\/span> <span style=\"color: #000000\">topicName<\/span> <span style=\"color: #ce5c00; font-weight: bold\">+<\/span> <span style=\"color: #4e9a06\">&quot;, with the key: &quot;<\/span> <span style=\"color: #ce5c00; font-weight: bold\">+<\/span> <span style=\"color: #000000\">key<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n\n                <span style=\"color: #000000\">ProducerRecord<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;<\/span><span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">&gt;<\/span> <span style=\"color: #000000\">record<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">ProducerRecord<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;&gt;(<\/span><span style=\"color: #000000\">topicName<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">key<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">value<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n                <span style=\"color: #000000\">producer<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">send<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">record<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n\n                <span style=\"color: #000000\">logger<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">info<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;ok, sent message: {}&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">value<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n\n                <span style=\"color: #204a87; font-weight: bold\">try<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n                    <span style=\"color: #000000\">Thread<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">sleep<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">interval<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n                <span style=\"color: #ce5c00; font-weight: bold\">}<\/span> <span style=\"color: #204a87; font-weight: bold\">catch<\/span> <span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">InterruptedException<\/span> <span style=\"color: #000000\">e<\/span><span style=\"color: #ce5c00; font-weight: bold\">)<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n                    <span style=\"color: #204a87; font-weight: bold\">throw<\/span> <span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">RuntimeException<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">e<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n                <span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n            <span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n\n            <span style=\"color: #000000\">producer<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">close<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n            <span style=\"color: #000000\">logger<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">info<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;all right, producer for topic: &quot;<\/span> <span style=\"color: #ce5c00; font-weight: bold\">+<\/span> <span style=\"color: #000000\">topicName<\/span> <span style=\"color: #ce5c00; font-weight: bold\">+<\/span> <span style=\"color: #4e9a06\">&quot; is being closed now, exiting&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n        <span style=\"color: #ce5c00; font-weight: bold\">};<\/span>\n    <span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n\n    <span style=\"color: #204a87; font-weight: bold\">private<\/span> <span style=\"color: #000000\">Runnable<\/span> <span style=\"color: #000000\">registerStatelessStreams<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">Context<\/span> <span style=\"color: #000000\">context<\/span><span style=\"color: #ce5c00; font-weight: bold\">)<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n        <span style=\"color: #204a87; font-weight: bold\">return<\/span> <span style=\"color: #ce5c00; font-weight: bold\">()<\/span> <span style=\"color: #ce5c00; font-weight: bold\">-&gt;<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n            <span style=\"color: #000000\">logger<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">info<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;starting processing streams&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n            <span style=\"color: #000000\">Topology<\/span> <span style=\"color: #000000\">topology<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #000000\">getTopology<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n\n            <span style=\"color: #8f5902; font-style: italic\">\/\/ put together the parameters to start the streams, then start the process<\/span>\n            <span style=\"color: #000000\">Properties<\/span> <span style=\"color: #000000\">config<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">Properties<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n            <span style=\"color: #000000\">config<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;bootstrap.servers&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;localhost:9092&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n            <span style=\"color: #000000\">config<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">StreamsConfig<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">DEFAULT_KEY_SERDE_CLASS_CONFIG<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">Serdes<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">().<\/span><span style=\"color: #c4a000\">getClass<\/span><span style=\"color: #ce5c00; font-weight: bold\">().<\/span><span style=\"color: #c4a000\">getName<\/span><span style=\"color: #ce5c00; font-weight: bold\">());<\/span>\n            <span style=\"color: #000000\">config<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">StreamsConfig<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">DEFAULT_VALUE_SERDE_CLASS_CONFIG<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">Serdes<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">().<\/span><span style=\"color: #c4a000\">getClass<\/span><span style=\"color: #ce5c00; font-weight: bold\">().<\/span><span style=\"color: #c4a000\">getName<\/span><span style=\"color: #ce5c00; font-weight: bold\">());<\/span>\n            <span style=\"color: #000000\">config<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;application.id&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;id11&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n\n            <span style=\"color: #000000\">KafkaStreams<\/span> <span style=\"color: #000000\">streams<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">KafkaStreams<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">topology<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">config<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n\n            <span style=\"color: #000000\">Runtime<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">getRuntime<\/span><span style=\"color: #ce5c00; font-weight: bold\">().<\/span><span style=\"color: #c4a000\">addShutdownHook<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">Thread<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #f57900\">streams:<\/span><span style=\"color: #ce5c00; font-weight: bold\">:<\/span><span style=\"color: #000000\">close<\/span><span style=\"color: #ce5c00; font-weight: bold\">));<\/span>\n            <span style=\"color: #000000\">streams<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">start<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n        <span style=\"color: #ce5c00; font-weight: bold\">};<\/span>\n    <span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n\n    <span style=\"color: #204a87; font-weight: bold\">private<\/span> <span style=\"color: #000000\">Topology<\/span> <span style=\"color: #000000\">getTopology<\/span><span style=\"color: #ce5c00; font-weight: bold\">()<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n        <span style=\"color: #000000\">StreamsBuilder<\/span> <span style=\"color: #000000\">builder<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">StreamsBuilder<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n\n        <span style=\"color: #8f5902; font-style: italic\">\/\/ original streams<\/span>\n        <span style=\"color: #000000\">KStream<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;<\/span><span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">&gt;<\/span> <span style=\"color: #000000\">firstStream<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #000000\">builder<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">stream<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;first&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n        <span style=\"color: #000000\">KStream<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;<\/span><span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">&gt;<\/span> <span style=\"color: #000000\">secondStream<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #000000\">builder<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">stream<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;second&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n\n        <span style=\"color: #8f5902; font-style: italic\">\/\/ filter the key &quot;3&quot; from the second stream<\/span>\n        <span style=\"color: #000000\">KStream<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;<\/span><span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">&gt;<\/span> <span style=\"color: #000000\">secondFiltered<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #000000\">secondStream<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">filter<\/span><span style=\"color: #ce5c00; font-weight: bold\">((<\/span><span style=\"color: #000000\">key<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">value<\/span><span style=\"color: #ce5c00; font-weight: bold\">)<\/span> <span style=\"color: #ce5c00; font-weight: bold\">-&gt;<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n\n            <span style=\"color: #204a87; font-weight: bold\">boolean<\/span> <span style=\"color: #000000\">res<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #ce5c00; font-weight: bold\">!<\/span><span style=\"color: #000000\">key<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">equals<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;3&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n\n            <span style=\"color: #204a87; font-weight: bold\">if<\/span> <span style=\"color: #ce5c00; font-weight: bold\">(!<\/span><span style=\"color: #000000\">res<\/span><span style=\"color: #ce5c00; font-weight: bold\">)<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n                <span style=\"color: #000000\">logger<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">info<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;filtering out message with key value of 3&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n            <span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n\n            <span style=\"color: #204a87; font-weight: bold\">return<\/span> <span style=\"color: #000000\">res<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n        <span style=\"color: #ce5c00; font-weight: bold\">});<\/span>\n\n        <span style=\"color: #8f5902; font-style: italic\">\/\/ join the firstStream and the secondFiltered<\/span>\n        <span style=\"color: #000000\">KStream<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;<\/span><span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">&gt;<\/span> <span style=\"color: #000000\">mergedStreams<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #000000\">firstStream<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">merge<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">secondFiltered<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n\n        <span style=\"color: #8f5902; font-style: italic\">\/\/ all right, from merged streams, three branches, one for key &quot;0&quot;, the other one for key &quot;1&quot; and finally the rest<\/span>\n        <span style=\"color: #8f5902; font-style: italic\">\/\/ will go to the other branch.<\/span>\n        <span style=\"color: #8f5902; font-style: italic\">\/\/ first branch goes to &quot;zero&quot;, the second branch goes to the topic &quot;one&quot; and finally the other messages<\/span>\n        <span style=\"color: #8f5902; font-style: italic\">\/\/ will go to the branch which is called three<\/span>\n        <span style=\"color: #000000\">KStream<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;<\/span><span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">&gt;[]<\/span> <span style=\"color: #000000\">branches<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #000000\">mergedStreams<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">branch<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span>\n                <span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">k<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">val<\/span><span style=\"color: #ce5c00; font-weight: bold\">)<\/span> <span style=\"color: #ce5c00; font-weight: bold\">-&gt;<\/span> <span style=\"color: #4e9a06\">&quot;0&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">equals<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">k<\/span><span style=\"color: #ce5c00; font-weight: bold\">),<\/span>\n                <span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">k<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">val<\/span><span style=\"color: #ce5c00; font-weight: bold\">)<\/span> <span style=\"color: #ce5c00; font-weight: bold\">-&gt;<\/span> <span style=\"color: #4e9a06\">&quot;1&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">equals<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">k<\/span><span style=\"color: #ce5c00; font-weight: bold\">),<\/span>\n                <span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">k<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">v<\/span><span style=\"color: #ce5c00; font-weight: bold\">)<\/span> <span style=\"color: #ce5c00; font-weight: bold\">-&gt;<\/span> <span style=\"color: #204a87; font-weight: bold\">true<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n\n        <span style=\"color: #000000\">branches<\/span><span style=\"color: #ce5c00; font-weight: bold\">[<\/span><span style=\"color: #0000cf; font-weight: bold\">0<\/span><span style=\"color: #ce5c00; font-weight: bold\">].<\/span><span style=\"color: #c4a000\">to<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;zero&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n        <span style=\"color: #000000\">branches<\/span><span style=\"color: #ce5c00; font-weight: bold\">[<\/span><span style=\"color: #0000cf; font-weight: bold\">1<\/span><span style=\"color: #ce5c00; font-weight: bold\">].<\/span><span style=\"color: #c4a000\">to<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;one&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n        <span style=\"color: #000000\">branches<\/span><span style=\"color: #ce5c00; font-weight: bold\">[<\/span><span style=\"color: #0000cf; font-weight: bold\">2<\/span><span style=\"color: #ce5c00; font-weight: bold\">].<\/span><span style=\"color: #c4a000\">to<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;three&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n\n        <span style=\"color: #204a87; font-weight: bold\">return<\/span> <span style=\"color: #000000\">builder<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">build<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n    <span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n\n    <span style=\"color: #204a87; font-weight: bold\">private<\/span> <span style=\"color: #000000\">Runnable<\/span> <span style=\"color: #000000\">getConsumer<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">Context<\/span> <span style=\"color: #000000\">context<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">String<\/span> <span style=\"color: #000000\">topicName<\/span><span style=\"color: #ce5c00; font-weight: bold\">)<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n        <span style=\"color: #204a87; font-weight: bold\">return<\/span> <span style=\"color: #ce5c00; font-weight: bold\">()<\/span> <span style=\"color: #ce5c00; font-weight: bold\">-&gt;<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n            <span style=\"color: #8f5902; font-style: italic\">\/\/ connect to the queue<\/span>\n            <span style=\"color: #000000\">Properties<\/span> <span style=\"color: #000000\">props<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">Properties<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n            <span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;bootstrap.servers&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;localhost:9092&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span> <span style=\"color: #8f5902; font-style: italic\">\/\/ Replace with your Kafka broker<\/span>\n            <span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;group.id&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;current-consumer-group&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>  <span style=\"color: #8f5902; font-style: italic\">\/\/ Consumer group ID<\/span>\n            <span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;key.deserializer&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n            <span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;value.deserializer&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n            <span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;enable.auto.commit&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;true&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>         <span style=\"color: #8f5902; font-style: italic\">\/\/ Automatically commit offsets<\/span>\n            <span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;auto.commit.interval.ms&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;1000&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>    <span style=\"color: #8f5902; font-style: italic\">\/\/ Commit interval<\/span>\n            <span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;auto.offset.reset&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;earliest&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>      <span style=\"color: #8f5902; font-style: italic\">\/\/ Start from the beginning if no offset is available<\/span>\n\n            <span style=\"color: #000000\">KafkaConsumer<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;<\/span><span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">&gt;<\/span> <span style=\"color: #000000\">consumer<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">KafkaConsumer<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;&gt;(<\/span><span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n            <span style=\"color: #000000\">consumer<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">subscribe<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">Collections<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">singletonList<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">topicName<\/span><span style=\"color: #ce5c00; font-weight: bold\">));<\/span>\n\n            <span style=\"color: #8f5902; font-style: italic\">\/\/ wait for message and write them on the console<\/span>\n            <span style=\"color: #8f5902; font-style: italic\">\/\/ in case of interruption, then go ahead and exit<\/span>\n            <span style=\"color: #204a87; font-weight: bold\">while<\/span> <span style=\"color: #ce5c00; font-weight: bold\">(!<\/span> <span style=\"color: #000000\">context<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">isStopping<\/span><span style=\"color: #ce5c00; font-weight: bold\">())<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n                <span style=\"color: #000000\">ConsumerRecords<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;<\/span><span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">&gt;<\/span> <span style=\"color: #000000\">records<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #000000\">consumer<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">poll<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">Duration<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">ofMillis<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #0000cf; font-weight: bold\">1000<\/span><span style=\"color: #ce5c00; font-weight: bold\">));<\/span>\n                <span style=\"color: #000000\">records<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">forEach<\/span><span style=\"color: #ce5c00; font-weight: bold\">((<\/span><span style=\"color: #000000\">record<\/span><span style=\"color: #ce5c00; font-weight: bold\">)<\/span> <span style=\"color: #ce5c00; font-weight: bold\">-&gt;<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n                    <span style=\"color: #000000\">String<\/span> <span style=\"color: #000000\">value<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #000000\">record<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">value<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n                    <span style=\"color: #000000\">System<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">out<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">println<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;consuming from topic: &quot;<\/span> <span style=\"color: #ce5c00; font-weight: bold\">+<\/span> <span style=\"color: #000000\">topicName<\/span> <span style=\"color: #ce5c00; font-weight: bold\">+<\/span> <span style=\"color: #4e9a06\">&quot;, value: &quot;<\/span> <span style=\"color: #ce5c00; font-weight: bold\">+<\/span> <span style=\"color: #000000\">value<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n                <span style=\"color: #ce5c00; font-weight: bold\">});<\/span>\n\n            <span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n\n            <span style=\"color: #000000\">consumer<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">close<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n        <span style=\"color: #ce5c00; font-weight: bold\">};<\/span>\n    <span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n<span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n<\/pre><\/div>\n\n\n\n<p>The pom.xml<\/p>\n\n\n\n<p>Here is the pom.xml, just in case you need all the dependencies. In this particular situation there are very few dependencies, only the kafka-streams which will pull all the other core Kafka libraries<\/p>\n\n\n\n<!-- HTML generated using hilite.me --><div style=\"background: #f8f8f8; overflow:auto;width:auto;border:solid gray;border-width:.1em .1em .1em .8em;padding:.2em .6em;\"><pre style=\"margin: 0; line-height: 125%\"><span style=\"color: #8f5902; font-style: italic\">&lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot;?&gt;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">&lt;project<\/span> <span style=\"color: #c4a000\">xmlns=<\/span><span style=\"color: #4e9a06\">&quot;http:\/\/maven.apache.org\/POM\/4.0.0&quot;<\/span>\n         <span style=\"color: #c4a000\">xmlns:xsi=<\/span><span style=\"color: #4e9a06\">&quot;http:\/\/www.w3.org\/2001\/XMLSchema-instance&quot;<\/span>\n         <span style=\"color: #c4a000\">xsi:schemaLocation=<\/span><span style=\"color: #4e9a06\">&quot;http:\/\/maven.apache.org\/POM\/4.0.0 http:\/\/maven.apache.org\/xsd\/maven-4.0.0.xsd&quot;<\/span><span style=\"color: #204a87; font-weight: bold\">&gt;<\/span>\n    <span style=\"color: #204a87; font-weight: bold\">&lt;modelVersion&gt;<\/span>4.0.0<span style=\"color: #204a87; font-weight: bold\">&lt;\/modelVersion&gt;<\/span>\n\n    <span style=\"color: #204a87; font-weight: bold\">&lt;groupId&gt;<\/span>org.example<span style=\"color: #204a87; font-weight: bold\">&lt;\/groupId&gt;<\/span>\n    <span style=\"color: #204a87; font-weight: bold\">&lt;artifactId&gt;<\/span>kafka<span style=\"color: #204a87; font-weight: bold\">&lt;\/artifactId&gt;<\/span>\n    <span style=\"color: #204a87; font-weight: bold\">&lt;version&gt;<\/span>1.0-SNAPSHOT<span style=\"color: #204a87; font-weight: bold\">&lt;\/version&gt;<\/span>\n\n    <span style=\"color: #204a87; font-weight: bold\">&lt;dependencies&gt;<\/span>\n        <span style=\"color: #204a87; font-weight: bold\">&lt;dependency&gt;<\/span>\n            <span style=\"color: #204a87; font-weight: bold\">&lt;groupId&gt;<\/span>org.apache.kafka<span style=\"color: #204a87; font-weight: bold\">&lt;\/groupId&gt;<\/span>\n            <span style=\"color: #204a87; font-weight: bold\">&lt;artifactId&gt;<\/span>kafka-streams<span style=\"color: #204a87; font-weight: bold\">&lt;\/artifactId&gt;<\/span>\n            <span style=\"color: #204a87; font-weight: bold\">&lt;version&gt;<\/span>2.7.0<span style=\"color: #204a87; font-weight: bold\">&lt;\/version&gt;<\/span>\n        <span style=\"color: #204a87; font-weight: bold\">&lt;\/dependency&gt;<\/span>\n        <span style=\"color: #204a87; font-weight: bold\">&lt;dependency&gt;<\/span>\n            <span style=\"color: #204a87; font-weight: bold\">&lt;groupId&gt;<\/span>org.slf4j<span style=\"color: #204a87; font-weight: bold\">&lt;\/groupId&gt;<\/span>\n            <span style=\"color: #204a87; font-weight: bold\">&lt;artifactId&gt;<\/span>slf4j-api<span style=\"color: #204a87; font-weight: bold\">&lt;\/artifactId&gt;<\/span>\n            <span style=\"color: #204a87; font-weight: bold\">&lt;version&gt;<\/span>2.0.9<span style=\"color: #204a87; font-weight: bold\">&lt;\/version&gt;<\/span>\n        <span style=\"color: #204a87; font-weight: bold\">&lt;\/dependency&gt;<\/span>\n        <span style=\"color: #204a87; font-weight: bold\">&lt;dependency&gt;<\/span>\n            <span style=\"color: #204a87; font-weight: bold\">&lt;groupId&gt;<\/span>org.slf4j<span style=\"color: #204a87; font-weight: bold\">&lt;\/groupId&gt;<\/span>\n            <span style=\"color: #204a87; font-weight: bold\">&lt;artifactId&gt;<\/span>slf4j-simple<span style=\"color: #204a87; font-weight: bold\">&lt;\/artifactId&gt;<\/span>\n            <span style=\"color: #204a87; font-weight: bold\">&lt;version&gt;<\/span>2.0.9<span style=\"color: #204a87; font-weight: bold\">&lt;\/version&gt;<\/span>\n        <span style=\"color: #204a87; font-weight: bold\">&lt;\/dependency&gt;<\/span>\n    <span style=\"color: #204a87; font-weight: bold\">&lt;\/dependencies&gt;<\/span>\n\n    <span style=\"color: #204a87; font-weight: bold\">&lt;properties&gt;<\/span>\n        <span style=\"color: #204a87; font-weight: bold\">&lt;maven.compiler.source&gt;<\/span>23<span style=\"color: #204a87; font-weight: bold\">&lt;\/maven.compiler.source&gt;<\/span>\n        <span style=\"color: #204a87; font-weight: bold\">&lt;maven.compiler.target&gt;<\/span>23<span style=\"color: #204a87; font-weight: bold\">&lt;\/maven.compiler.target&gt;<\/span>\n        <span style=\"color: #204a87; font-weight: bold\">&lt;project.build.sourceEncoding&gt;<\/span>UTF-8<span style=\"color: #204a87; font-weight: bold\">&lt;\/project.build.sourceEncoding&gt;<\/span>\n\n    <span style=\"color: #204a87; font-weight: bold\">&lt;\/properties&gt;<\/span>\n\n<span style=\"color: #204a87; font-weight: bold\">&lt;\/project&gt;<\/span>\n<\/pre><\/div>\n","protected":false},"excerpt":{"rendered":"<p>This is a sample of an application with kafka streams stateless.<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-176","post","type-post","status-publish","format-standard","hentry","category-uncategorized"],"_links":{"self":[{"href":"https:\/\/coding101.xyz\/index.php?rest_route=\/wp\/v2\/posts\/176","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/coding101.xyz\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/coding101.xyz\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/coding101.xyz\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/coding101.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=176"}],"version-history":[{"count":3,"href":"https:\/\/coding101.xyz\/index.php?rest_route=\/wp\/v2\/posts\/176\/revisions"}],"predecessor-version":[{"id":179,"href":"https:\/\/coding101.xyz\/index.php?rest_route=\/wp\/v2\/posts\/176\/revisions\/179"}],"wp:attachment":[{"href":"https:\/\/coding101.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=176"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/coding101.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=176"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/coding101.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=176"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}