Hi, Matthias

I am running a 5-minute tumbling-window analysis over a 57-minute data
flow, and I want only one final result per window, so I need suppress.
The 57-minute period can be divided into about 12 windows. The results
of the first 11 windows are delivered downstream. But the final result
for the last window, which holds only 2 minutes of data, is never
delivered downstream, since no new record arrives to advance stream
time and close the window.
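
To make the arithmetic concrete, here is a minimal sketch in plain Java
(no Kafka dependency). It assumes the first record arrives exactly on a
window boundary (t = 0), the last record arrives just before minute 57,
and the grace period is zero. The class and method names are made up
for illustration; the close rule "stream time >= window end + grace" is
my reading of how suppress decides, not a copy of the Kafka code.

```java
import java.time.Duration;

// Illustrative only: hypothetical helper, not part of Kafka Streams.
class LastWindowSketch {
    // Number of tumbling windows touched by data spanning [0, span).
    static long windowCount(Duration span, Duration size) {
        long s = span.toMillis();
        long w = size.toMillis();
        return (s + w - 1) / w; // ceiling division
    }

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs, Duration size) {
        return timestampMs - (timestampMs % size.toMillis());
    }

    // Assumed rule: a window is closed once stream time reaches end + grace.
    static boolean isClosed(long windowStartMs, Duration size, Duration grace, long streamTimeMs) {
        return streamTimeMs >= windowStartMs + size.toMillis() + grace.toMillis();
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(5);
        Duration grace = Duration.ZERO;          // assumption: no grace period
        Duration span = Duration.ofMinutes(57);  // assumption: first record at t = 0

        long lastTs = span.toMillis() - 1;       // last record just before minute 57
        long lastStart = windowStart(lastTs, size);

        System.out.println(windowCount(span, size));                  // 12
        System.out.println(Duration.ofMillis(lastStart).toMinutes()); // 55
        // Stream time never exceeds the last record's timestamp,
        // so the window [55, 60) stays open.
        System.out.println(isClosed(lastStart, size, grace, lastTs)); // false
        // The previous window [50, 55) is closed: stream time passed minute 55.
        System.out.println(isClosed(lastStart - size.toMillis(), size, grace, lastTs)); // true
    }
}
```

Under these assumptions the last window would close only once stream
time reaches minute 60, which the 57 minutes of data never do.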

Is there any workaround to deliver the result for the last window in
my situation?

-- Jingguo

On Wed, Jan 2, 2019 at 10:27 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> > After some time, the window closes.
>
> This is not correct. Windows are based on event-time, and because no new
> input record is processed, the window is not closed. That is the reason
> why you don't get any output. Only a new input record can advance
> "stream time" and close the window.
>
> In practice, when data flows continuously, this should not be an issue
> though.
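
The distinction between stream time and wall-clock time can be sketched
in a few lines of plain Java. This is a simplified model, not Kafka
code: the class name is hypothetical, and using -1 for "no record seen
yet" is a choice made for this sketch only.

```java
// Illustrative only: a toy model of "stream time".
class StreamTimeSketch {
    private long streamTimeMs = -1L; // -1 means "no record seen yet" in this sketch

    // Stream time moves only when a record is observed, and never moves backward.
    long observe(long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        return streamTimeMs;
    }

    long current() {
        return streamTimeMs;
    }

    public static void main(String[] args) throws InterruptedException {
        StreamTimeSketch clock = new StreamTimeSketch();
        clock.observe(1_000L);
        Thread.sleep(50);                    // wall-clock time passes ...
        System.out.println(clock.current()); // 1000: ... but stream time does not move
        clock.observe(500L);                 // out-of-order record
        System.out.println(clock.current()); // 1000: stream time never goes backward
        clock.observe(4_000L);
        System.out.println(clock.current()); // 4000: only a newer record advances it
    }
}
```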
>
>
> -Matthias
>
> On 12/31/18 8:22 AM, jingguo yao wrote:
> > Sorry for my typo in the mail. "Whey" should be "Why" in "Whey does
> > the window final result is not emitted after the window has elapsed?"
> >
> > I have browsed the Kafka source code and found the cause of the
> > mentioned behaviour.
> >
> > org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor
> > has the following code:
> >
> > @Override
> > public void process(final K key, final Change<V> value) {
> >   buffer(key, value);
> >   enforceConstraints();
> > }
> >
> > The enforceConstraints() call in the above code is what emits window
> > results, under some conditions.
> >
> > After the process method processes the first record, the window begins.
> > After some time, the window closes. But until process is invoked
> > again (triggered by the arrival of another record), there is no chance
> > to emit the window result.
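
The buffer-then-enforce pattern can be mimicked with a small stand-in
written in plain Java. This is a sketch under stated assumptions, not
the KTableSuppressProcessor implementation: it handles a single key,
counts records per tumbling window, and assumes a window may be emitted
once stream time >= window end + grace. All names are hypothetical, and
dropping of late records is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a simplified stand-in for a suppression buffer.
class SuppressSketch {
    private final long windowSizeMs;
    private final long graceMs;
    private long streamTimeMs = -1L;
    // window start -> record count, ordered by window start (single key only)
    private final TreeMap<Long, Long> buffer = new TreeMap<>();
    final List<String> emitted = new ArrayList<>();

    SuppressSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Called once per input record; nothing else ever triggers an emission.
    void process(long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        long windowStart = recordTimestampMs - (recordTimestampMs % windowSizeMs);
        buffer.merge(windowStart, 1L, Long::sum); // the "buffer" step
        enforceConstraints();                     // the "enforce" step
    }

    // Emit every buffered window whose end + grace is at or before stream time.
    private void enforceConstraints() {
        while (!buffer.isEmpty()
                && buffer.firstKey() + windowSizeMs + graceMs <= streamTimeMs) {
            Map.Entry<Long, Long> closed = buffer.pollFirstEntry();
            emitted.add("window@" + closed.getKey() + " count=" + closed.getValue());
        }
    }

    public static void main(String[] args) {
        SuppressSketch suppress = new SuppressSketch(3_000L, 0L); // 3 s window, no grace
        suppress.process(1_000L);             // first "hello"
        System.out.println(suppress.emitted); // []
        suppress.process(5_000L);             // second "hello", in a later window
        System.out.println(suppress.emitted); // [window@0 count=1]
    }
}
```

In this model the first window is emitted only as a side effect of the
second record, and the second record's own window stays buffered.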
> >
> > Are there any configuration options to emit the window result without
> > waiting for another record to arrive?
> >
> > I am using Kafka 2.1.0, as shipped in Confluent Open Source Edition
> > 5.1.0.
> >
> > On Sun, Dec 30, 2018 at 10:53 PM, jingguo yao <yaojing...@gmail.com> wrote:
> >>
> >> I followed [1] to code a simple example to try the suppress operator.
> >>
> >> Here is the simple code:
> >>
> >> final Serde<String> stringSerde = Serdes.String();
> >> final StreamsBuilder builder = new StreamsBuilder();
> >> builder.stream("TextLinesTopic", Consumed.with(Serdes.String(), Serdes.String()))
> >>   .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
> >>   .groupBy((key, word) -> word,
> >>       Grouped.keySerde(stringSerde).withValueSerde(stringSerde))
> >>   .windowedBy(TimeWindows.of(Duration.ofSeconds(3)).grace(Duration.ofMillis(0)))
> >>   .count(Materialized.with(Serdes.String(), Serdes.Long()))
> >>   .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> >>   .toStream()
> >>   .foreach(
> >>       (key, value) -> {
> >>         System.out.printf("key: %s, value: %d\n", key, value);
> >>       });
> >>
> >> I set commit.interval.ms to 1 and cache.max.bytes.buffering to 0. If I
> >> send one text line "hello", nothing is printed even if I wait for
> >> more than 3 seconds (the window size). Since more time than the
> >> window size has elapsed, I think that key and value should be printed.
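
For reference, the two settings mentioned here could be set as in the
sketch below. The keys commit.interval.ms and cache.max.bytes.buffering
are the ones named in the text; the application.id and
bootstrap.servers values are placeholders assumed for illustration, and
the class name is hypothetical. Plain string keys are used so the
snippet compiles without the Kafka jars.

```java
import java.util.Properties;

// Illustrative only: builds the properties that would be passed to KafkaStreams.
class SuppressDemoConfig {
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("application.id", "suppress-demo");       // placeholder (assumption)
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder (assumption)
        props.setProperty("commit.interval.ms", "1");               // commit as often as possible
        props.setProperty("cache.max.bytes.buffering", "0");        // disable record caching
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Neither setting advances stream time, so neither can make suppress
emit a window on its own; they only affect how quickly upstream
updates are committed and forwarded.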
> >>
> >> But if I send another text line "hello", key and value will be
> >> printed.
> >>
> >> Can anyone explain this behavior? I have browsed the Kafka
> >> documentation. But I can't find an explanation.
> >>
> >> [1] http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
> >>
> >>
> >> --
> >> Jingguo
> >
> >
> >
>


-- 
Jingguo
