I followed [1] to write a simple example that tries out the suppress operator.

Here is the simple code:

final Serde<String> stringSerde = Serdes.String();
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("TextLinesTopic", Consumed.with(Serdes.String(), Serdes.String()))
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Grouped.keySerde(stringSerde).withValueSerde(stringSerde))
  .windowedBy(TimeWindows.of(Duration.ofSeconds(3)).grace(Duration.ofMillis(0)))
  .count(Materialized.with(Serdes.String(), Serdes.Long()))
  .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
  .toStream()
  .foreach(
      (key, value) -> {
        System.out.printf("key: %s, value: %d\n", key, value);
      });

I set commit.interval.ms to 1 and cache.max.bytes.buffering to 0. If I
send one text line "hello", nothing is printed even if I wait for
more than 3 seconds (the window size). Since more time than the
window size has elapsed, I expected the key and value to be printed.

But if I then send another text line "hello", a key and value are
printed.
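
For reference, the two settings mentioned above could be applied roughly as
in the sketch below. It uses plain string keys so that it compiles without
the Kafka jars. The application.id and bootstrap.servers values are
placeholders I made up for illustration, not values from my actual setup.

```java
import java.util.Properties;

class SuppressDemoConfig {
    // Builds the properties that would be passed to the KafkaStreams constructor.
    static Properties buildConfig() {
        Properties props = new Properties();
        // Hypothetical placeholders: adjust to your own environment.
        props.put("application.id", "suppress-demo");
        props.put("bootstrap.servers", "localhost:9092");
        // The two settings from this post: commit as often as possible
        // and disable the record cache so updates are forwarded immediately.
        props.put("commit.interval.ms", "1");
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        buildConfig().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```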

Can anyone explain this behavior? I have browsed the Kafka
documentation but can't find an explanation.
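
One possible explanation, which I have not been able to confirm from the
page in [1], is that suppress decides a window is closed based on stream
time (the largest record timestamp seen so far) rather than wall-clock
time. The toy model below is only a sketch of that hypothesis in plain Java.
It is not Kafka Streams code, and the class and method names are made up.
It uses the same 3-second window and zero grace period as my example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (hypothetical, not the Kafka Streams implementation):
// counts are buffered per window and released only when stream time,
// advanced solely by incoming record timestamps, passes the window end.
class StreamTimeSuppressModel {
    static final long WINDOW_SIZE_MS = 3000L;
    static final long GRACE_MS = 0L;

    private long streamTime = Long.MIN_VALUE;
    // window start -> (key -> count)
    private final TreeMap<Long, Map<String, Long>> buffer = new TreeMap<>();

    // Processes one record and returns the results emitted because of it.
    List<String> process(String key, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowStart = timestampMs - (timestampMs % WINDOW_SIZE_MS);

        // Records for a window that is already closed are dropped as late.
        if (windowStart + WINDOW_SIZE_MS + GRACE_MS > streamTime) {
            buffer.computeIfAbsent(windowStart, w -> new TreeMap<>())
                  .merge(key, 1L, Long::sum);
        }

        // Emit every buffered window whose end (plus grace) has been reached.
        List<String> emitted = new ArrayList<>();
        while (!buffer.isEmpty()
                && buffer.firstKey() + WINDOW_SIZE_MS + GRACE_MS <= streamTime) {
            Map.Entry<Long, Map<String, Long>> closed = buffer.pollFirstEntry();
            closed.getValue().forEach(
                (k, count) -> emitted.add(k + "@" + closed.getKey() + " -> " + count));
        }
        return emitted;
    }

    public static void main(String[] args) {
        StreamTimeSuppressModel model = new StreamTimeSuppressModel();
        // First "hello": stream time is 1000, window [0, 3000) stays open.
        System.out.println(model.process("hello", 1000L));
        // Second "hello" two windows later: stream time jumps to 5000,
        // which closes window [0, 3000) and releases its count.
        System.out.println(model.process("hello", 5000L));
    }
}
```

In this model, waiting in wall-clock time changes nothing, because only a
new record moves stream time forward. That would match what I see: the
first "hello" is printed only after the second one arrives.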

[1] http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results


-- 
Jingguo
