I followed [1] to code a simple example to try suppress operator.

Here is the simple code:

final Serde<String> stringSerde = Serdes.String();
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("TextLinesTopic", Consumed.with(Serdes.String(),
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word,
  .count(Materialized.with(Serdes.String(), Serdes.Long()))
      (key, value) -> {
        System.out.printf("key: %s, value: %d\n", key, value);

I set commit.interval.ms to 1 and cache.max.bytes.buffering to 0. If I
send one text line "hello", nothing will be printed even I wait for
more than 3 seconds (the window size). Since the time longer than the
window size has elapsed, I think that key and value should be printed.

But if I send another text line "hello", key and value will be

Can anyone explain this behavior? I have browsed the Kafka
documentation. But I can't find an explanation.



Reply via email to