Did you ever push any data with a greater timestamp than the one you're
currently producing?
One thing that took me a while to find out is that the suppress timestamp is
tracked per topic, not per key.
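
A minimal sketch of why that matters, in plain Java. It is a simplified model,
not Kafka Streams code: the class and method names are made up for
illustration, and it assumes stream time is simply the highest timestamp seen
so far regardless of key. The window and grace sizes mirror the one-minute
values in the topology quoted below.

```java
/** Simplified model of stream time; hypothetical names, NOT Kafka Streams internals. */
public class StreamTimeSketch {
    static final long WINDOW_MS = 60_000L; // mirrors TimeWindows.of(Duration.ofMinutes(1))
    static final long GRACE_MS = 60_000L;  // mirrors .grace(Duration.ofMinutes(1))

    // Highest record timestamp observed so far, shared by all keys.
    private long streamTime = Long.MIN_VALUE;

    /** Observe a record timestamp; stream time only ever moves forward. */
    public long observe(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        return streamTime;
    }

    /** Assumption: a window is closed once stream time reaches window end + grace. */
    public boolean isClosed(long windowStartMs) {
        return streamTime >= windowStartMs + WINDOW_MS + GRACE_MS;
    }

    /** A record counts as late if the window it falls into is already closed. */
    public boolean isLate(long timestampMs) {
        long windowStartMs = timestampMs - (timestampMs % WINDOW_MS);
        return isClosed(windowStartMs);
    }

    public static void main(String[] args) {
        StreamTimeSketch sketch = new StreamTimeSketch();

        sketch.observe(10_000L); // key "A", inside window [0, 60s)
        System.out.println("closed after first record: " + sketch.isClosed(0L));

        sketch.observe(125_000L); // key "B", a much newer timestamp
        System.out.println("closed after newer record: " + sketch.isClosed(0L));

        // An older record for key "A" now lands in a window that is already closed.
        System.out.println("older record is late: " + sketch.isLate(30_000L));
    }
}
```

In this model a single record with a newer timestamp, for any key, closes the
older window for every key, and if no newer record ever arrives the window
never closes at all.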

--
Alessandro Tagliapietra


On Wed, Sep 11, 2019 at 8:06 PM Thameem Ansari <thame...@gmail.com> wrote:

> Yes I am able to see the output when I remove suppress.
>
>
> > On Sep 11, 2019, at 9:58 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > Hard to say. Do you see output if you remove `suppress()` from your
> > topology?
> >
> > -Matthias
> >
> >
> > On 9/11/19 6:19 PM, Thameem Ansari wrote:
> >> I am using a producer simulator to simulate events in the past, and I
> >> can see my time advance; the topology is based on event time. But even
> >> if I run the producer for a few hours, nothing gets emitted. Is there any
> >> way to debug this issue?
> >>
> >>
> >>
> >>> On Sep 11, 2019, at 6:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>
> >>> Note that `suppress()` is event time based, and does not emit any data
> >>> if event time does not advance.
> >>>
> >>> A common misunderstanding is that people stop sending data and expect
> >>> to see a result after some time, but that is not how it works. If you
> >>> stop sending data, event time cannot advance and thus `suppress()` will
> >>> never emit anything downstream.
> >>>
> >>> Also see this blog post about `suppress`:
> >>>
> >>> https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>> On 9/10/19 9:52 PM, Thameem Ansari wrote:
> >>>> In my streaming topology, I am using the suppress DSL operator. As per
> >>>> the documentation, it is supposed to output the final results after the
> >>>> window closes. But I noticed it's not emitting anything at all. Here is
> >>>> the pseudocode of my topology.
> >>>>
> >>>> .filter((key, value) -> ...)
> >>>> .flatMap((key, value) -> {
> >>>>   ...
> >>>> })
> >>>> .groupByKey(Grouped.with(Serdes.String(), ...))
> >>>> .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
> >>>> .aggregate(
> >>>>     ...
> >>>> )
> >>>> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
> >>>>
> >>>> Anything wrong here??
> >>>>
> >>>> Thanks
> >>>> Thameem
> >>>>
> >>>
> >>
> >>
> >
>
>
