eduwercamacaro commented on code in PR #20403:
URL: https://github.com/apache/kafka/pull/20403#discussion_r2330605579


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1104,11 +1104,18 @@ private void initializeTopology() {
         // initialize the task by initializing all its processor nodes in the 
topology
         log.trace("Initializing processor nodes of the topology");
         for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
-            processorContext.setCurrentNode(node);
+            final ProcessorRecordContext recordContext = new 
ProcessorRecordContext(
+                    time.milliseconds(),

Review Comment:
   You are right; if the init method executes for a long time, all records will 
have the same timestamp. That’s an excellent point. 
   
   I would say that using the same timestamp for every record stored during the 
init method execution is not very accurate.
   I like your idea of providing a function that returns the current timestamp, 
but I believe we should add a specialized class (let’s say 
`InitProcessorRecordContext`) that extends from ProcessorRecordContext, so the 
normal record processing won’t have any impact because we will use the 
InitProcessorRecordContext just for the init method. 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1104,11 +1104,18 @@ private void initializeTopology() {
         // initialize the task by initializing all its processor nodes in the 
topology
         log.trace("Initializing processor nodes of the topology");
         for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
-            processorContext.setCurrentNode(node);
+            final ProcessorRecordContext recordContext = new 
ProcessorRecordContext(
+                    time.milliseconds(),

Review Comment:
   I'm thinking in something like this:
   ```java
   public class InitProcessorRecordContext extends ProcessorRecordContext {
   ....
       public InitProcessorRecordContext(Time time) {
           super(time.milliseconds(), -1, -1, null, new RecordHeaders());
           this.time = time;
       }
   
       @Override
       public long timestamp() {
           return time.milliseconds();
       }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to