ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r482435367



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long 
timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time 
to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {

Review comment:
       Instead of the extra `foundRightWinAgg`  boolean, can we just check if 
`rightWinAgg` is still equal to `null`?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long 
timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time 
to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, 
will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;

Review comment:
       Instead of asserting that this will be true at most once in the comment, 
we should do so in the code by checking `else if startTime == timestamp + 1` 
instead of just falling back to `else`. Tbh we should probably do the same for 
the `processInOrder` case and not make any assumptions (you can add an `else` 
case that throws `IllegalStateException` then, since every possible case should 
be covered by one of the above conditions)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long 
timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time 
to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, 
will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record

Review comment:
       Is everything after this point the same for both `processInOrder` and 
`processReverse`? The only difference between the two is in the iterator loop, 
right? If so, we should try to reduce duplicate code and only invoke a 
difference `in-order` vs `reverse` method for the loop 

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -608,10 +615,13 @@ public void testAggregateRandomInput() {
 
     private void verifyRandomTestResults(final Map<Long, 
ValueAndTimestamp<String>> actual) {

Review comment:
       I think it would be valuable to have all the tests run with both the 
forward and reverse iterators. You can actually parametrize the test class 
itself so that it runs multiple times with different input: the syntax is kind 
of hard to explain (and understand) but you can look at 
EosBetaUpgradeIntegrationTest as an example. It's parametrized by a 
`injectFailure` boolean -- you can do the same thing with a `forwardIteration` 
boolean.
   Then you could force it to run in the forward direction by providing a 
custom `WindowBytesStoreSupplier` that supplies a custom `WindowStore` 
implementation where the appropriate `fetch` method throws 
UnsupportedOperationException. You should be able to just extend one of the 
existing built-in stores (eg RocksDBWindowStore or InMemoryWindowStore) that 
just overrides `fetch`. Let me know if you have any questions about how all 
this would work

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long 
timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time 
to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, 
will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
+                }
+            }
+
+            //create the left window of the current record if it's not created
+            if (!leftWinAlreadyCreated) {
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            //create the right window for the current record, if need be
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+                            // previous record's right window would have been 
created already by other records. This
+                            // will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
                 }
-                final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
-                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
+
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window will go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            //create right window for new record if needed
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+
+            //create the right window for the previous record if the previous 
record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && 
!windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                createPreviousRightWindow(previousRecordTimestamp + 1, 
timestamp, key, value, closeTime);
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+
+            } else {
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), 
combinedWindow.value, key, value, closeTime, timestamp);
             }
+
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> 
rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createCurrentRecordRightWindow(final long timestamp,
+                                                    final 
ValueAndTimestamp<Agg> rightWinAgg,
+                                                    final K key) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp 
+ 1 + windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(rightWinAgg.value(), Math.max(rightWinAgg.timestamp(), 
timestamp));
+            windowStore.put(
+                key,
+                valueAndTime,
+                window.start());
+            tupleForwarder.maybeForward(
+                new Windowed<>(key, window),
+                rightWinAgg.value(),
+                null,
+                rightWinAgg.timestamp());        }
+
+        private void createPreviousRightWindow(final long windowStart,
+                                               final long 
currentRecordTimestamp,
+                                               final K key,
+                                               final V value,
+                                               final long closeTime) {
+            final TimeWindow window = new TimeWindow(windowStart, windowStart 
+ windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), currentRecordTimestamp);
+            putAndForward(window, valueAndTime, key, value, closeTime, 
currentRecordTimestamp);
+        }
+
+        private void createCurrentRecordLeftWindow(final Long 
previousRecordTimestamp,
+                                               final long timestamp,
+                                               final ValueAndTimestamp<Agg> 
leftWinAgg,
+                                               final K key,
+                                               final V value,
+                                               final long closeTime) {
+            final ValueAndTimestamp<Agg> valueAndTime;
+            // if there's a right window that the new record could create && 
previous record falls within left window -> new record's left window is not 
empty
+            if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
+                valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
+            } else {
+                valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+            }
+            final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
+            putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
         }
 
-        private boolean isLeftWindow(final KeyValue<Windowed<K>, 
ValueAndTimestamp<Agg>> window) {
-            return window.key.window().end() == window.value.timestamp();
+        private boolean leftWindowNotEmpty(final long previousTimestamp, final 
long currentTimestamp) {
+            return currentTimestamp - windows.timeDifferenceMs() <= 
previousTimestamp;
+        }
+
+        // previous record's right window does not already exist and current 
record falls within previous record's right window
+        private boolean rightWindowNecessaryAndPossible(final Set<Long> 
windowStartTimes,

Review comment:
       Sorry but this method name continues to throw me off...the comment does 
a good job of reminding what the check actually does/means, but ideally the 
method name alone would do a reasonable job of that. What about 
`previousRecordRightWindowMustBeCreated` or 
`previousRecordRightWindowDoesNotExistAndIsNotEmpty` ? I know those are both 
super long, especially the 2nd option, but I personally think the 2nd option 
does the best job of providing the link between what the check actually does, 
and why we do it. It's better to be clear than concise (when you can't be both)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long 
timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time 
to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, 
will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;

Review comment:
       I think it's also easiest to follow if we keep the conditions in the 
order that we will actually see them. So this case would be the first one 
(everything else is in order I think)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long 
timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time 
to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }

