lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478490860



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, final long timestamp, fina
                     final long startTime = next.key.window().start();
                     final long endTime = startTime + windows.timeDifferenceMs();
 
-                    if (endTime == windows.timeDifferenceMs()) {
+                    if (startTime == 0) {
                         combinedWindow = next;
-                    } else if (endTime > timestamp && startTime <= timestamp) {
+                    } else if (endTime >= timestamp && startTime <= timestamp) {
                         rightWinAgg = next.value;
                         putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
-                    } else {
+                    } else if (startTime == timestamp + 1) {
                         rightWinAlreadyCreated = true;
                     }
                 }
             }
 
+            // if there wasn't a right window agg found and we need a right window for our new record,
+            // the current aggregate in the combined window will go in the new record's right window
+            if (rightWinAgg == null && combinedWindow != null) {
+                rightWinAgg = combinedWindow.value;
+            }
+
             if (combinedWindow == null) {
                 final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
                 final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
 
             } else {
-                //create the right window for the most recent max timestamp in the combined window
-                final long rightWinStart = combinedWindow.value.timestamp() + 1;
-                if (!windowStartTimes.contains(rightWinStart) && combinedWindow.value.timestamp() < timestamp) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
+                //create the right window for the combined window's max record before the current record was added
+                final long maxRightWindowStart = combinedWindow.value.timestamp() + 1;
+                //only create the right window if new record falls within it and it does not already exist
+                if (!windowStartTimes.contains(maxRightWindowStart) && previousRightWindowPossible(maxRightWindowStart, timestamp)) {

Review comment:
       The comment here was unclear; that's my bad. It should read `only create 
the previous record's right window if the new record falls within it ...`. 
This part doesn't leverage `rightWinAgg`, since the aggregate in the previous 
record's right window will just be the current record's value.
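
To make the intended condition concrete, here is a rough standalone sketch of the check as I'd describe it. The class and method names (`RightWindowSketch`, `fallsWithin`, `shouldCreatePreviousRightWindow`) are made up for illustration and are not the ones in the PR, and the inclusive window end is an assumption taken from the `endTime >= timestamp` check earlier in the hunk, so it may not match what `previousRightWindowPossible` actually does.

```java
import java.util.Set;

// Illustrative only: hypothetical names, not the code in KStreamSlidingWindowAggregate.
final class RightWindowSketch {

    // True if the new record's timestamp lies inside the window
    // [rightWindowStart, rightWindowStart + timeDifferenceMs].
    // The inclusive end bound is an assumption based on the diff above.
    static boolean fallsWithin(final long rightWindowStart,
                               final long timeDifferenceMs,
                               final long timestamp) {
        return rightWindowStart <= timestamp
            && timestamp <= rightWindowStart + timeDifferenceMs;
    }

    // Only create the previous record's right window if it does not already
    // exist and the new record falls within it.
    static boolean shouldCreatePreviousRightWindow(final Set<Long> windowStartTimes,
                                                   final long previousMaxTimestamp,
                                                   final long timeDifferenceMs,
                                                   final long timestamp) {
        // The previous record's right window starts 1 ms after that record.
        final long rightWindowStart = previousMaxTimestamp + 1;
        return !windowStartTimes.contains(rightWindowStart)
            && fallsWithin(rightWindowStart, timeDifferenceMs, timestamp);
    }
}
```

With a previous max timestamp of 5 and a time difference of 10, a new record at 8 would trigger creation of the window starting at 6, while a new record at 20, or an already existing window starting at 6, would not.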



