ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r483310806



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long 
timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time 
to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }

Review comment:
       Hm, yeah, that makes sense to me. Nice! I guess if we wanted to do 
something similar for the forward and early case, we would have to store a 
boolean. Not sure if it's worth it or not, your call




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to