Review comment:
       Since this is the current record's left window, either this condition or 
true or we already had a record with the same timestamp as the current record. 
Just throwing out a suggestion, maybe we could keep a boolean that tracks 
whether we already have a record at the current timestamp and if so we can 
actaully skip everything after the loop

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long 
timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time 
to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, 
will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
+                }
+            }
+
+            //create the left window of the current record if it's not created
+            if (!leftWinAlreadyCreated) {
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            //create the right window for the current record, if need be
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+                            // previous record's right window would have been 
created already by other records. This
+                            // will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
                 }
-                final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
-                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
+
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window will go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            //create right window for new record if needed
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+
+            //create the right window for the previous record if the previous 
record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && 
!windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                createPreviousRightWindow(previousRecordTimestamp + 1, 
timestamp, key, value, closeTime);
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+
+            } else {
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), 
combinedWindow.value, key, value, closeTime, timestamp);
             }
+
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> 
rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createCurrentRecordRightWindow(final long timestamp,
+                                                    final 
ValueAndTimestamp<Agg> rightWinAgg,
+                                                    final K key) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp 
+ 1 + windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(rightWinAgg.value(), Math.max(rightWinAgg.timestamp(), 
timestamp));
+            windowStore.put(
+                key,
+                valueAndTime,
+                window.start());
+            tupleForwarder.maybeForward(
+                new Windowed<>(key, window),
+                rightWinAgg.value(),
+                null,
+                rightWinAgg.timestamp());        }
+
+        private void createPreviousRightWindow(final long windowStart,

Review comment:
       nit: name it `createPreviousRecordRightWindow` for consistency/clarity

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -118,24 +120,56 @@ public void process(final K key, final V value) {
             }
 
             final long timestamp = context().timestamp();
-            //don't process records that don't fall within a full sliding 
window
-            if (timestamp < windows.timeDifferenceMs()) {
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
+
+            if (timestamp + 1 + windows.timeDifferenceMs() <= closeTime) {
                 log.warn(
-                    "Skipping record due to early arrival. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
-                    value, context().topic(), context().partition(), 
context().offset()
+                    "Skipping record for expired window. " +
+                        "key=[{}] " +
+                        "topic=[{}] " +
+                        "partition=[{}] " +
+                        "offset=[{}] " +
+                        "timestamp=[{}] " +
+                        "window=[{},{}] " +
+                        "expiration=[{}] " +
+                        "streamTime=[{}]",
+                    key,
+                    context().topic(),
+                    context().partition(),
+                    context().offset(),
+                    context().timestamp(),
+                    timestamp - windows.timeDifferenceMs(), timestamp,
+                    closeTime,
+                    observedStreamTime
                 );
-                droppedRecordsSensor.record();
+                lateRecordDropSensor.record();
                 return;
             }
-            processInOrder(key, value, timestamp);
-        }
 
-        public void processInOrder(final K key, final V value, final long 
timestamp) {
+            if (timestamp < windows.timeDifferenceMs()) {
+                processEarly(key, value, timestamp, closeTime);
+                return;
+            }
 
-            observedStreamTime = Math.max(observedStreamTime, timestamp);
-            final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
+            if (reverseIteratorPossible == null) {
+                try {
+                    windowStore.backwardFetch(key, 0L, 0L);
+                    reverseIteratorPossible = true;
+                } catch (final UnsupportedOperationException e)  {
+                    reverseIteratorPossible = false;

Review comment:
       We should log a debug message indicating which we decide to use

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long 
timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time 
to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, 
will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
+                }
+            }
+
+            //create the left window of the current record if it's not created
+            if (!leftWinAlreadyCreated) {
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            //create the right window for the current record, if need be
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+                            // previous record's right window would have been 
created already by other records. This
+                            // will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
                 }
-                final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
-                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
+
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window will go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            //create right window for new record if needed
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+
+            //create the right window for the previous record if the previous 
record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && 
!windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                createPreviousRightWindow(previousRecordTimestamp + 1, 
timestamp, key, value, closeTime);
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+
+            } else {
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), 
combinedWindow.value, key, value, closeTime, timestamp);
             }
