lct45 commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r483147852
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) { } //create right window for previous record - if (latestLeftTypeWindow != null) { - final long rightWinStart = latestLeftTypeWindow.end() + 1; - if (!windowStartTimes.contains(rightWinStart)) { - final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); - putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + if (previousRecordTimestamp != null) { + final long previousRightWinStart = previousRecordTimestamp + 1; + if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) { + createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime); } } //create left window for new record if (!leftWinAlreadyCreated) { - final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { - valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); - } else { - valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime); + } + // create right window for new record, if necessary + if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { + createCurrentRecordRightWindow(timestamp, rightWinAgg, key); + } + } + + public void processReverse(final K key, final V value, final long timestamp, final long closeTime) { + + final Set<Long> windowStartTimes = new HashSet<>(); + // aggregate that will go in the current record’s left/right window (if needed) + ValueAndTimestamp<Agg> leftWinAgg = 
null; + ValueAndTimestamp<Agg> rightWinAgg = null; + + //if current record's left/right windows already exist + boolean leftWinAlreadyCreated = false; + boolean rightWinAlreadyCreated = false; + + Long previousRecordTimestamp = null; + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + //if we've already seen the window with the closest start time to the record + boolean foundRightWinAgg = false; + + while (iterator.hasNext()) { + final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + final long windowMaxRecordTimestamp = next.value.timestamp(); + + if (endTime > timestamp) { + if (!foundRightWinAgg) { + foundRightWinAgg = true; + rightWinAgg = next.value; + } + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else if (endTime == timestamp) { + if (windowMaxRecordTimestamp < timestamp) { + previousRecordTimestamp = windowMaxRecordTimestamp; + } + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + leftWinAlreadyCreated = true; + } else if (endTime < timestamp) { + leftWinAgg = next.value; + previousRecordTimestamp = windowMaxRecordTimestamp; + break; + } else { + //determine if current record's right window exists, will only be true at most once, on the first pass + rightWinAlreadyCreated = true; + } + } + } + + //create right window for previous record Review comment: Okay I tried to reduce duplicate code, it led to some methods with long variable lists because of not being able to pass by reference. 
Maybe I'm missing some way to make it cleaner, so let me know if it can be improved. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org