ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478766196



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, 
final long timestamp, fina
                     final long startTime = next.key.window().start();
                     final long endTime = startTime + 
windows.timeDifferenceMs();
 
-                    if (endTime == windows.timeDifferenceMs()) {
+                    if (startTime == 0) {
                         combinedWindow = next;
-                    } else if (endTime > timestamp && startTime <= timestamp) {
+                    } else if (endTime >= timestamp && startTime <= timestamp) 
{
                         rightWinAgg = next.value;
                         putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-                    } else {
+                    } else if (startTime == timestamp + 1) {
                         rightWinAlreadyCreated = true;
                     }
                 }
             }
 
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window iwll go in the new 
record's right window

Review comment:
       Couldn't there still be a record to the left? Like we could have a 
record at 5 and at 50 and nothing else, then all we would have so far is the 
combined window and one at [40, 50],  but `rightWindowAgg` would be null. So 
that's why we need to check that the `combinedWindow.maxTimestamp > timestamp` 
(and if not then we should leave `rightWinAgg` as null). Looks like this is 
what you're doing, so not suggesting any changes, just trying to make sure I 
have this right.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to