[ https://issues.apache.org/jira/browse/KAFKA-19536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
congye updated KAFKA-19536: --------------------------- Description: Actual: in KStreamKStreamJoin.java, there's code snippet: {code:java} // This condition below allows us to process the out-of-order records without the need // to hold it in the temporary outer store if (outerJoinStore.isEmpty() || timeTo < sharedTimeTracker.streamTime) { context().forward(record.withValue(joiner.apply(record.key(), record.value(), null))); } {code} So for out-of-order records, when the window closed, it may forward the leftJoin result without waiting for extra gracePeriod. Expect: change the condition to: timeTo {color:#de350b}+ joinGraceMs{color} < sharedTimeTracker.streamTime {code:java} if (outerJoinStore.isEmpty() || timeTo + joinGraceMs < sharedTimeTracker.streamTime) { context().forward(record.withValue(joiner.apply(record.key(), record.value(), null))); } {code} was: Actual: in KStreamKStreamJoin.java, there's code snippet: {code:java} // This condition below allows us to process the out-of-order records without the need // to hold it in the temporary outer store if (outerJoinStore.isEmpty() || timeTo < sharedTimeTracker.streamTime) { context().forward(record.withValue(joiner.apply(record.key(), record.value(), null))); } {code} So for out-of-order records, when the window closed, it may forward the leftJoin result without waiting for extra gracePeriod. Expect: {code:java} if (outerJoinStore.isEmpty() || timeTo + joinGraceMs < sharedTimeTracker.streamTime) { context().forward(record.withValue(joiner.apply(record.key(), record.value(), null))); } {code} > KStream-KStream leftJoin didn't consider gracePeriod for out-of-order records > ----------------------------------------------------------------------------- > > Key: KAFKA-19536 > URL: https://issues.apache.org/jira/browse/KAFKA-19536 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 3.9.2, 4.0.0 > Reporter: congye > Priority: Major > > Actual: in KStreamKStreamJoin.java, there's code snippet: > {code:java} > // This condition below allows us to process the out-of-order records without > the need > // to hold it in the temporary outer store > if (outerJoinStore.isEmpty() || timeTo < sharedTimeTracker.streamTime) { > context().forward(record.withValue(joiner.apply(record.key(), > record.value(), null))); > } {code} > So for out-of-order records, when the window closed, it may forward the > leftJoin result without waiting for extra gracePeriod. > > Expect: change the condition to: timeTo {color:#de350b}+ joinGraceMs{color} < > sharedTimeTracker.streamTime > {code:java} > if (outerJoinStore.isEmpty() || timeTo + joinGraceMs < > sharedTimeTracker.streamTime) { > context().forward(record.withValue(joiner.apply(record.key(), record.value(), > null))); > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)