mjsax commented on code in PR #20403: URL: https://github.com/apache/kafka/pull/20403#discussion_r2299384167
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1104,11 +1104,18 @@ private void initializeTopology() {
         // initialize the task by initializing all its processor nodes in the topology
         log.trace("Initializing processor nodes of the topology");
         for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
-            processorContext.setCurrentNode(node);
+            final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+                time.milliseconds(),

Review Comment:
Not sure if this is what we want? We would set a single timestamp, and if `init()` runs for a longer period of time, all records would get the same timestamp?

We might need to change `ProcessorRecordContext` to have a "time function" instead of a hard-coded value? For a record at hand, the time function would be `() -> record.timestamp()` and return the same value each time, and for this dummy context, we would use `() -> time.milliseconds()`.

But we need to think about this more, and figure out what design we really want to get to, and what the splash radius of changing `ProcessorRecordContext` would be :)
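
To make the "time function" idea concrete, here is a rough, standalone sketch. It does not touch Kafka's actual `ProcessorRecordContext`; the class `SketchRecordContext`, its factory methods `forRecord` and `forWallClock`, and the `AtomicLong` fake clock are all hypothetical names invented for illustration. It only shows the difference between a supplier that always returns a fixed record timestamp and one that re-reads the clock on every call.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a record context. This is NOT Kafka's
// ProcessorRecordContext; it only illustrates the "time function" idea.
final class SketchRecordContext {
    private final LongSupplier timestampFn;

    private SketchRecordContext(final LongSupplier timestampFn) {
        this.timestampFn = timestampFn;
    }

    // Record at hand: the function returns the same fixed value on every call.
    static SketchRecordContext forRecord(final long recordTimestamp) {
        return new SketchRecordContext(() -> recordTimestamp);
    }

    // Dummy context (e.g. during init()): the clock is re-read on every call,
    // so a long-running init() would observe advancing timestamps.
    static SketchRecordContext forWallClock(final LongSupplier clock) {
        return new SketchRecordContext(clock);
    }

    long timestamp() {
        return timestampFn.getAsLong();
    }
}

final class TimeFunctionSketch {
    public static void main(final String[] args) {
        // Fake clock so the behavior is deterministic; a real caller might
        // pass something like System::currentTimeMillis instead.
        final AtomicLong fakeClock = new AtomicLong(100L);

        final SketchRecordContext recordCtx = SketchRecordContext.forRecord(42L);
        final SketchRecordContext dummyCtx = SketchRecordContext.forWallClock(fakeClock::get);

        System.out.println(recordCtx.timestamp()); // fixed record timestamp
        System.out.println(dummyCtx.timestamp());  // current fake-clock value

        fakeClock.set(250L); // simulate time passing while init() runs

        System.out.println(recordCtx.timestamp()); // unchanged
        System.out.println(dummyCtx.timestamp());  // follows the clock
    }
}
```

Whether a supplier-based field is the right shape for the real class, and how many call sites would be affected by it, is exactly the open question raised in the comment above; the sketch takes no position on that.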