Did you ever push any data with a timestamp greater than that of the data you're currently producing? One thing that took me a while to find out is that the suppress timestamp is per topic and not per key, so a single record with a later timestamp moves that time forward for every key.
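To make that concrete, here is a rough sketch in plain Java. It is a toy model and not Kafka Streams code: the class `SuppressModel`, its `process` method, and the `key@windowStart=count` output format are all made up for illustration, and the close rule (window end plus grace period is at or before the highest timestamp seen) is my simplified reading of how it behaves. It assumes a count aggregation with 1-minute windows and a 1-minute grace period, as in your topology.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical, not the Kafka Streams API) of a windowed count followed by suppress. */
final class SuppressModel {
    private final long windowSizeMs;
    private final long graceMs;

    // Highest timestamp seen so far. Note that it is shared by ALL keys.
    private long streamTime = Long.MIN_VALUE;

    // Buffered results: "key@windowStart" -> {windowEnd, count}
    private final Map<String, long[]> buffer = new LinkedHashMap<>();

    SuppressModel(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Feeds one record (non-negative timestamp) and returns the results emitted because of it. */
    List<String> process(String key, long timestampMs) {
        // Time only moves forward when a record with a larger timestamp arrives.
        streamTime = Math.max(streamTime, timestampMs);

        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        long windowEnd = windowStart + windowSizeMs;

        if (windowEnd + graceMs > streamTime) {
            // Window is still open: add the record to the buffered count.
            buffer.computeIfAbsent(key + "@" + windowStart, k -> new long[] {windowEnd, 0L})[1]++;
        }
        // Otherwise the window has already closed and the record is dropped as late.

        // Emit (and forget) every buffered window that has now closed.
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, long[]>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, long[]> entry = it.next();
            if (entry.getValue()[0] + graceMs <= streamTime) {
                emitted.add(entry.getKey() + "=" + entry.getValue()[1]);
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        SuppressModel model = new SuppressModel(60_000L, 60_000L);
        System.out.println(model.process("a", 0L));         // []  (window still open)
        System.out.println(model.process("b", 10_000L));    // []  (time has barely moved)
        System.out.println(model.process("c", 3_600_000L)); // [a@0=1, b@0=1]  (time jumps one hour)
        System.out.println(model.process("a", 70_000L));    // []  (dropped: its window closed long ago)
    }
}
```

In this model the first two records sit in the buffer until some record, for any key, pushes the time past the end of their window plus the grace period. The last call shows the other side of it: once one record carries a timestamp far in the future, older records for the other keys are silently dropped, which can look as if nothing is being emitted.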
--
Alessandro Tagliapietra

On Wed, Sep 11, 2019 at 8:06 PM Thameem Ansari <thame...@gmail.com> wrote:

> Yes, I am able to see the output when I remove suppress.
>
> > On Sep 11, 2019, at 9:58 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > Hard to say. Do you see output if you remove `suppress()` from your
> > topology?
> >
> > -Matthias
> >
> > On 9/11/19 6:19 PM, Thameem Ansari wrote:
> >> I am using a producer simulator to simulate events in the past, and I
> >> can see my time advance; the topology is based on event time. But even
> >> if I run the producer for a few hours, nothing gets emitted. Is there
> >> any way to debug this issue?
> >>
> >>> On Sep 11, 2019, at 6:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>
> >>> Note that `suppress()` is event-time based, and does not emit any data
> >>> if event time does not advance.
> >>>
> >>> A common misunderstanding is that people stop sending data and expect
> >>> to see a result after some time, but that is not how it works. If you
> >>> stop sending data, event time cannot advance and thus `suppress()` will
> >>> never emit anything downstream.
> >>>
> >>> Also see this blog post about `suppress`:
> >>> https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers
> >>>
> >>> -Matthias
> >>>
> >>> On 9/10/19 9:52 PM, Thameem Ansari wrote:
> >>>> In my streaming topology, I am using the suppress DSL operator. As per
> >>>> the documentation, it is supposed to output the final results after
> >>>> the window closes. But I noticed it's not emitting anything at all.
> >>>> Here is the pseudocode of my topology.
> >>>>
> >>>> .filter((key, value) -> ...)
> >>>> .flatMap((key, value) -> {
> >>>>     ...
> >>>> })
> >>>> .groupByKey(Grouped.with(Serdes.String(), ...))
> >>>> .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
> >>>> .aggregate(
> >>>>     ...
> >>>> ).suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
> >>>>
> >>>> Anything wrong here?
> >>>>
> >>>> Thanks
> >>>> Thameem