ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478766196
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, final long timestamp, fina
                     final long startTime = next.key.window().start();
                     final long endTime = startTime + windows.timeDifferenceMs();
 
-                    if (endTime == windows.timeDifferenceMs()) {
+                    if (startTime == 0) {
                         combinedWindow = next;
-                    } else if (endTime > timestamp && startTime <= timestamp) {
+                    } else if (endTime >= timestamp && startTime <= timestamp) {
                         rightWinAgg = next.value;
                         putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
-                    } else {
+                    } else if (startTime == timestamp + 1) {
                         rightWinAlreadyCreated = true;
                     }
                 }
             }
 
+            // if there wasn't a right window agg found and we need a right window for our new record,
+            // the current aggregate in the combined window iwll go in the new record's right window

Review comment:
Couldn't there still be a record to the left? For example, we could have records at 5 and 50 and nothing else. Then all we would have so far is the combined window and the window at [40, 50], but `rightWinAgg` would be null. That's why we need to check that `combinedWindow.maxTimestamp > timestamp` (and if not, we should leave `rightWinAgg` as null). It looks like this is what you're doing, so I'm not suggesting any changes, just trying to make sure I have this right.
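To make the scenario concrete, here is a minimal sketch of the check as I understand it. It is not the code from this PR: `CombinedWindowSketch`, `WindowAgg`, and `rightWindowAggregate` are hypothetical names, the aggregate is assumed to be a simple record count, and the combined window is assumed to hold only the record at 5.

```java
import java.util.Optional;

// Hypothetical, simplified model of the check discussed in this comment.
// It is NOT the KStreamSlidingWindowAggregate implementation.
final class CombinedWindowSketch {

    // Stand-in for a stored window value: an aggregate (assumed here to be a count)
    // plus the largest record timestamp that has been added to the window.
    static final class WindowAgg {
        final long aggregate;
        final long maxTimestamp;

        WindowAgg(final long aggregate, final long maxTimestamp) {
            this.aggregate = aggregate;
            this.maxTimestamp = maxTimestamp;
        }
    }

    // Returns the aggregate to seed the new record's right window with, if any.
    // rightWinAgg is whatever the window scan found (null if nothing was found).
    static Optional<Long> rightWindowAggregate(final Long rightWinAgg,
                                               final WindowAgg combinedWindow,
                                               final long timestamp) {
        if (rightWinAgg != null) {
            // The scan already found a window covering the records to the right.
            return Optional.of(rightWinAgg);
        }
        if (combinedWindow != null && combinedWindow.maxTimestamp > timestamp) {
            // The combined window holds at least one record after the new one,
            // so its aggregate can be reused for the right window.
            return Optional.of(combinedWindow.aggregate);
        }
        // Every record in the combined window is at or before the new record:
        // there is nothing to the right, so the aggregate stays absent.
        return Optional.empty();
    }

    public static void main(final String[] args) {
        // Records at 5 and 50 only: the combined window contains just the record at 5.
        final WindowAgg combined = new WindowAgg(1L, 5L);
        System.out.println(rightWindowAggregate(null, combined, 8L)); // record at 5 is to the left
        System.out.println(rightWindowAggregate(null, combined, 3L)); // record at 5 is to the right
    }
}
```

With a new record at 8, the record at 5 sits to its left, so nothing is reused and the aggregate stays empty (the `rightWinAgg == null` case above). With a new record at 3, the record at 5 falls to its right, so the combined window's aggregate is carried over.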