John Roesler created KAFKA-8582:
-----------------------------------

             Summary: Consider adding an ExpiredWindowRecordHandler to Suppress
                 Key: KAFKA-8582
                 URL: https://issues.apache.org/jira/browse/KAFKA-8582
             Project: Kafka
          Issue Type: Improvement
            Reporter: John Roesler


I got some feedback on Suppress:
{quote}Specifying how to handle events outside the grace period does seem like 
a business concern, and simply discarding them thus seems risky (for example 
imagine any situation where money is involved).

This sort of situation is addressed by the late-triggering approach associated 
with watermarks 
(https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given 
this I wondered if you were considering adding anything similar?{quote}

It seems like, if a record arrives past the grace period for its window, 
the state of the windowed aggregation has already been lost, so if we were 
to compute an aggregation result, it would be incorrect. Plus, since the 
window is already expired, we can't store the new (incorrect, but more 
importantly expired) aggregation result either, so any subsequent super-late 
records would face the same blank slate. I think this would wind up looking 
like this: if you have three timely records for a window, and then three 
more that arrive after the grace period, and you were doing a count 
aggregation, you'd see the counts emitted for the window as [1, 2, 3, 1, 1, 1]. 
I guess we could add a flag to the post-expiration results to indicate that 
they're broken, but that seems like the wrong approach. The post-expiration 
aggregation _results_ are meaningless, but I could see wanting to send the 
past-expiration _input records_ to a dead-letter queue or something instead 
of dropping them.
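
To make the scenario concrete, here is a minimal sketch (not from the ticket 
itself) of the kind of topology where this comes up: a windowed count with a 
grace period, followed by Suppress. The topic names and durations are 
placeholders.

{code:java}
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class WindowedCountExample {

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        KTable<Windowed<String>, Long> counts = builder
            // "events" is a placeholder input topic
            .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            // 5-minute windows; records arriving more than 1 minute after the
            // window end are past the grace period
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
            .count()
            // emit one final result per window once the grace period has elapsed
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));

        counts.toStream()
              .map((windowedKey, count) -> KeyValue.pair(windowedKey.toString(), count))
              .to("counts", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
{code}

With the current behavior, the three records arriving after the grace period 
are simply discarded, which is exactly the concern raised in the feedback above.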

Along this line of thinking, I wonder if we should add an optional 
past-expiration record handler interface to the suppression operator. Then 
you could define your own logic, whether it's a dead-letter queue, sending 
the record to some alerting pipeline, or even just crashing the application 
before it can do something wrong. This would follow a similar pattern to how 
we allow custom logic for handling deserialization errors by supplying an 
org.apache.kafka.streams.errors.DeserializationExceptionHandler.
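
For illustration only, here is a rough sketch of what such a handler 
interface could look like. Nothing below exists in Kafka Streams today; the 
names (ExpiredWindowRecordHandler, ExpiredRecordHandlerResponse) and the 
method signature are made up for this ticket, loosely following the shape of 
DeserializationExceptionHandler.

{code:java}
// Hypothetical sketch only -- this is what the ticket proposes, not an
// existing Kafka Streams interface. Names and signature are illustrative.
public interface ExpiredWindowRecordHandler<K, V> {

    enum ExpiredRecordHandlerResponse {
        // drop the expired record and keep processing (today's behavior)
        CONTINUE,
        // stop the application before it can do something wrong
        FAIL
    }

    // Called by the suppression operator when a record arrives after the
    // grace period for its window has already elapsed. An implementation
    // could, for example, forward the record to a dead-letter topic or an
    // alerting pipeline and return CONTINUE, or return FAIL to crash fast.
    ExpiredRecordHandlerResponse handle(final K key,
                                        final V value,
                                        final long recordTimestamp,
                                        final long windowEndTimestamp);
}
{code}

How such a handler would be wired in (for example, via a new option on 
Suppressed or a StreamsConfig entry) is left open here.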
