lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r482078343



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -254,35 +257,43 @@ private void processEarly(final K key, final V value, 
final long timestamp, fina
                 rightWinAgg = combinedWindow.value;
             }
 
+            //create the right window for the previous record if the previous 
record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && 
!windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                final TimeWindow window = new 
TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + 
windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            }
+
             if (combinedWindow == null) {
                 final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
                 final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 
             } else {
-                //create the right window for the previous record if the 
previous record exists and the window hasn't already been created
-                if (previousRecordTimestamp != null && 
!windowStartTimes.contains(previousRecordTimestamp + 1)) {
-                    final TimeWindow window = new 
TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + 
windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
-                }
                 //update the combined window with the new aggregate
                 putAndForward(combinedWindow.key.window(), 
combinedWindow.value, key, value, closeTime, timestamp);
             }
             //create right window for new record if needed
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
             }
         }
 
-        private void createRightWindow(final long timestamp,
-                                       final ValueAndTimestamp<Agg> 
rightWinAgg,
-                                       final K key,
-                                       final V value,
-                                       final long closeTime) {
+        private void createCurrentRecordRightWindow(final long timestamp,
+                                                    final 
ValueAndTimestamp<Agg> rightWinAgg,
+                                                    final K key) {
             final TimeWindow window = new TimeWindow(timestamp + 1, timestamp 
+ 1 + windows.timeDifferenceMs());
-            final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
-            putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), rightWinAgg.timestamp());

Review comment:
       Yeah good point, we check for null before calling it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to