eduwercamacaro commented on code in PR #20403:
URL: https://github.com/apache/kafka/pull/20403#discussion_r2376251694
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1104,11 +1104,12 @@ private void initializeTopology() {
// initialize the task by initializing all its processor nodes in the topology
log.trace("Initializing processor nodes of the topology");
for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
- processorContext.setCurrentNode(node);
+ final InitProcessorRecordContext initContext = new InitProcessorRecordContext(time);
+ updateProcessorContext(node, time.milliseconds(), initContext);
Review Comment:
IMO, using a static timestamp for each record is not only good enough, it also
gives a good correlation between processing time and the changelog timestamps.
However, the lifecycle of a `ProcessorRecordContext` might be significantly
shorter than that of an `InitProcessorRecordContext`, as the init method can
run for a longer period of time.
I don't see any overhead in using a moving timestamp in
`InitProcessorRecordContext`, because it runs only a few times at runtime. So I
would suggest using the moving timestamp in this context.
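To make the distinction concrete, here is a minimal sketch of the two behaviors. It assumes hypothetical classes `StaticTimestampContext` and `MovingTimestampContext` and a fake clock; none of these names exist in Kafka, and the actual `InitProcessorRecordContext` in the PR may be implemented differently.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class TimestampContextSketch {

    /** Hypothetical stand-in for a record context that exposes a timestamp. */
    interface TimestampedContext {
        long timestamp();
    }

    /** Static variant: the timestamp is captured once, at construction. */
    static final class StaticTimestampContext implements TimestampedContext {
        private final long timestamp;

        StaticTimestampContext(final LongSupplier clock) {
            this.timestamp = clock.getAsLong();
        }

        @Override
        public long timestamp() {
            return timestamp;
        }
    }

    /** Moving variant: the clock is consulted on every call. */
    static final class MovingTimestampContext implements TimestampedContext {
        private final LongSupplier clock;

        MovingTimestampContext(final LongSupplier clock) {
            this.clock = clock;
        }

        @Override
        public long timestamp() {
            return clock.getAsLong();
        }
    }

    public static void main(final String[] args) {
        // Fake clock so the example is deterministic (an assumption for illustration).
        final AtomicLong now = new AtomicLong(1_000L);
        final LongSupplier clock = now::get;

        final TimestampedContext staticCtx = new StaticTimestampContext(clock);
        final TimestampedContext movingCtx = new MovingTimestampContext(clock);

        // Simulate a long-running init() that takes 5 seconds.
        now.addAndGet(5_000L);

        System.out.println("static: " + staticCtx.timestamp()); // prints "static: 1000"
        System.out.println("moving: " + movingCtx.timestamp()); // prints "moving: 6000"
    }
}
```

With a short-lived context the two variants return nearly the same value. The gap only becomes visible when the context outlives a long-running call, which is the case raised above for init.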