ableegoldman commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r737016054



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -280,6 +328,13 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
+        final AtomicReference<RuntimeException> activeTasksCommitException = new AtomicReference<>(null);
+        commitActiveTasks(activeTasks, activeTasksCommitException);

Review comment:
I recommend moving this to after the `// first rectify all existing tasks` and `// close and recycle those tasks` sections (and the exception handling section that follows them), so that we first clean up any tasks that are going to be closed anyway. Then you should be able to simplify `commitActiveTasks`: for example, you only need to loop through the `activeTasks` argument, since at that point it covers all still-assigned active tasks.

Actually, maybe it would make sense to avoid doing all this in a standalone method, and instead do some of the bookkeeping of whether (and which) tasks need to be committed while we iterate through all the `tasks` in the `// first rectify all existing tasks` section. Does that make sense?
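
To make the suggested ordering concrete, here is a rough, self-contained sketch. It is not the real `TaskManager` code: `AssignmentSketch`, `FakeTask`, the `commitNeeded` flag, and the string task ids are all hypothetical stand-ins, and recycling is simplified to closing. It only illustrates recording commit candidates during the rectify loop and committing after the close/recycle step.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these types are the real Kafka Streams classes.
class AssignmentSketch {

    /** Simplified stand-in for a task, keyed by a plain string id. */
    static final class FakeTask {
        final String id;
        final boolean active;        // true = active task, false = standby
        final boolean commitNeeded;  // assumed flag: task has uncommitted progress
        boolean closed = false;
        boolean committed = false;

        FakeTask(String id, boolean active, boolean commitNeeded) {
            this.id = id;
            this.active = active;
            this.commitNeeded = commitNeeded;
        }
    }

    /** Returns the ids of the tasks that were committed, in iteration order. */
    static List<String> handleAssignment(Map<String, FakeTask> tasks,
                                         Set<String> activeIds,
                                         Set<String> standbyIds) {
        List<FakeTask> toCommit = new ArrayList<>();
        List<FakeTask> toClose = new ArrayList<>();

        // first rectify all existing tasks, recording commit candidates as we go
        for (FakeTask task : tasks.values()) {
            if (task.active && activeIds.contains(task.id)) {
                if (task.commitNeeded) {
                    toCommit.add(task);
                }
            } else if (!task.active && standbyIds.contains(task.id)) {
                // still-assigned standby: nothing to commit in this sketch
            } else {
                toClose.add(task);
            }
        }

        // close and recycle those tasks (recycling is simplified to closing here)
        for (FakeTask task : toClose) {
            task.closed = true;
            tasks.remove(task.id);
        }

        // only now commit, so tasks that were just closed are never touched
        List<String> committedIds = new ArrayList<>();
        for (FakeTask task : toCommit) {
            task.committed = true;
            committedIds.add(task.id);
        }
        return committedIds;
    }

    public static void main(String[] args) {
        Map<String, FakeTask> tasks = new LinkedHashMap<>();
        tasks.put("0_0", new FakeTask("0_0", true, true));
        tasks.put("0_1", new FakeTask("0_1", true, true));  // revoked below
        tasks.put("0_2", new FakeTask("0_2", true, false));
        System.out.println(handleAssignment(tasks, Set.of("0_0", "0_2"), Set.of()));
    }
}
```

In this sketch the revoked task `0_1` is closed without being committed, which is the simplification the reordering is meant to allow. Whether the real code can skip those tasks depends on how the close path handles their offsets.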





