guozhangwang commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r429694775



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -109,7 +94,7 @@ public void logChange(final String storeName,
                           final long timestamp) {
         throwUnsupportedOperationExceptionIfStandby("logChange");
         // Sending null headers to changelog topics (KIP-244)
-        collector.send(
+        streamTask.recordCollector().send(

Review comment:
       Maybe we can refactor the interface into `transitToActive` and `transitToStandby` such that:
   
   * transitToActive: takes the streamTask (in the future I'd suggest we just pass in the record collector, after removing `schedule` and refactoring request-commit) and the cache as parameters, and checks that the current state manager's taskType is active.
   * transitToStandby: takes no parameters, resets streamTask and cache, and checks that the current state manager's taskType is standby.
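A minimal sketch of this split, assuming the state manager is switched to its new type before the context is transitioned. `StateManager`, `ThreadCache`, `StreamTask` and `ProcessorContextSketch` below are simplified hypothetical stand-ins, not the real Kafka Streams classes, and the exception message is illustrative:

```java
import java.util.Objects;

// Hypothetical stand-ins for the real Kafka Streams types (illustration only).
enum TaskType { ACTIVE, STANDBY }

interface StateManager {
    TaskType taskType();
}

final class ThreadCache { }

final class StreamTask { }

final class ProcessorContextSketch {
    private final StateManager stateManager;
    private StreamTask streamTask; // null while serving a standby task
    private ThreadCache cache;     // null while serving a standby task

    ProcessorContextSketch(final StateManager stateManager) {
        this.stateManager = Objects.requireNonNull(stateManager);
    }

    // Takes the task (later perhaps only its record collector) and the cache.
    void transitToActive(final StreamTask streamTask, final ThreadCache cache) {
        requireType(TaskType.ACTIVE);
        this.streamTask = Objects.requireNonNull(streamTask);
        this.cache = Objects.requireNonNull(cache);
    }

    // Takes no parameters and simply drops the active-only references.
    void transitToStandby() {
        requireType(TaskType.STANDBY);
        this.streamTask = null;
        this.cache = null;
    }

    StreamTask streamTask() {
        return streamTask;
    }

    ThreadCache cache() {
        return cache;
    }

    // The state manager is transitioned first, so its type must already match.
    private void requireType(final TaskType expected) {
        if (stateManager.taskType() != expected) {
            throw new IllegalStateException("Expected state manager of type " + expected
                + " but it was " + stateManager.taskType());
        }
    }
}
```

Splitting the method this way lets each direction validate its own preconditions instead of branching on a `TaskType` argument.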
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -189,38 +190,52 @@ private void prepareClose(final boolean clean) {
     @Override
     public void closeClean(final Map<TopicPartition, Long> checkpoint) {
         Objects.requireNonNull(checkpoint);
-        close(true);
+        close(true, false);
 
         log.info("Closed clean");
     }
 
     @Override
     public void closeDirty() {
-        close(false);
+        close(false, false);
 
         log.info("Closed dirty");
     }
 
-    private void close(final boolean clean) {
+    @Override
+    public void closeAndRecycleState() {
+        prepareClose(true);
+        close(true, true);

Review comment:
       nit: I'd suggest we add a separate `recycle()` function rather than piggy-backing on `close` with an additional flag, since `clean` is always true here: we would always just 1) write checkpoints, 2) call `stateMgr.recycle()`, and 3) transition to CLOSED, doing nothing if the task is not in CREATED / RUNNING. It duplicates a few lines, but it is more straightforward.
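A rough sketch of what a dedicated `recycle()` path could look like. `RecyclableTask` and its `StateMgr` are hypothetical, heavily simplified stand-ins that only record calls, and doing nothing outside CREATED / RUNNING is an assumption:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified task used only to illustrate a dedicated recycle path.
final class RecyclableTask {
    enum State { CREATED, RUNNING, CLOSED }

    // Stand-in for the state manager; it only records the calls it receives.
    static final class StateMgr {
        final List<String> calls = new ArrayList<>();

        void checkpoint() {
            calls.add("checkpoint");
        }

        void recycle() {
            calls.add("recycle");
        }
    }

    private final StateMgr stateMgr;
    private State state;

    RecyclableTask(final StateMgr stateMgr, final State initialState) {
        this.stateMgr = stateMgr;
        this.state = initialState;
    }

    State state() {
        return state;
    }

    // Always "clean", so close() needs no extra boolean flag.
    void recycle() {
        if (state != State.CREATED && state != State.RUNNING) {
            return; // assumed: nothing to do in any other state
        }
        stateMgr.checkpoint(); // 1) write checkpoints
        stateMgr.recycle();    // 2) recycle the state manager instead of closing its stores
        state = State.CLOSED;  // 3) transition to CLOSED
    }
}
```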

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -482,7 +488,8 @@ public void closeDirty() {
      * </pre>
      */
     private void close(final boolean clean,
-                       final Map<TopicPartition, Long> checkpoint) {
+                       final Map<TopicPartition, Long> checkpoint,

Review comment:
       Ditto here: I'd suggest adding `recycle` as a separate function.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##########
@@ -46,7 +46,6 @@
     private final SessionKeySchema keySchema;
     private final SegmentedCacheFunction cacheFunction;
     private String cacheName;
-    private ThreadCache cache;

Review comment:
       nit: I don't think it is necessary to remove the extra reference to the cache and then call `context.cache()` every time we need it, but this is a very minor point, so it's your call.
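The trade-off behind this nit can be illustrated with a hypothetical sketch. Since the context's cache can be replaced on a task-type transition (as `transitionTaskType` does in the `ProcessorContextImpl` diff), a store that keeps its own field sees the old cache until the field is refreshed, while one that calls `context.cache()` always sees the current one. All classes here are illustrative stand-ins:

```java
// Hypothetical stand-ins; not the real Kafka Streams classes.
final class Cache {
    private final String name;

    Cache(final String name) {
        this.name = name;
    }

    String name() {
        return name;
    }
}

// The context owns the current cache and may swap it, e.g. on a task-type transition.
final class Context {
    private Cache cache;

    Context(final Cache cache) {
        this.cache = cache;
    }

    Cache cache() {
        return cache;
    }

    void swapCache(final Cache newCache) {
        this.cache = newCache;
    }
}

// Keeps its own reference, captured in init().
final class StoreWithField {
    private Cache cache;

    void init(final Context context) {
        this.cache = context.cache();
    }

    Cache currentCache() {
        return cache;
    }
}

// Asks the context every time, so it follows any swap.
final class StoreReadingContext {
    private Context context;

    void init(final Context context) {
        this.context = context;
    }

    Cache currentCache() {
        return context.cache();
    }
}
```

Keeping the field would be fine as long as it is refreshed (for example by re-running `init`) whenever the cache is swapped.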

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -42,48 +41,36 @@
 import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
 public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
-    // The below are both null for standby tasks
-    private final StreamTask streamTask;
-    private final RecordCollector collector;
+    private StreamTask streamTask;     // always null for standby tasks
 
     private final ToInternal toInternal = new ToInternal();
     private final static To SEND_TO_ALL = To.all();
 
     final Map<String, String> storeToChangelogTopic = new HashMap<>();
 
-    ProcessorContextImpl(final TaskId id,
-                         final StreamTask streamTask,
-                         final StreamsConfig config,
-                         final RecordCollector collector,
-                         final ProcessorStateManager stateMgr,
-                         final StreamsMetricsImpl metrics,
-                         final ThreadCache cache) {
+    public ProcessorContextImpl(final TaskId id,
+                                final StreamsConfig config,
+                                final ProcessorStateManager stateMgr,
+                                final StreamsMetricsImpl metrics,
+                                final ThreadCache cache) {
         super(id, config, metrics, stateMgr, cache);
-        this.streamTask = streamTask;
-        this.collector = collector;
+    }
 
-        if (streamTask == null && taskType() == TaskType.ACTIVE) {
-            throw new IllegalStateException("Tried to create context for active task but the streamtask was null");
-        }
+    @Override
+    public void transitionTaskType(final TaskType newType,
+                                   final ThreadCache cache) {
+        this.cache = cache;

Review comment:
       Is `newType` needed here? It seems to be overridden via the state manager anyway.

       Thinking about this a bit more, it seems we do not need a `transition` function, just a `registerCache`? Could this be consolidated with the function below (`registerNewTask`)?
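One hypothetical shape for the consolidation, with the task type read from the state manager rather than passed in. `registerCacheAndTask` is an illustrative name, and all types are simplified stand-ins, not the Kafka Streams API:

```java
import java.util.Objects;

// Hypothetical stand-ins; not the real Kafka Streams types.
enum TaskType { ACTIVE, STANDBY }

interface StateManager {
    TaskType taskType();
}

interface Task {
    boolean isActive();
}

final class ThreadCache { }

final class ConsolidatedContext {
    private final StateManager stateManager;
    private ThreadCache cache;
    private Task streamTask; // only kept for active tasks

    ConsolidatedContext(final StateManager stateManager) {
        this.stateManager = Objects.requireNonNull(stateManager);
    }

    // Hypothetical single method replacing transitionTaskType(newType, cache) + registerNewTask(task).
    // There is no TaskType parameter: the type is read from the state manager.
    void registerCacheAndTask(final ThreadCache newCache, final Task task) {
        this.cache = Objects.requireNonNull(newCache);
        if (stateManager.taskType() == TaskType.ACTIVE) {
            if (task == null || !task.isActive()) {
                throw new IllegalStateException("An active context needs an active task");
            }
            this.streamTask = task;
        } else {
            this.streamTask = null;
        }
    }

    ThreadCache cache() {
        return cache;
    }

    Task streamTask() {
        return streamTask;
    }
}
```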

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -42,48 +41,36 @@
 import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
 public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
-    // The below are both null for standby tasks
-    private final StreamTask streamTask;
-    private final RecordCollector collector;
+    private StreamTask streamTask;     // always null for standby tasks
 
     private final ToInternal toInternal = new ToInternal();
     private final static To SEND_TO_ALL = To.all();
 
     final Map<String, String> storeToChangelogTopic = new HashMap<>();
 
-    ProcessorContextImpl(final TaskId id,
-                         final StreamTask streamTask,
-                         final StreamsConfig config,
-                         final RecordCollector collector,
-                         final ProcessorStateManager stateMgr,
-                         final StreamsMetricsImpl metrics,
-                         final ThreadCache cache) {
+    public ProcessorContextImpl(final TaskId id,
+                                final StreamsConfig config,
+                                final ProcessorStateManager stateMgr,
+                                final StreamsMetricsImpl metrics,
+                                final ThreadCache cache) {
         super(id, config, metrics, stateMgr, cache);
-        this.streamTask = streamTask;
-        this.collector = collector;
+    }
 
-        if (streamTask == null && taskType() == TaskType.ACTIVE) {
-            throw new IllegalStateException("Tried to create context for active task but the streamtask was null");
-        }
+    @Override
+    public void transitionTaskType(final TaskType newType,
+                                   final ThreadCache cache) {
+        this.cache = cache;
+        streamTask = null;
     }
 
-    ProcessorContextImpl(final TaskId id,
-                         final StreamsConfig config,
-                         final ProcessorStateManager stateMgr,
-                         final StreamsMetricsImpl metrics) {
-        this(
-            id,
-            null,
-            config,
-            null,
-            stateMgr,
-            metrics,
-            new ThreadCache(
-                new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
-                0,
-                metrics
-            )
-        );
+    @Override
+    public void registerNewTask(final Task task) {

Review comment:
       Not for this PR: since in 2.6+ we always try to commit all tasks when the user requests a commit via the context, we do not need to maintain the flag per task, only per thread. Then, once we remove the deprecated `schedule` function, we can also remove the `streamTask` reference inside.
   
   Can we add a TODO marker to do that in the future?
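A minimal sketch of a per-thread commit flag, assuming (as described above) that a user-requested commit always commits every task on the thread. `ThreadCommitState` is hypothetical, and each task's commit is modeled as a `Runnable`:

```java
import java.util.List;

// Hypothetical sketch: a single commit-request flag owned by the stream thread,
// instead of one flag per task.
final class ThreadCommitState {
    private boolean commitRequested = false;

    // What a processor context on this thread could call when the user requests a commit.
    void requestCommit() {
        commitRequested = true;
    }

    // Called by the thread's loop: commits all tasks together and clears the flag.
    // Returns the number of tasks committed.
    int maybeCommitAll(final List<Runnable> taskCommits) {
        if (!commitRequested) {
            return 0;
        }
        for (final Runnable commit : taskCommits) {
            commit.run();
        }
        commitRequested = false;
        return taskCommits.size();
    }
}
```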

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -425,12 +449,14 @@ public void flush() {
 
     /**
      * {@link StateStore#close() Close} all stores (even in case of failure).
-     * Log all exception and re-throw the first exception that did occur at the end.
+     * Log all exceptions and re-throw the first exception that occurred at the end.
      *
      * @throws ProcessorStateException if any error happens when closing the state stores
      */
     @Override
     public void close() throws ProcessorStateException {
+        unregisterAllStoresWithChangelogReader();
+

Review comment:
       nit: I think it's now better to move the debug log on line 464 before `unregisterAllStoresWithChangelogReader`.
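A simplified sketch of the suggested ordering together with the "log all exceptions and re-throw the first" behavior from the Javadoc. `Store` and `StateManagerCloseSketch` are hypothetical stand-ins, the log message is a placeholder, and where the Javadoc declares `ProcessorStateException` this sketch just re-throws the first `RuntimeException`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical stand-in for a state store.
interface Store {
    String name();

    void close();
}

// Logs first, unregisters, then closes every store, re-throwing only the first failure.
final class StateManagerCloseSketch {
    private static final Logger LOG = Logger.getLogger(StateManagerCloseSketch.class.getName());

    private final List<Store> stores;
    final List<String> events = new ArrayList<>(); // records the order of steps for illustration

    StateManagerCloseSketch(final List<Store> stores) {
        this.stores = stores;
    }

    void close() {
        // Debug line moved before the unregistration (placeholder message).
        LOG.fine("Closing state manager and its registered stores");
        events.add("log");
        unregisterAllStoresWithChangelogReader();

        RuntimeException firstException = null;
        for (final Store store : stores) {
            try {
                store.close();
                events.add("closed " + store.name());
            } catch (final RuntimeException e) {
                // Log every failure but keep closing the remaining stores.
                LOG.warning("Failed to close state store " + store.name() + ": " + e);
                if (firstException == null) {
                    firstException = e;
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    // Stand-in for the method of the same name in the diff.
    private void unregisterAllStoresWithChangelogReader() {
        events.add("unregister");
    }
}
```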

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -112,6 +103,7 @@ static void closeStateManager(final Logger log,
         final AtomicReference<ProcessorStateException> firstException = new AtomicReference<>(null);
         try {
             if (stateDirectory.lock(id)) {
+

Review comment:
       Are these new lines intentional?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -272,6 +274,36 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             }
         }
 
+        if (taskCloseExceptions.isEmpty()) {
+            Task oldTask = null;
+            final Iterator<Task> transitioningTasksIter = tasksToRecycle.iterator();
+            try {
+                while (transitioningTasksIter.hasNext()) {
+                    oldTask = transitioningTasksIter.next();
+                    final Task newTask;
+                    if (oldTask.isActive()) {
+                        final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
+                        newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    } else {
+                        final Set<TopicPartition> partitions = activeTasksToCreate.remove(oldTask.id());
+                        newTask = activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, partitions, mainConsumer);
+                    }
+                    tasks.remove(oldTask.id());
+                    addNewTask(newTask);
+                    transitioningTasksIter.remove();
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format("Failed to recycle task %s cleanly. Attempting to close remaining tasks before re-throwing:", oldTask.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(oldTask.id(), e);
+
+                dirtyTasks.addAll(tasksToRecycle); // contains the tasks we have not yet tried to transition
+                dirtyTasks.addAll(tasks.values());         // contains the new tasks we just created

Review comment:
       This will add all current tasks, not just the ones we just created. Is that right?
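If the intent is to mark only the tasks created in this pass as dirty (as the inline comment says), one hypothetical option is to track them separately. In this sketch task ids and tasks are plain strings, and `RecycleSketch.recycleAll` is an illustrative name, not part of `TaskManager`:

```java
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;

// Hypothetical sketch: task ids are strings and a "task" is just a descriptive string.
final class RecycleSketch {
    // Recycles every id in tasksToRecycle. On failure, returns the ids a caller would close
    // dirty: those not yet recycled plus only the tasks created in this pass, rather than
    // every task in the registry. Returns an empty set on success.
    static Set<String> recycleAll(final Set<String> tasksToRecycle,
                                  final Map<String, String> registry,
                                  final UnaryOperator<String> recycle) {
        final Set<String> created = new LinkedHashSet<>();
        final Iterator<String> iter = tasksToRecycle.iterator();
        try {
            while (iter.hasNext()) {
                final String id = iter.next();
                final String newTask = recycle.apply(registry.get(id)); // may throw
                registry.put(id, newTask);
                created.add(id);
                iter.remove();
            }
            return new LinkedHashSet<>();
        } catch (final RuntimeException e) {
            final Set<String> dirty = new LinkedHashSet<>(tasksToRecycle); // not yet recycled, incl. the failed one
            dirty.addAll(created);                                          // only the tasks just created
            return dirty;
        }
    }
}
```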





