ableegoldman commented on a change in pull request #9835:
URL: https://github.com/apache/kafka/pull/9835#discussion_r553019634



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -377,8 +370,9 @@ private void handleCloseAndRecycle(final Set<Task> tasksToRecycle,
         for (final Task task : tasksToCloseClean) {
             try {
                 completeTaskCloseClean(task);
-                cleanUpTaskProducer(task, taskCloseExceptions);
-                tasks.remove(task.id());
+                if (task.isActive()) {

Review comment:
       IIUC, the direction you want to head in (this is not the final PR, right? 
Lots of TODOs) is to improve type safety and ultimately have most of the 
methods in `Tasks` accept a specific `StreamTask`/`StandbyTask` rather than a 
generic `Task`. Is that correct? In that case it feels right to leave this 
`isActive()` check here.
   But if the ultimate goal is to make the `TaskManager` _more_ task-type 
agnostic, then maybe it should be in `Tasks`. How would you define the division 
of labor between `TaskManager` and `Tasks`?
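
To make the question concrete, here is a minimal sketch of the two options. It is not code from this PR: `Task`, `StreamTask` and `StandbyTask` are simplified stand-ins for the real Streams classes, and `TypedTasks`, `GenericTasks`, `removeActive`, `removeStandby` and `remove` are hypothetical names invented for illustration. The string log only records which step would run.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: where does the active/standby branch live?
class DivisionOfLaborSketch {
    // Option A caller: the manager knows the task type and picks the typed method.
    static void removeViaTyped(final TypedTasks tasks, final Task task) {
        if (task.isActive()) {
            tasks.removeActive((StreamTask) task);
        } else {
            tasks.removeStandby((StandbyTask) task);
        }
    }

    public static void main(final String[] args) {
        final TypedTasks typed = new TypedTasks();
        removeViaTyped(typed, new StreamTask("0_0"));
        removeViaTyped(typed, new StandbyTask("0_1"));

        final GenericTasks generic = new GenericTasks();
        generic.remove(new StreamTask("0_0"));
        generic.remove(new StandbyTask("0_1"));

        System.out.println(typed.log);
        System.out.println(generic.log);
    }
}

// Simplified stand-in, not the real org.apache.kafka.streams Task interface.
interface Task {
    String id();
    boolean isActive();
}

final class StreamTask implements Task {
    private final String id;
    StreamTask(final String id) { this.id = id; }
    public String id() { return id; }
    public boolean isActive() { return true; }
}

final class StandbyTask implements Task {
    private final String id;
    StandbyTask(final String id) { this.id = id; }
    public String id() { return id; }
    public boolean isActive() { return false; }
}

// Option A (hypothetical): typed methods, so the compiler rejects a standby
// task where an active one is required; the branch stays in the manager.
final class TypedTasks {
    final List<String> log = new ArrayList<>();

    void removeActive(final StreamTask task) {
        log.add("cleanUpProducer:" + task.id());
        log.add("remove:" + task.id());
    }

    void removeStandby(final StandbyTask task) {
        log.add("remove:" + task.id());
    }
}

// Option B (hypothetical): one generic method; the container hides the branch
// and the manager stays task-type agnostic.
final class GenericTasks {
    final List<String> log = new ArrayList<>();

    void remove(final Task task) {
        if (task.isActive()) {
            log.add("cleanUpProducer:" + task.id());
        }
        log.add("remove:" + task.id());
    }
}
```

Both variants perform the same steps; they differ only in which class owns the `isActive()` decision, which is the trade-off the question above is about.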

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -155,13 +151,13 @@ void handleRebalanceComplete() {
         rebalanceInProgress = false;
     }
 
-    void handleCorruption(final Map<TaskId, Collection<TopicPartition>> tasksWithChangelogs) throws TaskMigratedException {

Review comment:
       Why remove this?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -336,6 +340,7 @@ public void shouldComputeOffsetSumForUnassignedTaskWeCanLock() throws Exception
 
         replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
+        taskManager.addTask(new StateMachineTask(taskId00, Collections.emptySet(), true));

Review comment:
       Doesn't this invalidate the test, though? We're specifically testing 
whether we can compute offset sums for tasks that we _don't_ own.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -390,71 +384,28 @@ private void handleCloseAndRecycle(final Set<Task> tasksToRecycle,
         }
 
         tasksToRecycle.removeAll(tasksToCloseDirty);
-        for (final Task task : tasksToRecycle) {
+        for (final Task oldTask : tasksToRecycle) {
             final Task newTask;
             try {
-                if (task.isActive()) {
-                    final Set<TopicPartition> partitions = standbyTasksToCreate.remove(task.id());
-                    newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) task, partitions);
-                    cleanUpTaskProducer(task, taskCloseExceptions);
+                if (oldTask.isActive()) {
+                    final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
+                    tasks.convertActiveToStandby((StreamTask) oldTask, partitions, taskCloseExceptions);

Review comment:
       Much better name :)

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -1514,8 +1521,6 @@ public void shouldOnlyCommitRevokedStandbyTaskAndPropagatePrepareCommitException
 
     @Test
     public void shouldSuspendAllRevokedActiveTasksAndPropagateSuspendException() {
-        setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA);

Review comment:
       cc @abbccdda 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -804,45 +749,42 @@ private void closeTaskDirty(final Task task) {
         } catch (final RuntimeException swallow) {
             log.error("Error suspending dirty task {} ", task.id(), swallow);
         }
-        cleanupTask(task);
+        tasks.removeTaskBeforeClosing(task.id());
         task.closeDirty();
     }
 
     private void completeTaskCloseClean(final Task task) {
-        cleanupTask(task);
+        tasks.removeTaskBeforeClosing(task.id());
         task.closeClean();
     }
 
-    // Note: this MUST be called *before* actually closing the task
-    private void cleanupTask(final Task task) {
-        for (final TopicPartition inputPartition : task.inputPartitions()) {
-            partitionToTask.remove(inputPartition);
-        }
-    }
-
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
         final Set<Task> tasksToCloseDirty = new HashSet<>();
+        // TODO: change type to `StreamTask`
+        final Set<Task> activeTasks = new TreeSet<>(Comparator.comparing(Task::id));

Review comment:
       Would it be cleaner to just close/remove the task producer inside 
`tryCloseCleanAllActiveTasks`? Could we just close/remove the task producers 
before closing clean or do we specifically need to close them first? I don't 
remember...
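
On ordering in general: the removed `cleanupTask` comment in this hunk says the bookkeeping "MUST be called *before* actually closing the task". Below is a rough sketch of why such an ordering can matter. It rests on an assumption made purely for illustration, namely that a closed task no longer reports its input partitions; whether the real tasks behave that way is not established here. `SketchTask` and `SketchRegistry` are hypothetical stand-ins, and only the method name `removeTaskBeforeClosing` is borrowed from the diff.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustration only: remove bookkeeping first, then close.
class CloseOrderingSketch {
    public static void main(final String[] args) {
        final SketchRegistry registry = new SketchRegistry();
        final SketchTask task = new SketchTask("0_0", "topic-0", "topic-1");
        registry.add(task);

        // Order used in the diff: drop the partition mapping, then close.
        registry.removeTaskBeforeClosing(task);
        task.close();

        System.out.println("stale entries: " + registry.partitionToTask.size());
    }
}

// Hypothetical stand-in for a task; partitions are plain strings here.
final class SketchTask {
    private final String id;
    private final Set<String> inputPartitions;

    SketchTask(final String id, final String... partitions) {
        this.id = id;
        this.inputPartitions = new HashSet<>(Arrays.asList(partitions));
    }

    String id() { return id; }

    Set<String> inputPartitions() { return inputPartitions; }

    // ASSUMPTION for this sketch: closing forgets the input partitions.
    void close() { inputPartitions.clear(); }
}

// Hypothetical stand-in for the partition-to-task bookkeeping.
final class SketchRegistry {
    final Map<String, SketchTask> partitionToTask = new HashMap<>();

    void add(final SketchTask task) {
        for (final String partition : task.inputPartitions()) {
            partitionToTask.put(partition, task);
        }
    }

    // Relies on the task still knowing its partitions, hence "before closing".
    void removeTaskBeforeClosing(final SketchTask task) {
        for (final String partition : task.inputPartitions()) {
            partitionToTask.remove(partition);
        }
    }
}
```

Under that assumption, calling `close()` first would leave the partition mapping pointing at a closed task. The same kind of check would answer whether the producer cleanup depends on task state that closing destroys.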




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

