ableegoldman commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r483310806
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) { } //create right window for previous record - if (latestLeftTypeWindow != null) { - final long rightWinStart = latestLeftTypeWindow.end() + 1; - if (!windowStartTimes.contains(rightWinStart)) { - final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); - putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + if (previousRecordTimestamp != null) { + final long previousRightWinStart = previousRecordTimestamp + 1; + if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) { + createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime); } } //create left window for new record if (!leftWinAlreadyCreated) { - final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { - valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); - } else { - valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime); + } + // create right window for new record, if necessary + if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { + createCurrentRecordRightWindow(timestamp, rightWinAgg, key); + } + } + + public void processReverse(final K key, final V value, final long timestamp, final long closeTime) { + + final Set<Long> windowStartTimes = new HashSet<>(); + // aggregate that will go in the current record’s left/right window (if needed) + ValueAndTimestamp<Agg> leftWinAgg = null; + ValueAndTimestamp<Agg> rightWinAgg = null; + + //if current record's left/right windows already exist + boolean leftWinAlreadyCreated = false; + boolean rightWinAlreadyCreated = false; + + Long previousRecordTimestamp = null; + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + //if we've already seen the window with the closest start time to the record + boolean foundRightWinAgg = false; + + while (iterator.hasNext()) { + final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + final long windowMaxRecordTimestamp = next.value.timestamp(); + + if (endTime > timestamp) { + if (!foundRightWinAgg) { + foundRightWinAgg = true; + rightWinAgg = next.value; + } + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else if (endTime == timestamp) { + if (windowMaxRecordTimestamp < timestamp) { + previousRecordTimestamp = windowMaxRecordTimestamp; + } Review comment: Hm, yeah, that makes sense to me. Nice! I guess if we wanted to do something similar for the forward and early case, we would have to store a boolean. Not sure if it's worth it or not, your call ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org