This is how windowing is done on a Kafka stream: the example below computes the maximum of a value over the last five minutes and emits a record containing it.
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.*; import java.time.Duration; import java.util.Properties; public class MaxValueLast5Minutes { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "max-value-5min-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass()); StreamsBuilder builder = new StreamsBuilder(); // Source topic KStream<String, Double> sourceStream = builder.stream("input-topic"); // Compute max value for last 5 minutes with a 1-minute hopping window KTable<Windowed<String>, Double> maxValues = sourceStream .groupByKey() .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)) .advanceBy(Duration.ofMinutes(1))) .reduce(Double::max); // Convert the results to a stream and output maxValues.toStream() .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value)) .to("output-topic", Produced.with(Serdes.String(), Serdes.Double())); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }