{"id":180,"date":"2024-12-12T09:35:58","date_gmt":"2024-12-12T14:35:58","guid":{"rendered":"https:\/\/coding101.xyz\/?p=180"},"modified":"2024-12-12T09:37:22","modified_gmt":"2024-12-12T14:37:22","slug":"kafka-windowed-streams","status":"publish","type":"post","link":"https:\/\/coding101.xyz\/?p=180","title":{"rendered":"Kafka Windowed Streams"},"content":{"rendered":"\n<p>This example shows windowing on a Kafka stream: it computes the maximum value per key over five-minute hopping windows that advance every minute, and writes each updated maximum to an output topic.<\/p>\n\n\n\n<!--more-->\n\n\n\n<!-- HTML generated using hilite.me --><div style=\"background: #f8f8f8; overflow:auto;width:auto;border:solid gray;border-width:.1em .1em .1em .8em;padding:.2em .6em;\"><pre style=\"margin: 0; line-height: 125%\"><span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.common.serialization.Serdes<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.streams.KafkaStreams<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.streams.KeyValue<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.streams.StreamsBuilder<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.streams.StreamsConfig<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">org.apache.kafka.streams.kstream.*<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n\n<span style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">java.time.Duration<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n<span 
style=\"color: #204a87; font-weight: bold\">import<\/span> <span style=\"color: #000000\">java.util.Properties<\/span><span style=\"color: #ce5c00; font-weight: bold\">;<\/span>\n\n<span style=\"color: #204a87; font-weight: bold\">public<\/span> <span style=\"color: #204a87; font-weight: bold\">class<\/span> <span style=\"color: #000000\">MaxValueLast5Minutes<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n    <span style=\"color: #204a87; font-weight: bold\">public<\/span> <span style=\"color: #204a87; font-weight: bold\">static<\/span> <span style=\"color: #204a87; font-weight: bold\">void<\/span> <span style=\"color: #000000\">main<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">[]<\/span> <span style=\"color: #000000\">args<\/span><span style=\"color: #ce5c00; font-weight: bold\">)<\/span> <span style=\"color: #ce5c00; font-weight: bold\">{<\/span>\n        <span style=\"color: #000000\">Properties<\/span> <span style=\"color: #000000\">props<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">Properties<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n        <span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">StreamsConfig<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">APPLICATION_ID_CONFIG<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;max-value-5min-app&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n        <span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; 
font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">StreamsConfig<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">BOOTSTRAP_SERVERS_CONFIG<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #4e9a06\">&quot;localhost:9092&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n        <span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">StreamsConfig<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">DEFAULT_KEY_SERDE_CLASS_CONFIG<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">Serdes<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">().<\/span><span style=\"color: #c4a000\">getClass<\/span><span style=\"color: #ce5c00; font-weight: bold\">());<\/span>\n        <span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">put<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">StreamsConfig<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">DEFAULT_VALUE_SERDE_CLASS_CONFIG<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">Serdes<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">Double<\/span><span style=\"color: #ce5c00; font-weight: bold\">().<\/span><span style=\"color: #c4a000\">getClass<\/span><span style=\"color: 
#ce5c00; font-weight: bold\">());<\/span>\n\n        <span style=\"color: #000000\">StreamsBuilder<\/span> <span style=\"color: #000000\">builder<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">StreamsBuilder<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n\n        <span style=\"color: #8f5902; font-style: italic\">\/\/ Source topic<\/span>\n        <span style=\"color: #000000\">KStream<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;<\/span><span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">Double<\/span><span style=\"color: #ce5c00; font-weight: bold\">&gt;<\/span> <span style=\"color: #000000\">sourceStream<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #000000\">builder<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">stream<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;input-topic&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n\n        <span style=\"color: #8f5902; font-style: italic\">\/\/ Compute max value for last 5 minutes with a 1-minute hopping window<\/span>\n        <span style=\"color: #000000\">KTable<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;<\/span><span style=\"color: #000000\">Windowed<\/span><span style=\"color: #ce5c00; font-weight: bold\">&lt;<\/span><span style=\"color: #000000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">&gt;,<\/span> <span style=\"color: #000000\">Double<\/span><span style=\"color: #ce5c00; font-weight: bold\">&gt;<\/span> <span style=\"color: #000000\">maxValues<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: 
#000000\">sourceStream<\/span>\n            <span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">groupByKey<\/span><span style=\"color: #ce5c00; font-weight: bold\">()<\/span>\n            <span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">windowedBy<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">TimeWindows<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">ofSizeWithNoGrace<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">Duration<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">ofMinutes<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #0000cf; font-weight: bold\">5<\/span><span style=\"color: #ce5c00; font-weight: bold\">))<\/span>\n                                   <span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">advanceBy<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">Duration<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">ofMinutes<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #0000cf; font-weight: bold\">1<\/span><span style=\"color: #ce5c00; font-weight: bold\">)))<\/span>\n            <span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">reduce<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #f57900\">Double:<\/span><span style=\"color: #ce5c00; font-weight: bold\">:<\/span><span style=\"color: #000000\">max<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n\n        <span style=\"color: #8f5902; font-style: italic\">\/\/ Convert the results to a stream and output<\/span>\n        
<span style=\"color: #000000\">maxValues<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">toStream<\/span><span style=\"color: #ce5c00; font-weight: bold\">()<\/span>\n                 <span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">map<\/span><span style=\"color: #ce5c00; font-weight: bold\">((<\/span><span style=\"color: #000000\">windowedKey<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">value<\/span><span style=\"color: #ce5c00; font-weight: bold\">)<\/span> <span style=\"color: #ce5c00; font-weight: bold\">-&gt;<\/span> <span style=\"color: #000000\">KeyValue<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">pair<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">windowedKey<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">key<\/span><span style=\"color: #ce5c00; font-weight: bold\">(),<\/span> <span style=\"color: #000000\">value<\/span><span style=\"color: #ce5c00; font-weight: bold\">))<\/span>\n                 <span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">to<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #4e9a06\">&quot;output-topic&quot;<\/span><span style=\"color: #ce5c00; font-weight: bold\">,<\/span> <span style=\"color: #000000\">Produced<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">with<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">Serdes<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">String<\/span><span style=\"color: #ce5c00; font-weight: bold\">(),<\/span> <span style=\"color: #000000\">Serdes<\/span><span style=\"color: #ce5c00; 
font-weight: bold\">.<\/span><span style=\"color: #c4a000\">Double<\/span><span style=\"color: #ce5c00; font-weight: bold\">()));<\/span>\n\n        <span style=\"color: #000000\">KafkaStreams<\/span> <span style=\"color: #000000\">streams<\/span> <span style=\"color: #ce5c00; font-weight: bold\">=<\/span> <span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">KafkaStreams<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #000000\">builder<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">build<\/span><span style=\"color: #ce5c00; font-weight: bold\">(),<\/span> <span style=\"color: #000000\">props<\/span><span style=\"color: #ce5c00; font-weight: bold\">);<\/span>\n        <span style=\"color: #000000\">streams<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">start<\/span><span style=\"color: #ce5c00; font-weight: bold\">();<\/span>\n\n        <span style=\"color: #000000\">Runtime<\/span><span style=\"color: #ce5c00; font-weight: bold\">.<\/span><span style=\"color: #c4a000\">getRuntime<\/span><span style=\"color: #ce5c00; font-weight: bold\">().<\/span><span style=\"color: #c4a000\">addShutdownHook<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #204a87; font-weight: bold\">new<\/span> <span style=\"color: #000000\">Thread<\/span><span style=\"color: #ce5c00; font-weight: bold\">(<\/span><span style=\"color: #f57900\">streams:<\/span><span style=\"color: #ce5c00; font-weight: bold\">:<\/span><span style=\"color: #000000\">close<\/span><span style=\"color: #ce5c00; font-weight: bold\">));<\/span>\n    <span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n<span style=\"color: #ce5c00; font-weight: bold\">}<\/span>\n<\/pre><\/div>\n\n","protected":false},"excerpt":{"rendered":"<p>This example shows windowing on a Kafka stream: it computes the maximum value per key 
over five-minute hopping windows that advance every minute, and writes each updated maximum to an output topic.<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-180","post","type-post","status-publish","format-standard","hentry","category-uncategorized"],"_links":{"self":[{"href":"https:\/\/coding101.xyz\/index.php?rest_route=\/wp\/v2\/posts\/180","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/coding101.xyz\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/coding101.xyz\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/coding101.xyz\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/coding101.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=180"}],"version-history":[{"count":3,"href":"https:\/\/coding101.xyz\/index.php?rest_route=\/wp\/v2\/posts\/180\/revisions"}],"predecessor-version":[{"id":184,"href":"https:\/\/coding101.xyz\/index.php?rest_route=\/wp\/v2\/posts\/180\/revisions\/184"}],"wp:attachment":[{"href":"https:\/\/coding101.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=180"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/coding101.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=180"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/coding101.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=180"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}