mjsax commented on code in PR #20403:
URL: https://github.com/apache/kafka/pull/20403#discussion_r2299384167


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1104,11 +1104,18 @@ private void initializeTopology() {
         // initialize the task by initializing all its processor nodes in the topology
         log.trace("Initializing processor nodes of the topology");
         for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
-            processorContext.setCurrentNode(node);
+            final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+                    time.milliseconds(),

Review Comment:
   I'm not sure this is what we want. We would set a single timestamp once, and
if `init()` runs for a longer period of time, all records would get that same
timestamp.
   
   We might need to change `ProcessorRecordContext` to hold a "time function"
instead of a hard-coded value. For a record at hand, the time function would be
`() -> record.timestamp()`, which returns the same value on every call; for this
dummy context, we would use `() -> time.milliseconds()`.
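
A rough sketch of the "time function" idea, to make the two cases concrete. `TimestampedContext`, `forRecord`, and `forClock` are hypothetical names for illustration only; this is not the real `ProcessorRecordContext` API, and the clock is modeled as a plain `LongSupplier` rather than Kafka's `Time`.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for ProcessorRecordContext; not the real Kafka Streams class.
final class TimestampedContext {
    // The timestamp is computed on demand instead of being stored as a fixed long.
    private final LongSupplier timestampFn;

    private TimestampedContext(final LongSupplier timestampFn) {
        this.timestampFn = timestampFn;
    }

    // Context for a record at hand: the supplier closes over the record's
    // timestamp, so every call returns the same value.
    static TimestampedContext forRecord(final long recordTimestamp) {
        return new TimestampedContext(() -> recordTimestamp);
    }

    // Dummy context (e.g. while init() runs): every call re-reads the clock,
    // so a long-running init() does not see one frozen timestamp.
    static TimestampedContext forClock(final LongSupplier clock) {
        return new TimestampedContext(clock);
    }

    long timestamp() {
        return timestampFn.getAsLong();
    }
}
```

With this shape, the dummy context would be built as something like `TimestampedContext.forClock(time::milliseconds)`, assuming `time` exposes a no-argument `milliseconds()` method as in the diff above. Callers of `timestamp()` would not need to know which kind of context they hold.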
   
   But I need to think about this more and figure out what design we really
want to get to, and what the blast radius of changing `ProcessorRecordContext`
would be :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

