eduwercamacaro commented on code in PR #20403:
URL: https://github.com/apache/kafka/pull/20403#discussion_r2403334030


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1104,11 +1104,18 @@ private void initializeTopology() {
         // initialize the task by initializing all its processor nodes in the topology
         log.trace("Initializing processor nodes of the topology");
         for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
-            processorContext.setCurrentNode(node);
+            final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+                    time.milliseconds(),

Review Comment:
   I added a test in `ProcessorTopologyTest.java`. I believe that's where it 
should be. What do you think?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InitProcessorRecordContext.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.Objects;
+
+public class InitProcessorRecordContext extends ProcessorRecordContext {
+
+    private final Time time;
+
+    public InitProcessorRecordContext(final Time time) {
+        super(time.milliseconds(), -1, -1, null, new RecordHeaders());
+        this.time = time;
+    }
+
+    @Override
+    public long timestamp() {
+        return time.milliseconds();
+    }
+
+    @Override
+    public boolean equals(final Object o) {

Review Comment:
   The reason is the `spotbugs` Gradle task: it fails when it detects that 
the `equals` and `hashCode` methods are missing.
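
   To illustrate the kind of override pair being discussed, here is a minimal 
sketch. It is not the code from this PR: `BaseContextSketch`, 
`InitContextSketch`, and the `LongSupplier` clock are hypothetical stand-ins 
for `ProcessorRecordContext`, `InitProcessorRecordContext`, and `Time`. The 
thread does not say which SpotBugs pattern fired; a check such as 
`EQ_DOESNT_OVERRIDE_EQUALS` (a subclass adds a field but inherits `equals`) 
is an assumption.

```java
import java.util.Objects;
import java.util.function.LongSupplier;

// Hypothetical base class that already defines equals/hashCode.
class BaseContextSketch {
    private final long timestamp;
    private final long offset;
    private final int partition;

    BaseContextSketch(final long timestamp, final long offset, final int partition) {
        this.timestamp = timestamp;
        this.offset = offset;
        this.partition = partition;
    }

    public long timestamp() {
        return timestamp;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        final BaseContextSketch that = (BaseContextSketch) o;
        return timestamp == that.timestamp
            && offset == that.offset
            && partition == that.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, offset, partition);
    }
}

// Hypothetical subclass that adds a field (the clock), so it overrides
// equals and hashCode together to keep the two consistent.
class InitContextSketch extends BaseContextSketch {
    private final LongSupplier clock;

    InitContextSketch(final LongSupplier clock) {
        super(clock.getAsLong(), -1L, -1);
        this.clock = clock;
    }

    // Reads the clock on every call instead of returning the captured value.
    @Override
    public long timestamp() {
        return clock.getAsLong();
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!super.equals(o)) return false; // also rejects null and other classes
        return clock.equals(((InitContextSketch) o).clock);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), clock);
    }
}
```

   Including the new field in both methods is one way to satisfy the 
`equals`/`hashCode` contract; whether the clock should take part in equality 
at all is a design choice this sketch does not settle.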



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1104,11 +1104,18 @@ private void initializeTopology() {
         // initialize the task by initializing all its processor nodes in the topology
         log.trace("Initializing processor nodes of the topology");
         for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
-            processorContext.setCurrentNode(node);
+            final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+                    time.milliseconds(),

Review Comment:
   Also, at first I couldn't write a test case with the `TopologyTestDriver` 
that reproduced the context problem. Then I realized that the 
`TopologyTestDriver` class was setting a context during setup, which 
prevented the test from failing. I removed that line because 
`TopologyTestDriver` shouldn't set one. Check out my proposed changes to 
that class.
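
   The masking effect described above can be sketched in isolation. Every 
name here (`SketchContext`, `SketchProcessor`, `SketchHarness`) is 
hypothetical and none of it is Kafka's API; the assumption is only that 
reading the timestamp during `init` fails when no record context is set.

```java
// Hypothetical stand-in for a processor context whose record context
// may or may not have been set before init() runs.
final class SketchContext {
    private Long recordTimestamp; // null until a record context is set

    void setRecordTimestamp(final long timestamp) {
        this.recordTimestamp = timestamp;
    }

    long timestamp() {
        if (recordTimestamp == null) {
            throw new IllegalStateException("no record context set");
        }
        return recordTimestamp;
    }
}

// Hypothetical processor that reads the timestamp while initializing.
final class SketchProcessor {
    long seenAtInit = -1L;

    void init(final SketchContext context) {
        seenAtInit = context.timestamp();
    }
}

// Hypothetical test harness. With presetContext = true it behaves like a
// driver that sets a context during setup, which hides the failure.
final class SketchHarness {
    static boolean initSucceeds(final boolean presetContext) {
        final SketchContext context = new SketchContext();
        if (presetContext) {
            context.setRecordTimestamp(0L);
        }
        try {
            new SketchProcessor().init(context);
            return true;
        } catch (final IllegalStateException e) {
            return false;
        }
    }
}
```

   In this sketch `initSucceeds(true)` passes while `initSucceeds(false)` 
exposes the missing context, which is why a regression test can only fail 
once the harness stops setting a context of its own.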



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.