[
https://issues.apache.org/jira/browse/KAFKA-13739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507261#comment-17507261
]
Matthias J. Sax commented on KAFKA-13739:
-----------------------------------------
Thanks for reporting this. Good catch. I think you analysis makes sense. Do you
want to do a PR to fix it? Seems we have some testing gap for "no grace
period"... :(
> Sliding window without grace not working
> ----------------------------------------
>
> Key: KAFKA-13739
> URL: https://issues.apache.org/jira/browse/KAFKA-13739
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.0
> Reporter: bounkong khamphousone
> Priority: Major
>
> Hi everyone! I would like to understand why KafkaStreams DSL offer the
> ability to express a SlidingWindow with no grace period but seems that it
> doesn't work. [confluent's
> site|https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#sliding-time-windows]
> state that grace period is required and with the deprecated method, it's
> default to 24 hours.
> Doing a basic sliding window with a count, if I set grace period to 1 ms,
> expected output is done. Based on the sliding window documentation, lower and
> upper bounds are inclusive.
> If I set grace period to 0 ms, I can see that record is not skipped at
> KStreamSlidingWindowAggregate(l.126) but when we try to create the window and
> push the event in KStreamSlidingWindowAggregate#createWindows we call the
> method updateWindowAndForward(l.417). This method (l.468) check that
> {{{}windowEnd > closeTime{}}}.
> closeTime is defined as {{observedStreamTime - window.gracePeriodMs}}
> (Sliding window configuration)
> windowEnd is defined as {{{}inputRecordTimestamp{}}}.
>
> For a first event with a record timestamp, we can assume that
> observedStreamTime is equal to inputRecordTimestamp.
>
> Therefore, closeTime is {{inputRecordTimestamp - 0}} (gracePeriodMS) which
> results to {{{}inputRecordTimestamp{}}}.
> If we go back to the check done in {{updateWindowAndForward}} method, then we
> have inputRecordTimestamp > inputRecordTimestamp which is always false. The
> record is then skipped for record's own window.
> Stating that lower and upper bounds are inclusive, I would have expected the
> event to be pushed in the store and forwarded. Hence, the check would be
> {{{}windowEnd >= closeTime{}}}.
>
> Is it a bug or is it intended ?
> Thanks in advance for your explanations!
> Best regards!
--
This message was sent by Atlassian Jira
(v8.20.1#820001)