eduwercamacaro commented on code in PR #20403: URL: https://github.com/apache/kafka/pull/20403#discussion_r2330605579
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1104,11 +1104,18 @@ private void initializeTopology() {
         // initialize the task by initializing all its processor nodes in the topology
         log.trace("Initializing processor nodes of the topology");
         for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
-            processorContext.setCurrentNode(node);
+            final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+                time.milliseconds(),

Review Comment:
You are right; if the init method executes for a long time, all records will have the same timestamp. That's an excellent point. I would say that using the same timestamp for every record stored during the init method execution is not very accurate.

I like your idea of providing a function that returns the current timestamp, but I believe we should add a specialized class (let's say `InitProcessorRecordContext`) that extends `ProcessorRecordContext`. That way normal record processing is not affected, because we would use `InitProcessorRecordContext` only for the init method.

Review Comment:
I'm thinking of something like this:

```java
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Time;

public class InitProcessorRecordContext extends ProcessorRecordContext {
    // ....
    // Clock used to compute the timestamp on demand (implied by the constructor assignment).
    private final Time time;

    public InitProcessorRecordContext(final Time time) {
        // No source record exists during init: offset and partition are -1, topic is null.
        super(time.milliseconds(), -1, -1, null, new RecordHeaders());
        this.time = time;
    }

    // Return the current wall-clock time on every call instead of a value fixed at construction.
    @Override
    public long timestamp() {
        return time.milliseconds();
    }
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org