mjsax commented on a change in pull request #9835:
URL: https://github.com/apache/kafka/pull/9835#discussion_r553671696



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -377,8 +370,9 @@ private void handleCloseAndRecycle(final Set<Task> tasksToRecycle,
         for (final Task task : tasksToCloseClean) {
             try {
                 completeTaskCloseClean(task);
-                cleanUpTaskProducer(task, taskCloseExceptions);
-                tasks.remove(task.id());
+                if (task.isActive()) {

Review comment:
       Your understanding is correct. `TaskManager` should know whether it's dealing with an active or a standby task. In fact, I want to remove some methods like `addRecords()` from the `Task` interface and move them to `StreamTask`, too.
   
   And yes, this is just a first PR to keep the scope small.
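A minimal sketch of that direction, assuming heavily simplified, hypothetical versions of `Task`, `StreamTask`, and `StandbyTask` (the real Kafka Streams classes carry far more state and methods) plus a hypothetical `TaskRouter` helper: the active-only method `addRecords()` lives on `StreamTask` instead of the shared interface, and the caller checks `isActive()` before using it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the shared task interface.
// addRecords() is deliberately NOT declared here.
interface Task {
    String id();
    boolean isActive();
}

// Only active tasks process input records, so addRecords() lives here.
class StreamTask implements Task {
    private final String id;
    private final List<String> buffered = new ArrayList<>();

    StreamTask(final String id) { this.id = id; }

    @Override public String id() { return id; }
    @Override public boolean isActive() { return true; }

    void addRecords(final List<String> records) { buffered.addAll(records); }
    int bufferedCount() { return buffered.size(); }
}

// Standby tasks only maintain state; they never receive input records.
class StandbyTask implements Task {
    private final String id;

    StandbyTask(final String id) { this.id = id; }

    @Override public String id() { return id; }
    @Override public boolean isActive() { return false; }
}

// Hypothetical helper playing the TaskManager's role: it decides whether it
// is dealing with an active or a standby task before calling active-only methods.
class TaskRouter {
    static int routeRecords(final List<Task> tasks, final List<String> records) {
        int delivered = 0;
        for (final Task task : tasks) {
            if (task.isActive()) {
                ((StreamTask) task).addRecords(records);
                delivered++;
            }
        }
        return delivered;
    }
}
```

The design choice is that the type system, not a runtime no-op in `StandbyTask`, prevents records from being handed to a standby.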

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -336,6 +340,7 @@ public void shouldComputeOffsetSumForUnassignedTaskWeCanLock() throws Exception
 
         replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
+        taskManager.addTask(new StateMachineTask(taskId00, Collections.emptySet(), true));

Review comment:
       Oh. Good point. I guess I did not pay enough attention to what this test actually verifies... _ugh_

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -804,45 +749,42 @@ private void closeTaskDirty(final Task task) {
         } catch (final RuntimeException swallow) {
             log.error("Error suspending dirty task {} ", task.id(), swallow);
         }
-        cleanupTask(task);
+        tasks.removeTaskBeforeClosing(task.id());
         task.closeDirty();
     }
 
     private void completeTaskCloseClean(final Task task) {
-        cleanupTask(task);
+        tasks.removeTaskBeforeClosing(task.id());
         task.closeClean();
     }
 
-    // Note: this MUST be called *before* actually closing the task
-    private void cleanupTask(final Task task) {
-        for (final TopicPartition inputPartition : task.inputPartitions()) {
-            partitionToTask.remove(inputPartition);
-        }
-    }
-
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
         final Set<Task> tasksToCloseDirty = new HashSet<>();
+        // TODO: change type to `StreamTask`
+        final Set<Task> activeTasks = new TreeSet<>(Comparator.comparing(Task::id));

Review comment:
       Me neither. :) I actually wanted to look into this in more detail in a follow-up PR and just added the deep-copy to get this PR into a "correct" state so the tests pass. If you insist, I can try to update this PR with a proper cleanup, but I am not sure what the scope would be...
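To illustrate why copying the task set before closing helps, here is a minimal sketch assuming a hypothetical `SimpleTask` and `SimpleTaskRegistry` in place of the real task bookkeeping. The tasks are copied into a `TreeSet` ordered by id (mirroring the `Comparator.comparing(Task::id)` line in the diff), so removing each task from the registry during the loop does not touch the collection being iterated, and tasks close in a deterministic order. Note that only the set is copied; the task objects themselves are shared.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical minimal task; not the real Kafka Streams Task.
class SimpleTask {
    private final String id;
    boolean closed = false;

    SimpleTask(final String id) { this.id = id; }

    String id() { return id; }
}

// Hypothetical registry standing in for the TaskManager's task bookkeeping.
class SimpleTaskRegistry {
    private final Map<String, SimpleTask> tasksById = new HashMap<>();

    void add(final SimpleTask task) { tasksById.put(task.id(), task); }
    void removeTaskBeforeClosing(final String id) { tasksById.remove(id); }
    int size() { return tasksById.size(); }

    // Closes all tasks and returns the ids in the order they were closed.
    List<String> shutdown() {
        // Copy into a TreeSet sorted by id. Iterating tasksById.values() directly
        // while removing from tasksById would throw ConcurrentModificationException.
        final Set<SimpleTask> snapshot = new TreeSet<>(Comparator.comparing(SimpleTask::id));
        snapshot.addAll(tasksById.values());

        final List<String> closeOrder = new ArrayList<>();
        for (final SimpleTask task : snapshot) {
            removeTaskBeforeClosing(task.id()); // must happen before closing
            task.closed = true;
            closeOrder.add(task.id());
        }
        return closeOrder;
    }
}
```

One caveat of a `TreeSet` with an id comparator: two distinct tasks with the same id would collapse into one entry, which is acceptable only because task ids are unique.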

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -155,13 +151,13 @@ void handleRebalanceComplete() {
         rebalanceInProgress = false;
     }
 
-    void handleCorruption(final Map<TaskId, Collection<TopicPartition>> tasksWithChangelogs) throws TaskMigratedException {

Review comment:
       My understanding is that we don't want to declare unchecked exceptions, only checked exceptions (for which the declaration is mandatory). (Not my personal style, but most people seem to prefer it this way.)
   
   If we think it's important, we should add a JavaDoc comment instead.
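A minimal sketch of that convention, using hypothetical `MigratedException` and `CorruptionHandler` types rather than the real Kafka Streams classes (the real `TaskMigratedException` is likewise unchecked). The unchecked exception is documented with a JavaDoc `@throws` tag but left out of the method's `throws` clause, and callers still compile without a `try`/`catch`.

```java
import java.util.Collection;
import java.util.Map;

// Hypothetical unchecked exception standing in for TaskMigratedException.
class MigratedException extends RuntimeException {
    MigratedException(final String message) { super(message); }
}

// Hypothetical class; the method body is illustrative only.
class CorruptionHandler {
    /**
     * Handles tasks whose state was found to be corrupted.
     *
     * @param tasksWithChangelogs corrupted task ids mapped to their changelog partitions
     * @return the number of corrupted tasks handled
     * @throws MigratedException if a task was migrated to another instance
     */
    int handleCorruption(final Map<String, Collection<String>> tasksWithChangelogs) {
        // No "throws MigratedException" clause: the exception is unchecked, so the
        // compiler does not require it; the @throws tag above documents it instead.
        for (final String taskId : tasksWithChangelogs.keySet()) {
            if (taskId.startsWith("migrated")) {
                throw new MigratedException("Task " + taskId + " was migrated");
            }
        }
        return tasksWithChangelogs.size();
    }
}
```

Had `MigratedException` extended `Exception` instead, the compiler would reject this method until a `throws` clause was added, which is the "mandatory" case for checked exceptions.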




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.