mjsax commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483736252



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -161,29 +180,31 @@ public void processInOrder(final K key, final V value, final long timestamp) {
                     windowStartTimes.add(next.key.window().start());
                     final long startTime = next.key.window().start();
                     final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
-                        if (isLeftWindow(next)) {
-                            latestLeftTypeWindow = next.key.window();
-                        }
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
                     } else if (endTime == timestamp) {
                         leftWinAlreadyCreated = true;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
                         putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
                     } else if (endTime > timestamp && startTime <= timestamp) {
                         rightWinAgg = next.value;
                         putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
-                    } else {
+                    } else if (startTime == timestamp + 1) {

Review comment:
       The suggestion is not bad, but it requires adding the `else-throw` to make
sense. Otherwise, a programming error could slip through undetected.
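
A minimal sketch of the `else-throw` pattern referred to above, assuming the four branch conditions shown in the diff. The class, enum, and method names are hypothetical and not part of the PR; the point is only that once the final `else` becomes an `else if`, a trailing `else` that throws makes an unexpected window fail loudly instead of being skipped silently.

```java
// Hypothetical illustration only; not the code in KStreamSlidingWindowAggregate.
final class WindowBranchSketch {

    enum Position { LEFT_OF_RECORD, ENDS_AT_RECORD, CONTAINS_RECORD, RIGHT_WINDOW_OF_RECORD }

    // Classifies a window [startTime, endTime] relative to a record timestamp,
    // mirroring the if/else-if chain in the diff.
    static Position classify(final long startTime, final long endTime, final long timestamp) {
        if (endTime < timestamp) {
            return Position.LEFT_OF_RECORD;
        } else if (endTime == timestamp) {
            return Position.ENDS_AT_RECORD;
        } else if (endTime > timestamp && startTime <= timestamp) {
            return Position.CONTAINS_RECORD;
        } else if (startTime == timestamp + 1) {
            return Position.RIGHT_WINDOW_OF_RECORD;
        } else {
            // Without this branch, a window that matches none of the cases
            // would be ignored and the bug would go unnoticed.
            throw new IllegalStateException(
                "Unexpected window [" + startTime + ", " + endTime + "] for record timestamp " + timestamp);
        }
    }
}
```

In this sketch, a window starting later than `timestamp + 1` reaches the final branch and throws.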

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+                if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifferenceMs]

Review comment:
       nit: `Instead, they will fall within the [0, timeDifferenceMs]` -> 
`Instead, we will put them into the [0, timeDifferenceMs] window as a 
"workaround",`





