lct45 commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r483129715
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) { } //create right window for previous record - if (latestLeftTypeWindow != null) { - final long rightWinStart = latestLeftTypeWindow.end() + 1; - if (!windowStartTimes.contains(rightWinStart)) { - final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); - putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + if (previousRecordTimestamp != null) { + final long previousRightWinStart = previousRecordTimestamp + 1; + if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) { + createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime); } } //create left window for new record if (!leftWinAlreadyCreated) { - final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { - valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); - } else { - valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime); + } + // create right window for new record, if necessary + if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { + createCurrentRecordRightWindow(timestamp, rightWinAgg, key); + } + } + + public void processReverse(final K key, final V value, final long timestamp, final long closeTime) { + + final Set<Long> windowStartTimes = new HashSet<>(); + // aggregate that will go in the current record’s left/right window (if needed) + ValueAndTimestamp<Agg> leftWinAgg = 
null; + ValueAndTimestamp<Agg> rightWinAgg = null; + + //if current record's left/right windows already exist + boolean leftWinAlreadyCreated = false; + boolean rightWinAlreadyCreated = false; + + Long previousRecordTimestamp = null; + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + //if we've already seen the window with the closest start time to the record + boolean foundRightWinAgg = false; + + while (iterator.hasNext()) { + final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + final long windowMaxRecordTimestamp = next.value.timestamp(); + + if (endTime > timestamp) { + if (!foundRightWinAgg) { + foundRightWinAgg = true; + rightWinAgg = next.value; + } + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else if (endTime == timestamp) { + if (windowMaxRecordTimestamp < timestamp) { + previousRecordTimestamp = windowMaxRecordTimestamp; + } Review comment: Do we need a boolean? Or could we just return? If there's a record at the current record's timestamp, all we need to do is update the windows it falls within, and as we go back in time the earliest window it'll fall within is its left window, so if we find the left window _and_ the left window was created by a record at the same timestamp, we can just return after updating that window, right? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org