[ 
https://issues.apache.org/jira/browse/KAFKA-13739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507261#comment-17507261
 ] 

Matthias J. Sax commented on KAFKA-13739:
-----------------------------------------

Thanks for reporting this. Good catch. I think your analysis makes sense. Do 
you want to do a PR to fix it? Seems we have a testing gap for "no grace 
period"... :( 

> Sliding window without grace not working
> ----------------------------------------
>
>                 Key: KAFKA-13739
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13739
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.1.0
>            Reporter: bounkong khamphousone
>            Priority: Major
>
> Hi everyone! I would like to understand why the Kafka Streams DSL offers the 
> ability to express a SlidingWindow with no grace period when it does not 
> seem to work. [Confluent's 
> site|https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#sliding-time-windows]
>  states that a grace period is required, and with the deprecated method it 
> defaults to 24 hours.
> With a basic sliding window and a count, if I set the grace period to 1 ms, 
> the expected output is produced. According to the sliding window 
> documentation, the lower and upper bounds are inclusive.
> If I set the grace period to 0 ms, I can see that the record is not skipped 
> at KStreamSlidingWindowAggregate (l.126), but when we try to create the 
> window and push the event in KStreamSlidingWindowAggregate#createWindows, we 
> call the method updateWindowAndForward (l.417). This method (l.468) checks 
> that {{windowEnd > closeTime}}.
> closeTime is defined as {{observedStreamTime - window.gracePeriodMs}} 
> (Sliding window configuration)
> windowEnd is defined as {{inputRecordTimestamp}}.
>  
> For a first event with a record timestamp, we can assume that 
> observedStreamTime is equal to inputRecordTimestamp.
>  
> Therefore, closeTime is {{inputRecordTimestamp - 0}} (gracePeriodMs), which 
> evaluates to {{inputRecordTimestamp}}.
> If we go back to the check done in the {{updateWindowAndForward}} method, we 
> have {{inputRecordTimestamp > inputRecordTimestamp}}, which is always false. 
> The record is then skipped for its own window.
> Since the lower and upper bounds are stated to be inclusive, I would have 
> expected the event to be pushed to the store and forwarded. Hence, the check 
> would be {{windowEnd >= closeTime}}.
>  
> Is this a bug, or is it intended?
> Thanks in advance for your explanations!
> Best regards!
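
The arithmetic in the report can be reproduced with a small standalone sketch. It is not the Kafka Streams source: the class and method names below (GraceCheckSketch, closeTime, strictCheck, inclusiveCheck) are hypothetical, and the timestamp of 1000 ms is an arbitrary value chosen for illustration. It only mirrors the two comparisons described above, assuming the first record's timestamp equals the observed stream time.

```java
public class GraceCheckSketch {

    // closeTime as described in the report: observed stream time minus grace period.
    static long closeTime(long observedStreamTime, long gracePeriodMs) {
        return observedStreamTime - gracePeriodMs;
    }

    // The comparison the report attributes to updateWindowAndForward (strict).
    static boolean strictCheck(long windowEnd, long observedStreamTime, long gracePeriodMs) {
        return windowEnd > closeTime(observedStreamTime, gracePeriodMs);
    }

    // The comparison the reporter proposes instead (inclusive).
    static boolean inclusiveCheck(long windowEnd, long observedStreamTime, long gracePeriodMs) {
        return windowEnd >= closeTime(observedStreamTime, gracePeriodMs);
    }

    public static void main(String[] args) {
        long inputRecordTimestamp = 1_000L;              // arbitrary illustrative value
        long observedStreamTime = inputRecordTimestamp;  // assumption: first record seen
        long windowEnd = inputRecordTimestamp;           // the record's own window

        // Grace 0 ms, strict check: 1000 > 1000 is false, so the record is skipped.
        System.out.println(strictCheck(windowEnd, observedStreamTime, 0L));
        // Grace 1 ms, strict check: 1000 > 999 is true, so the record is kept.
        System.out.println(strictCheck(windowEnd, observedStreamTime, 1L));
        // Grace 0 ms, inclusive check: 1000 >= 1000 is true, so the record is kept.
        System.out.println(inclusiveCheck(windowEnd, observedStreamTime, 0L));
    }
}
```

Under these assumptions the strict comparison rejects the record only when the grace period is 0 ms, which matches the behaviour the reporter observed with grace periods of 0 ms and 1 ms.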



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
