lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r482078343
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -254,35 +257,43 @@ private void processEarly(final K key, final V value, final long timestamp, fina rightWinAgg = combinedWindow.value; } + //create the right window for the previous record if the previous record exists and the window hasn't already been created + if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) { + final TimeWindow window = new TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + if (combinedWindow == null) { final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs()); final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } else { - //create the right window for the previous record if the previous record exists and the window hasn't already been created - if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) { - final TimeWindow window = new TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); - putAndForward(window, valueAndTime, key, value, closeTime, timestamp); - } //update the combined window with the new aggregate putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp); } //create right window for new record if needed if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { - createRightWindow(timestamp, rightWinAgg, key, value, closeTime); + 
createCurrentRecordRightWindow(timestamp, rightWinAgg, key); } } - private void createRightWindow(final long timestamp, - final ValueAndTimestamp<Agg> rightWinAgg, - final K key, - final V value, - final long closeTime) { + private void createCurrentRecordRightWindow(final long timestamp, + final ValueAndTimestamp<Agg> rightWinAgg, + final K key) { final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); - putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), rightWinAgg.timestamp()); Review comment: Yeah, good point: we check for null before calling it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org