lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r483129715



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long 
timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time 
to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }

Review comment:
       Do we need a boolean? Or could we just return? If there's a record at 
the current record's timestamp, all we need to do is update the windows it 
falls within, and as we go back in time the earliest window it'll fall within 
is it's left window, so if we find the left window _and_ the left window was 
created by a record at the same timestamp, we can just return after updating 
that window, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to