+
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> 
rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createCurrentRecordRightWindow(final long timestamp,
+                                                    final 
ValueAndTimestamp<Agg> rightWinAgg,
+                                                    final K key) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp 
+ 1 + windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(rightWinAgg.value(), Math.max(rightWinAgg.timestamp(), 
timestamp));
+            windowStore.put(
+                key,
+                valueAndTime,
+                window.start());
+            tupleForwarder.maybeForward(
+                new Windowed<>(key, window),
+                rightWinAgg.value(),
+                null,
+                rightWinAgg.timestamp());        }
+
+        private void createPreviousRightWindow(final long windowStart,
+                                               final long 
currentRecordTimestamp,
+                                               final K key,
+                                               final V value,
+                                               final long closeTime) {
+            final TimeWindow window = new TimeWindow(windowStart, windowStart 
+ windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), currentRecordTimestamp);
+            putAndForward(window, valueAndTime, key, value, closeTime, 
currentRecordTimestamp);
+        }
+
+        private void createCurrentRecordLeftWindow(final Long 
previousRecordTimestamp,
+                                               final long timestamp,
+                                               final ValueAndTimestamp<Agg> 
leftWinAgg,
+                                               final K key,
+                                               final V value,
+                                               final long closeTime) {
+            final ValueAndTimestamp<Agg> valueAndTime;
+            // if there's a right window that the new record could create && 
previous record falls within left window -> new record's left window is not 
empty
+            if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {

Review comment:
       We might as well move the `previousRecordTimestamp` null check into 
`leftWindowNotEmpty`. Also you can probably remove the comment then since it's 
saying basically the same thing as `if leftWindowNotEmpty`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long 
timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time 
to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, 
will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
+                }
+            }
+
+            //create the left window of the current record if it's not created
+            if (!leftWinAlreadyCreated) {
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            //create the right window for the current record, if need be
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+                            // previous record's right window would have been 
created already by other records. This
+                            // will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
                 }
-                final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
-                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
+
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window will go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            //create right window for new record if needed
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+
+            //create the right window for the previous record if the previous 
record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && 
!windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                createPreviousRightWindow(previousRecordTimestamp + 1, 
timestamp, key, value, closeTime);
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+
+            } else {
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), 
combinedWindow.value, key, value, closeTime, timestamp);
             }
+
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> 
rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createCurrentRecordRightWindow(final long timestamp,
+                                                    final 
ValueAndTimestamp<Agg> rightWinAgg,
+                                                    final K key) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp 
+ 1 + windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(rightWinAgg.value(), Math.max(rightWinAgg.timestamp(), 
timestamp));

Review comment:
       `Math.max(rightWinAgg.timestamp(), timestamp)` doesn't make sense for 
this case, since we're not actually putting the current record in the window 
(although technically it will still choose the correct timestamp, for that 
reason).
   But I think this might just be out of date and need to be rebased after the 
changes in the early records PR?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -118,24 +120,56 @@ public void process(final K key, final V value) {
             }
 
             final long timestamp = context().timestamp();
-            //don't process records that don't fall within a full sliding 
window
-            if (timestamp < windows.timeDifferenceMs()) {
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
+
+            if (timestamp + 1 + windows.timeDifferenceMs() <= closeTime) {
                 log.warn(
-                    "Skipping record due to early arrival. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
-                    value, context().topic(), context().partition(), 
context().offset()
+                    "Skipping record for expired window. " +
+                        "key=[{}] " +
+                        "topic=[{}] " +
+                        "partition=[{}] " +
+                        "offset=[{}] " +
+                        "timestamp=[{}] " +
+                        "window=[{},{}] " +
+                        "expiration=[{}] " +
+                        "streamTime=[{}]",
+                    key,
+                    context().topic(),
+                    context().partition(),
+                    context().offset(),
+                    context().timestamp(),
+                    timestamp - windows.timeDifferenceMs(), timestamp,
+                    closeTime,
+                    observedStreamTime
                 );
-                droppedRecordsSensor.record();
+                lateRecordDropSensor.record();
                 return;
             }
-            processInOrder(key, value, timestamp);
-        }
 
-        public void processInOrder(final K key, final V value, final long 
timestamp) {
+            if (timestamp < windows.timeDifferenceMs()) {
+                processEarly(key, value, timestamp, closeTime);
+                return;
+            }
 
-            observedStreamTime = Math.max(observedStreamTime, timestamp);
-            final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
+            if (reverseIteratorPossible == null) {
+                try {
+                    windowStore.backwardFetch(key, 0L, 0L);
+                    reverseIteratorPossible = true;
+                } catch (final UnsupportedOperationException e)  {
+                    reverseIteratorPossible = false;

Review comment:
       Also we can then use the log message to verify that the correct 
`process` method gets chosen




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to