guozhangwang commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r431995079



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -42,48 +41,36 @@
 import static 
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
 public class ProcessorContextImpl extends AbstractProcessorContext implements 
RecordCollector.Supplier {
-    // The below are both null for standby tasks
-    private final StreamTask streamTask;
-    private final RecordCollector collector;
+    private StreamTask streamTask;     // always null for standby tasks
 
     private final ToInternal toInternal = new ToInternal();
     private final static To SEND_TO_ALL = To.all();
 
     final Map<String, String> storeToChangelogTopic = new HashMap<>();
 
-    ProcessorContextImpl(final TaskId id,
-                         final StreamTask streamTask,
-                         final StreamsConfig config,
-                         final RecordCollector collector,
-                         final ProcessorStateManager stateMgr,
-                         final StreamsMetricsImpl metrics,
-                         final ThreadCache cache) {
+    public ProcessorContextImpl(final TaskId id,
+                                final StreamsConfig config,
+                                final ProcessorStateManager stateMgr,
+                                final StreamsMetricsImpl metrics,
+                                final ThreadCache cache) {
         super(id, config, metrics, stateMgr, cache);
-        this.streamTask = streamTask;
-        this.collector = collector;
+    }
 
-        if (streamTask == null && taskType() == TaskType.ACTIVE) {
-            throw new IllegalStateException("Tried to create context for 
active task but the streamtask was null");
-        }
+    @Override
+    public void transitionTaskType(final TaskType newType,
+                                   final ThreadCache cache) {
+        this.cache = cache;
+        streamTask = null;
     }
 
-    ProcessorContextImpl(final TaskId id,
-                         final StreamsConfig config,
-                         final ProcessorStateManager stateMgr,
-                         final StreamsMetricsImpl metrics) {
-        this(
-            id,
-            null,
-            config,
-            null,
-            stateMgr,
-            metrics,
-            new ThreadCache(
-                new LogContext(String.format("stream-thread [%s] ", 
Thread.currentThread().getName())),
-                0,
-                metrics
-            )
-        );
+    @Override
+    public void registerNewTask(final Task task) {

Review comment:
       Ah yes, we still need the reference of a task in the future.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to