lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r481391993
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { + // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty + if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } - //create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { - final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + createRightWindow(timestamp, rightWinAgg, key, value, closeTime); + } + } + + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + //window from [0,timeDifference] that holds all early records + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final Set<Long> windowStartTimes = new HashSet<>(); + + Long previousRecordTimestamp = null; + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + + if (startTime == 0) { + combinedWindow = next; + if (next.value.timestamp() < timestamp) { + previousRecordTimestamp = next.value.timestamp(); + } + + } else if (startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else if (startTime == timestamp + 1) { + rightWinAlreadyCreated = true; + } + } + } + + // if there wasn't a right window agg found and we need a right window for our new record, + // the current aggregate in the combined window will go in the new record's right window + if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { + rightWinAgg = combinedWindow.value; + } + + if (combinedWindow == null) { + final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + + } else { + //create the right window for the previous record if the previous record exists and the window hasn't already been created + if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) { + final TimeWindow window = new TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + //update the combined window with the new aggregate + putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp); + } + //create right window for new record if needed + if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { + createRightWindow(timestamp, rightWinAgg, key, value, closeTime); } } - private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long timestamp) { - return rightWinAgg != null && rightWinAgg.timestamp() > timestamp; + private void createRightWindow(final long timestamp, + final ValueAndTimestamp<Agg> rightWinAgg, + final K key, + final V value, + final long closeTime) { + final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + + private boolean leftWindowNotEmpty(final long previousTimestamp, final long currentTimestamp) { + return currentTimestamp - windows.timeDifferenceMs() <= previousTimestamp; } - private boolean isLeftWindow(final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> window) { - return window.key.window().end() == window.value.timestamp(); + // previous record's right window does not already exist and current record falls within previous record's right window + private boolean rightWindowNecessaryAndPossible(final Set<Long> windowStartTimes, + final long previousRightWindowStart, + final long currentRecordTimestamp) { + return !windowStartTimes.contains(previousRightWindowStart) && previousRightWindowStart + windows.timeDifferenceMs() >= currentRecordTimestamp; + } Review comment: Just ran it with updates and we _do_ need to check to make sure that the previous record is close enough for us to make a right window from the current record, so I've updated the `processEarly` `create right window for previous record` to call this method again ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org