Did you try to test you code using `TopologyTestDriver`? Maybe this helps to figure out the root cause of the issue.
We have many unit/integration tests in place and many people use suppress() successfully in production. Hence, I am sure, it basically works -- of course, they might still be an unknown bug... One side question: the subject of this email thread says "2.4.0", but Kafka 2.4.0 is not release yet. Hence, I am wondering what version you are actually using? -Matthias On 9/25/19 4:53 PM, Thameem Ansari wrote: > Tried your suggestions and unable to get suppress emit anything. I can see > the SUPPRESS_STORES are created in Kafka nodes but nothing get outputted. > Looks like the grace period and window closing is not honored for some > reason. I can see lot of people having difficulty of getting suppress > working. > My window time is one minute and I tried with and without grace period. I can > see the event time is in the past as I am feeding the test data but even if I > post the data with progressive event times in an order nothing happens. > Any help is appreciated. > > Thanks. > >> On Sep 11, 2019, at 10:50 PM, Alessandro Tagliapietra >> <tagliapietra.alessan...@gmail.com> wrote: >> >> Have you tried deleting the suppress changelog topic to see if you get >> something after deleting it? >> >> By per topic and not per key I mean that if you send for example an event >> with timestamp equal to today's date with key 1 and that closes today's >> window and data in the past with key 2 won't go through >> >> On Wed, Sep 11, 2019, 8:45 PM Thameem Ansari <thame...@gmail.com> wrote: >> >>> I tried with different timestamps in the near past but nothing coming out. >>> I went thru the article from Confluent about using the suppress but I don’t >>> see many people are successful with that. >>> >>> What do you mean by “timestamp is per topic and not per key”. Can you >>> please elaborate? >>> >>> >>> >>> >>>> On Sep 11, 2019, at 10:13 PM, Alessandro Tagliapietra < >>> tagliapietra.alessan...@gmail.com> wrote: >>>> >>>> Did you ever push any data with a greater timestamp than the current one >>>> you're producing? >>>> One thing took me a while to find out is that the suppress timestamp is >>> per >>>> topic and not per key >>>> >>>> -- >>>> Alessandro Tagliapietra >>>> >>>> >>>> On Wed, Sep 11, 2019 at 8:06 PM Thameem Ansari <thame...@gmail.com> >>> wrote: >>>> >>>>> Yes I am able to see the output when I remove suppress. >>>>> >>>>> >>>>>> On Sep 11, 2019, at 9:58 PM, Matthias J. Sax <matth...@confluent.io> >>>>> wrote: >>>>>> >>>>>> Hard to say. Do you see output if you remove `suppress()` from your >>>>>> topology? >>>>>> >>>>>> -Matthias >>>>>> >>>>>> >>>>>> On 9/11/19 6:19 PM, Thameem Ansari wrote: >>>>>>> I am using a producer simulator to simulate the events in the past and >>>>> I can see my time advances and the topology is based on the event time. >>> But >>>>> even if I run the producer for few hours nothing get emitted. Is there >>>>> anyway to debug this issue? >>>>>>> >>>>>>> >>>>>>> >>>>>>>> On Sep 11, 2019, at 6:13 PM, Matthias J. Sax <matth...@confluent.io> >>>>> wrote: >>>>>>>> >>>>>>>> Note that `suppress()` is event time based, and does not emit any >>> data >>>>>>>> if event time does not advance. >>>>>>>> >>>>>>>> A common miss understanding is, that people stop to send data and >>>>> expect >>>>>>>> to see a result after some time, but that is not how it works. If you >>>>>>>> stop sending data, event time cannot advance and thus emit will never >>>>>>>> send anything downstream. >>>>>>>> >>>>>>>> Also see this blog post about `suppress`: >>>>>>>> >>>>> >>> https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers >>>>>>>> >>>>>>>> >>>>>>>> -Matthias >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On 9/10/19 9:52 PM, Thameem Ansari wrote: >>>>>>>>> In my streaming topology, I am using the suppress dsl operator. As >>>>> per the documentation, it is supposed to output the final results after >>> the >>>>> window closes. But I noticed it's not emitting anything at all. Here is >>> the >>>>> pseudo code of my topology. >>>>>>>>> >>>>>>>>> .filter((key, value) -> ...) >>>>>>>>> .flatMap((key, value) -> { >>>>>>>>> ... >>>>>>>>> }) >>>>>>>>> .groupByKey(Grouped.with(Serdes.String(), ...)) >>>>>>>>> >>>>> >>> .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1))) >>>>>>>>> .aggregate( >>>>>>>>> ... >>>>>>>>> >>>>> >>> ).suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())); >>>>> >>>>>>>>> Anything wrong here?? >>>>>>>>> >>>>>>>>> Thanks >>>>>>>>> Thameem >>>>>>>>> >>>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>> >>> >
signature.asc
Description: OpenPGP digital signature