mjsax commented on code in PR #20403:
URL: https://github.com/apache/kafka/pull/20403#discussion_r2373724790


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InitProcessorRecordContext.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.Objects;
+
+public class InitProcessorRecordContext extends ProcessorRecordContext {
+
+    private final Time time;
+
+    public InitProcessorRecordContext(final Time time) {
+        super(time.milliseconds(), -1, -1, null, new RecordHeaders());
+        this.time = time;
+    }
+
+    @Override
+    public long timestamp() {
+        return time.milliseconds();
+    }
+
+    @Override
+    public boolean equals(final Object o) {

Review Comment:
   It seems this override of `equals` is not correct? Cf. 
`ProcessorRecordContext.equals()`.
   
   But given that the timestamp is no longer "fixed", I am wondering if 
overriding `equals()` makes sense to begin with?
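
A minimal sketch of the concern, using a hypothetical stand-in class (`InitContext` here is not the real Kafka Streams class, and the `LongSupplier` clock is an assumption that replaces `Time`). It mirrors the class-only check from the PR and shows that two contexts can compare equal while reporting different timestamps:

```java
import java.util.function.LongSupplier;

final class EqualsSketch {

    // Hypothetical stand-in for a record context with a moving timestamp.
    static final class InitContext {
        private final LongSupplier clock;

        InitContext(final LongSupplier clock) {
            this.clock = clock;
        }

        // Re-reads the clock on every call, so the value is not fixed.
        long timestamp() {
            return clock.getAsLong();
        }

        // Class-only equality: no field is compared.
        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            return o != null && getClass() == o.getClass();
        }

        // Kept consistent with equals() for this sketch only.
        @Override
        public int hashCode() {
            return InitContext.class.hashCode();
        }
    }

    public static void main(final String[] args) {
        final InitContext a = new InitContext(() -> 100L);
        final InitContext b = new InitContext(() -> 200L);
        // The two objects are "equal" although their timestamps differ.
        System.out.println(a.equals(b));
        System.out.println(a.timestamp() == b.timestamp());
    }
}
```

Whether such an equality is useful depends on what callers (mostly tests, presumably) expect `equals()` to mean once the timestamp is no longer part of the object's state.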



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InitProcessorRecordContext.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.Objects;
+
+public class InitProcessorRecordContext extends ProcessorRecordContext {
+
+    private final Time time;
+
+    public InitProcessorRecordContext(final Time time) {
+        super(time.milliseconds(), -1, -1, null, new RecordHeaders());

Review Comment:
   ```suggestion
           super(-1L, -1, -1, null, new RecordHeaders());
   ```
   
   Or maybe even better, add `private static final long NO_TIMESTAMP = -1L;` 
and pass `NO_TIMESTAMP` here, or use `ConsumerRecord.NO_TIMESTAMP` directly?
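
A minimal sketch of the named-constant variant. `RecordContext` and `InitContext` are hypothetical stand-ins with a simplified constructor (no headers argument), so this only illustrates the shape of the change, not the actual class:

```java
final class NoTimestampSketch {

    // Named sentinel instead of a bare -1L literal.
    private static final long NO_TIMESTAMP = -1L;

    // Hypothetical, simplified stand-in for the parent context.
    static class RecordContext {
        final long timestamp;
        final long offset;
        final int partition;
        final String topic;

        RecordContext(final long timestamp, final long offset, final int partition, final String topic) {
            this.timestamp = timestamp;
            this.offset = offset;
            this.partition = partition;
            this.topic = topic;
        }
    }

    // The init context passes the sentinel rather than a clock reading,
    // since no record (and hence no record timestamp) exists during init.
    static final class InitContext extends RecordContext {
        InitContext() {
            super(NO_TIMESTAMP, -1L, -1, null);
        }
    }
}
```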



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1104,11 +1104,12 @@ private void initializeTopology() {
         // initialize the task by initializing all its processor nodes in the topology
         log.trace("Initializing processor nodes of the topology");
         for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
-            processorContext.setCurrentNode(node);
+            final InitProcessorRecordContext initContext = new InitProcessorRecordContext(time);
+            updateProcessorContext(node, time.milliseconds(), initContext);

Review Comment:
   Looking into `updateProcessorContext()`, it sets "system time" to a fixed 
value. We originally discussed whether it makes sense to have a static 
timestamp or a moving one, and thought we wanted a moving one.
   
   But if system time is not moving either, I am wondering if we really need 
a moving timestamp, or should fall back to just getting system time once and 
setting it on the context. 🤔 
   
   Or would we also need to update `ProcessorRecordContext` to return a 
"dynamic" system time? We opted to return a static one because calling 
`System.currentTimeMillis()` is not free in general, so we cache the result 
and reuse it to avoid unnecessary overhead. If this has been "good enough" so 
far, it might also be "good enough" during initialization?
   
   Thoughts? (Sorry for the back and forth...)
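
A minimal sketch of the two options being weighed. Both classes and their method names are hypothetical stand-ins, not Kafka Streams APIs, and the `LongSupplier` clock is an assumption in place of `Time`:

```java
import java.util.function.LongSupplier;

final class SystemTimeSketch {

    // Option 1: read the clock once and reuse the cached value.
    static final class CachedTimeContext {
        private long cachedSystemTimeMs;

        void update(final long nowMs) {
            cachedSystemTimeMs = nowMs;
        }

        long currentSystemTimeMs() {
            return cachedSystemTimeMs;
        }
    }

    // Option 2: re-read the clock on every call ("moving" time).
    static final class MovingTimeContext {
        private final LongSupplier clock;

        MovingTimeContext(final LongSupplier clock) {
            this.clock = clock;
        }

        long currentSystemTimeMs() {
            return clock.getAsLong();
        }
    }

    public static void main(final String[] args) {
        final CachedTimeContext cached = new CachedTimeContext();
        cached.update(System.currentTimeMillis());
        final MovingTimeContext moving = new MovingTimeContext(System::currentTimeMillis);
        // The cached value stays constant until update() is called again;
        // the moving value costs one clock read per call.
        System.out.println(cached.currentSystemTimeMs() <= moving.currentSystemTimeMs());
    }
}
```

If a cached reading is acceptable for regular processing, the first option would presumably also be enough for `init()`, which avoids the need for a dedicated context subclass with a dynamic `timestamp()`.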



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1104,11 +1104,12 @@ private void initializeTopology() {
         // initialize the task by initializing all its processor nodes in the topology
         log.trace("Initializing processor nodes of the topology");
         for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
-            processorContext.setCurrentNode(node);
+            final InitProcessorRecordContext initContext = new InitProcessorRecordContext(time);
+            updateProcessorContext(node, time.milliseconds(), initContext);
             try {
                 node.init(processorContext, processingExceptionHandler);
             } finally {
-                processorContext.setCurrentNode(null);
+                updateProcessorContext(null, RecordQueue.UNKNOWN, null);

Review Comment:
   Might be better to use `ConsumerRecord.NO_TIMESTAMP` directly instead of 
`RecordQueue.UNKNOWN`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InitProcessorRecordContext.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.Objects;
+
+public class InitProcessorRecordContext extends ProcessorRecordContext {
+
+    private final Time time;
+
+    public InitProcessorRecordContext(final Time time) {
+        super(time.milliseconds(), -1, -1, null, new RecordHeaders());
+        this.time = time;
+    }
+
+    @Override
+    public long timestamp() {
+        return time.milliseconds();
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    @Deprecated
+    public int hashCode() {

Review Comment:
   I don't think we should override `hashCode()` at all, as it's invalid 
to use (cf. `ProcessorRecordContext.hashCode()`).
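
A minimal sketch of why no override is needed, under the assumption that the parent's `hashCode()` rejects use in hash collections by throwing (which is how the reference above reads). `BaseContext` and `InitContext` are hypothetical stand-ins:

```java
import java.util.HashSet;
import java.util.Set;

final class HashCodeSketch {

    // Hypothetical parent: hashing is deliberately unsupported.
    static class BaseContext {
        @Override
        public boolean equals(final Object o) {
            return o != null && getClass() == o.getClass();
        }

        @Override
        public int hashCode() {
            throw new UnsupportedOperationException("unsafe for use in hash collections");
        }
    }

    // No hashCode() override: the subclass inherits the throwing behavior.
    static final class InitContext extends BaseContext {
    }

    // Returns false if the object refuses to be hashed.
    static boolean canBeHashed(final Object o) {
        final Set<Object> set = new HashSet<>();
        try {
            set.add(o);
            return true;
        } catch (final UnsupportedOperationException e) {
            return false;
        }
    }
}
```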


