guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872646217
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -248,80 +182,43 @@ public void process(final Record<KIn, VIn> record) {
}
}
- tryEmitFinalResult(record, windowCloseTime);
+ maybeMeasureEmitFinalLatency(record, windowCloseTime);
}
- private void tryEmitFinalResult(final Record<KIn, VIn> record, final
long windowCloseTime) {
- if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
- return;
- }
-
- final long now = internalProcessorContext.currentSystemTimeMs();
- // Throttle emit frequency as an optimization, the tradeoff is
that we need to remember the
- // window close time when we emitted last time so that we can
restart from there in the next emit
- if (now < timeTracker.nextTimeToEmit) {
- return;
- }
-
- // Schedule next emit time based on now to avoid the case that if
system time jumps a lot,
- // this can be triggered every time
- timeTracker.nextTimeToEmit = now;
- timeTracker.advanceNextTimeToEmit();
-
- // Window close time has not progressed, there will be no windows
to close hence no records to emit
- if (lastEmitWindowCloseTime != ConsumerRecord.NO_TIMESTAMP &&
lastEmitWindowCloseTime >= windowCloseTime) {
+ @Override
+ protected void maybeForwardFinalResult(final Record<KIn, VIn> record,
final long windowCloseTime) {
+ if (!shouldEmitFinal(windowCloseTime)) {
return;
}
final long emitRangeUpperBoundInclusive = windowCloseTime -
windows.size();
- // No window has ever closed and hence no need to emit any records
if (emitRangeUpperBoundInclusive < 0) {
+ // If emitRangeUpperBoundInclusive is 0, it means first window
closes since windowEndTime
+ // is exclusive
return;
}
-
- // Set emitRangeLowerBoundInclusive to -1L if
lastEmitWindowCloseTime was not set so that
- // we would fetch from 0L for the first time; otherwise set it to
lastEmitWindowCloseTime - windows.size().
- //
- // Note if we get here, it means emitRangeUpperBoundInclusive > 0,
which means windowCloseTime > windows.size(),
- // Because we always set lastEmitWindowCloseTime to
windowCloseTime before, it means
- // lastEmitWindowCloseTime - windows.size() should always > 0
- // As a result, emitRangeLowerBoundInclusive is always >= 0
+ // Because we only get here when emitRangeUpperBoundInclusive > 0
which means closeTime > windows.size()
+ // Since we set lastEmitCloseTime to closeTime before storing to
processor metadata
+ // lastEmitCloseTime - windows.size() is always > 0
+ // Set emitRangeLowerBoundInclusive to -1L if not set so that when
we fetchAll, we fetch from 0L
final long emitRangeLowerBoundInclusive = lastEmitWindowCloseTime
== ConsumerRecord.NO_TIMESTAMP ?
-1L : lastEmitWindowCloseTime - windows.size();
Review Comment:
I've made some refactoring, LMK what do you think.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]