lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r478490860
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, final long timestamp, fina final long startTime = next.key.window().start(); final long endTime = startTime + windows.timeDifferenceMs(); - if (endTime == windows.timeDifferenceMs()) { + if (startTime == 0) { combinedWindow = next; - } else if (endTime > timestamp && startTime <= timestamp) { + } else if (endTime >= timestamp && startTime <= timestamp) { rightWinAgg = next.value; putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); - } else { + } else if (startTime == timestamp + 1) { rightWinAlreadyCreated = true; } } } + // if there wasn't a right window agg found and we need a right window for our new record, + // the current aggregate in the combined window iwll go in the new record's right window + if (rightWinAgg == null && combinedWindow != null) { + rightWinAgg = combinedWindow.value; + } + if (combinedWindow == null) { final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs()); final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } else { - //create the right window for the most recent max timestamp in the combined window - final long rightWinStart = combinedWindow.value.timestamp() + 1; - if (!windowStartTimes.contains(rightWinStart) && combinedWindow.value.timestamp() < timestamp) { - final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); + //create the right window for the combined window's max record before the current record was added + final long maxRightWindowStart = combinedWindow.value.timestamp() + 1; + //only create the right window if new record falls within it and it does not already exist + if (!windowStartTimes.contains(maxRightWindowStart) && previousRightWindowPossible(maxRightWindowStart, timestamp)) { Review comment: The comment here was unclear, that's my bad. It should read `only create the previous record's right window if the new record falls within it ...` . This part doesn't leverage `rightWinAgg`, since the agg in the previous record's right window will just be the current record's value. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org