I followed [1] to code a simple example to try suppress operator. Here is the simple code:
final Serde<String> stringSerde = Serdes.String(); final StreamsBuilder builder = new StreamsBuilder(); builder.stream("TextLinesTopic", Consumed.with(Serdes.String(), Serdes.String())) .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) .groupBy((key, word) -> word, Grouped.keySerde(stringSerde).withValueSerde(stringSerde)) .windowedBy(TimeWindows.of(Duration.ofSeconds(3)).grace(Duration.ofMillis(0))) .count(Materialized.with(Serdes.String(), Serdes.Long())) .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) .toStream() .foreach( (key, value) -> { System.out.printf("key: %s, value: %d\n", key, value); }); I set commit.interval.ms to 1 and cache.max.bytes.buffering to 0. If I send one text line "hello", nothing will be printed even I wait for more than 3 seconds (the window size). Since the time longer than the window size has elapsed, I think that key and value should be printed. But if I send another text line "hello", key and value will be printed. Can anyone explain this behavior? I have browsed the Kafka documentation. But I can't find an explanation. [1] http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results -- Jingguo