Did you try to test you code using `TopologyTestDriver`? Maybe this
helps to figure out the root cause of the issue.

We have many unit/integration tests in place and many people use
suppress() successfully in production. Hence, I am sure, it basically
works -- of course, they might still be an unknown bug...

One side question: the subject of this email thread says "2.4.0", but
Kafka 2.4.0 is not release yet. Hence, I am wondering what version you
are actually using?



-Matthias

On 9/25/19 4:53 PM, Thameem Ansari wrote:
> Tried your suggestions and unable to get suppress emit anything. I can see 
> the SUPPRESS_STORES are created in Kafka nodes but nothing get outputted. 
> Looks like the grace period and window closing is not honored for some 
> reason. I can see lot of people having difficulty of getting suppress 
> working. 
> My window time is one minute and I tried with and without grace period. I can 
> see the event time is in the past as I am feeding the test data but even if I 
> post the data with progressive event times in an order nothing happens. 
> Any help is appreciated. 
> 
> Thanks. 
> 
>> On Sep 11, 2019, at 10:50 PM, Alessandro Tagliapietra 
>> <tagliapietra.alessan...@gmail.com> wrote:
>>
>> Have you tried deleting the suppress changelog topic to see if you get
>> something after deleting it?
>>
>> By per topic and not per key I mean that if you send for example an event
>> with timestamp equal to today's date with key 1 and that closes today's
>> window and data in the past with key 2 won't go through
>>
>> On Wed, Sep 11, 2019, 8:45 PM Thameem Ansari <thame...@gmail.com> wrote:
>>
>>> I tried with different timestamps in the near past but nothing coming out.
>>> I went thru the article from Confluent about using the suppress but I don’t
>>> see many people are successful with that.
>>>
>>> What do you mean by “timestamp is per topic and not per key”. Can you
>>> please elaborate?
>>>
>>>
>>>
>>>
>>>> On Sep 11, 2019, at 10:13 PM, Alessandro Tagliapietra <
>>> tagliapietra.alessan...@gmail.com> wrote:
>>>>
>>>> Did you ever push any data with a greater timestamp than the current one
>>>> you're producing?
>>>> One thing took me a while to find out is that the suppress timestamp is
>>> per
>>>> topic and not per key
>>>>
>>>> --
>>>> Alessandro Tagliapietra
>>>>
>>>>
>>>> On Wed, Sep 11, 2019 at 8:06 PM Thameem Ansari <thame...@gmail.com>
>>> wrote:
>>>>
>>>>> Yes I am able to see the output when I remove suppress.
>>>>>
>>>>>
>>>>>> On Sep 11, 2019, at 9:58 PM, Matthias J. Sax <matth...@confluent.io>
>>>>> wrote:
>>>>>>
>>>>>> Hard to say. Do you see output if you remove `suppress()` from your
>>>>>> topology?
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 9/11/19 6:19 PM, Thameem Ansari wrote:
>>>>>>> I am using a producer simulator to simulate the events in the past and
>>>>> I can see my time advances and the topology is based on the event time.
>>> But
>>>>> even if I run the producer for few hours nothing get emitted. Is there
>>>>> anyway to debug this issue?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> On Sep 11, 2019, at 6:13 PM, Matthias J. Sax <matth...@confluent.io>
>>>>> wrote:
>>>>>>>>
>>>>>>>> Note that `suppress()` is event time based, and does not emit any
>>> data
>>>>>>>> if event time does not advance.
>>>>>>>>
>>>>>>>> A common miss understanding is, that people stop to send data and
>>>>> expect
>>>>>>>> to see a result after some time, but that is not how it works. If you
>>>>>>>> stop sending data, event time cannot advance and thus emit will never
>>>>>>>> send anything downstream.
>>>>>>>>
>>>>>>>> Also see this blog post about `suppress`:
>>>>>>>>
>>>>>
>>> https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 9/10/19 9:52 PM, Thameem Ansari wrote:
>>>>>>>>> In my streaming topology, I am using the suppress dsl operator. As
>>>>> per the documentation, it is supposed to output the final results after
>>> the
>>>>> window closes. But I noticed it's not emitting anything at all. Here is
>>> the
>>>>> pseudo code of my topology.
>>>>>>>>>
>>>>>>>>> .filter((key, value) -> ...)
>>>>>>>>> .flatMap((key, value) -> {
>>>>>>>>> ...
>>>>>>>>> })
>>>>>>>>> .groupByKey(Grouped.with(Serdes.String(), ...))
>>>>>>>>>
>>>>>
>>> .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
>>>>>>>>> .aggregate(
>>>>>>>>>   ...
>>>>>>>>>
>>>>>
>>> ).suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
>>>>>
>>>>>>>>> Anything wrong here??
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Thameem
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>
>>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to