guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872646217


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -248,80 +182,43 @@ public void process(final Record<KIn, VIn> record) {
                 }
             }
 
-            tryEmitFinalResult(record, windowCloseTime);
+            maybeMeasureEmitFinalLatency(record, windowCloseTime);
         }
 
-        private void tryEmitFinalResult(final Record<KIn, VIn> record, final 
long windowCloseTime) {
-            if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
-                return;
-            }
-
-            final long now = internalProcessorContext.currentSystemTimeMs();
-            // Throttle emit frequency as an optimization, the tradeoff is 
that we need to remember the
-            // window close time when we emitted last time so that we can 
restart from there in the next emit
-            if (now < timeTracker.nextTimeToEmit) {
-                return;
-            }
-
-            // Schedule next emit time based on now to avoid the case that if 
system time jumps a lot,
-            // this can be triggered every time
-            timeTracker.nextTimeToEmit = now;
-            timeTracker.advanceNextTimeToEmit();
-
-            // Window close time has not progressed, there will be no windows 
to close hence no records to emit
-            if (lastEmitWindowCloseTime != ConsumerRecord.NO_TIMESTAMP && 
lastEmitWindowCloseTime >= windowCloseTime) {
+        @Override
+        protected void maybeForwardFinalResult(final Record<KIn, VIn> record, 
final long windowCloseTime) {
+            if (!shouldEmitFinal(windowCloseTime)) {
                 return;
             }
 
             final long emitRangeUpperBoundInclusive = windowCloseTime - 
windows.size();
-            // No window has ever closed and hence no need to emit any records
             if (emitRangeUpperBoundInclusive < 0) {
+                // If emitRangeUpperBoundInclusive is 0, it means first window 
closes since windowEndTime
+                // is exclusive
                 return;
             }
 
-
-            // Set emitRangeLowerBoundInclusive to -1L if 
lastEmitWindowCloseTime was not set so that
-            // we would fetch from 0L for the first time; otherwise set it to 
lastEmitWindowCloseTime - windows.size().
-            //
-            // Note if we get here, it means emitRangeUpperBoundInclusive > 0, 
which means windowCloseTime > windows.size(),
-            // Because we always set lastEmitWindowCloseTime to 
windowCloseTime before, it means
-            // lastEmitWindowCloseTime - windows.size() should always > 0
-            // As a result, emitRangeLowerBoundInclusive is always >= 0
+            // Because we only get here when emitRangeUpperBoundInclusive > 0 
which means closeTime > windows.size()
+            // Since we set lastEmitCloseTime to closeTime before storing to 
processor metadata
+            // lastEmitCloseTime - windows.size() is always > 0
+            // Set emitRangeLowerBoundInclusive to -1L if not set so that when 
we fetchAll, we fetch from 0L
             final long emitRangeLowerBoundInclusive = lastEmitWindowCloseTime 
== ConsumerRecord.NO_TIMESTAMP ?
                 -1L : lastEmitWindowCloseTime - windows.size();

Review Comment:
   I've made some refactoring, LMK what do you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to