Hi Matthias,

I am doing a 5-minute tumbling window analysis over a 57-minute data flow, and I want only one final result per window, so I need suppress. The 57-minute period is covered by 12 windows: 11 full 5-minute windows plus a last window that holds only 2 minutes of data. The results of the first 11 windows are delivered downstream. But the final result for the last window is never delivered, because no new record arrives to advance stream time and close that window.
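The window arithmetic above can be made concrete with a small plain-Java sketch. It has no Kafka dependency, and the class and method names are hypothetical. It rests on a few assumptions:

- The 57-minute flow starts on a window boundary, with windows aligned to t = 0.
- The grace period is 0.
- A window counts as closed once stream time reaches its end plus grace.

Stream time stops at the last record's timestamp, so every window except the last one closes.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowCloseCheck {

    // Lists every tumbling window [start, end) that contains data when the flow
    // covers [0, lastEventTime), all values in minutes.
    // Assumption: windows are aligned to t = 0.
    static List<long[]> windowsUpTo(long lastEventTime, long size) {
        List<long[]> windows = new ArrayList<>();
        for (long start = 0; start < lastEventTime; start += size) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    // Simplified rule (assumption): a window is closed once stream time has
    // reached its end plus the grace period.
    static boolean isClosed(long[] window, long streamTime, long grace) {
        return streamTime >= window[1] + grace;
    }

    public static void main(String[] args) {
        long streamTime = 57; // stream time stays at the last record's timestamp
        long size = 5;        // 5-minute tumbling windows
        long grace = 0;       // assumed grace period of zero
        List<long[]> windows = windowsUpTo(streamTime, size);
        int closed = 0;
        for (long[] w : windows) {
            boolean done = isClosed(w, streamTime, grace);
            if (done) {
                closed++;
            }
            System.out.printf("[%d, %d) %s%n", w[0], w[1], done ? "closed" : "still open");
        }
        System.out.println(closed + " of " + windows.size() + " windows closed");
    }
}
```

Under these assumptions the program reports 11 of 12 windows closed, with [55, 60) still open. A record carrying a timestamp of at least minute 60 would be needed to close it.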
Is there any workaround to deliver the result for the last window in my situation?

--
Jingguo

On Wed, Jan 2, 2019 at 10:27 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> > After some time, the window closes.
>
> This is not correct. Windows are based on event-time, and because no new
> input record is processed, the window is not closed. That is the reason
> why you don't get any output. Only a new input record can advance
> "stream time" and close the window.
>
> In practice, when data flows continuously, this should not be an issue
> though.
>
>
> -Matthias
>
> On 12/31/18 8:22 AM, jingguo yao wrote:
> > Sorry for my typo in the mail. "Whey" should be "Why" in "Whey does
> > the window final result is not emitted after the window has elapsed?"
> >
> > I have browsed the Kafka source code and found the cause of the
> > mentioned behaviour.
> >
> > org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor
> > has the following code:
> >
> > @Override
> > public void process(final K key, final Change<V> value) {
> >     buffer(key, value);
> >     enforceConstraints();
> > }
> >
> > In the above code, the enforceConstraints() call emits window results
> > when certain conditions are met.
> >
> > After the process method handles the first record, the window begins.
> > After some time, the window closes. But until process is invoked
> > again (triggered by receiving another record), there is no chance to
> > emit the window result.
> >
> > Is there a configuration option to emit the window result without
> > waiting for another record to arrive?
> >
> > I am using Kafka 2.1.0, as shipped in Confluent Open Source Edition
> > 5.1.0.
> >
> > On Sun, Dec 30, 2018 at 10:53 PM, jingguo yao <yaojing...@gmail.com> wrote:
> >>
> >> I followed [1] to code a simple example to try the suppress operator.
> >>
> >> Here is the simple code:
> >>
> >> final Serde<String> stringSerde = Serdes.String();
> >> final StreamsBuilder builder = new StreamsBuilder();
> >> builder.stream("TextLinesTopic", Consumed.with(Serdes.String(), Serdes.String()))
> >>     .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
> >>     .groupBy((key, word) -> word,
> >>         Grouped.keySerde(stringSerde).withValueSerde(stringSerde))
> >>     .windowedBy(TimeWindows.of(Duration.ofSeconds(3)).grace(Duration.ofMillis(0)))
> >>     .count(Materialized.with(Serdes.String(), Serdes.Long()))
> >>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> >>     .toStream()
> >>     .foreach(
> >>         (key, value) -> {
> >>             System.out.printf("key: %s, value: %d\n", key, value);
> >>         });
> >>
> >> I set commit.interval.ms to 1 and cache.max.bytes.buffering to 0. If I
> >> send one text line "hello", nothing is printed, even if I wait for
> >> more than 3 seconds (the window size). Since more time than the
> >> window size has elapsed, I think the key and value should be printed.
> >>
> >> But if I send another text line "hello", the key and value are
> >> printed.
> >>
> >> Can anyone explain this behavior? I have browsed the Kafka
> >> documentation, but I can't find an explanation.
> >>
> >> [1]
> >> http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
> >>
> >>
> >> --
> >> Jingguo
> >
>

--
Jingguo
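A toy model can illustrate the behaviour Matthias describes, along with the commonly used dummy-record workaround. This is a sketch under stated assumptions, not Kafka's KTableSuppressProcessor:

- It models a single key and keeps everything in memory.
- SuppressModel and its methods are hypothetical names.
- Timestamps are plain minutes, and the grace period is 0.
- Late records for already-closed windows are dropped.

It mirrors the shape of the quoted process() method. Each record advances stream time and is buffered, and then enforceConstraints() emits every buffered window whose end plus grace is at or before stream time. Nothing runs between records, so the last window is emitted only when a later record arrives. Here that record is a dummy stamped at minute 60.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy, single-key model of "suppress until window closes" (hypothetical, not Kafka code).
public class SuppressModel {
    private final long windowSize;            // tumbling window size, in minutes
    private final long grace;                 // grace period, in minutes
    private long streamTime = Long.MIN_VALUE; // highest timestamp seen so far
    // Buffered counts keyed by window start; TreeMap keeps windows in time order.
    private final TreeMap<Long, Long> buffer = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    SuppressModel(long windowSize, long grace) {
        this.windowSize = windowSize;
        this.grace = grace;
    }

    // Same shape as the quoted process(): buffer the update, then enforce constraints.
    // Nothing is emitted unless this method runs, i.e. unless a record arrives.
    void process(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long windowStart = timestamp - Math.floorMod(timestamp, windowSize);
        if (windowStart + windowSize + grace <= streamTime) {
            return; // late record for an already-closed window: dropped
        }
        buffer.merge(windowStart, 1L, Long::sum);
        enforceConstraints();
    }

    // Emits, in time order, every buffered window whose end + grace <= stream time.
    private void enforceConstraints() {
        while (!buffer.isEmpty()) {
            Map.Entry<Long, Long> first = buffer.firstEntry();
            long windowEnd = first.getKey() + windowSize;
            if (windowEnd + grace > streamTime) {
                break; // the earliest buffered window is still open
            }
            emitted.add("[" + first.getKey() + ", " + windowEnd + ") count=" + first.getValue());
            buffer.pollFirstEntry();
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        SuppressModel model = new SuppressModel(5, 0);
        // One record per minute, minutes 0..56: a 57-minute data flow.
        for (long t = 0; t < 57; t++) {
            model.process(t);
        }
        System.out.println("before dummy record: " + model.emitted().size() + " windows emitted");
        // Workaround sketch: a dummy record stamped past the last window's end.
        model.process(60);
        System.out.println("after dummy record: " + model.emitted().size() + " windows emitted");
        System.out.println("last emitted: " + model.emitted().get(model.emitted().size() - 1));
    }
}
```

In the model, 11 windows are emitted before the dummy record and 12 after it, the last being [55, 60) with a count of 2. In a real Kafka Streams application the idea carries over only with caveats:

- Stream time is not global. It advances per task (roughly, per input partition), so a dummy record only moves stream time forward for the task that processes it.
- After groupBy(), the dummy record is re-keyed and routed through a repartition topic by its new key.
- The dummy record is aggregated like any other record, so its own window result may need to be filtered out downstream.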