Kafka Windowed Streams

This is how windowing is done on a Kafka stream – basically calculates maximum of some value in the last five minutes and generates a record with it.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;
import java.util.Properties;

public class MaxValueLast5Minutes {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "max-value-5min-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Source topic
        KStream<String, Double> sourceStream = builder.stream("input-topic");

        // Compute max value for last 5 minutes with a 1-minute hopping window
        KTable<Windowed<String>, Double> maxValues = sourceStream
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
                                   .advanceBy(Duration.ofMinutes(1)))
            .reduce(Double::max);

        // Convert the results to a stream and output
        maxValues.toStream()
                 .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value))
                 .to("output-topic", Produced.with(Serdes.String(), Serdes.Double()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}