ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439702536



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), 
standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new 
HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new 
HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             
activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = 
new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new 
HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new 
HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();
+        final Set<Task> tasksToRecycle = new HashSet<>();
         final Set<Task> dirtyTasks = new HashSet<>();
 
+        // first rectify all existing tasks
         for (final Task task : tasks.values()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 updateInputPartitionsAndResume(task, 
activeTasks.get(task.id()));
-                if (task.commitNeeded()) {
-                    additionalTasksForCommitting.add(task);
-                }
                 activeTasksToCreate.remove(task.id());
             } else if (standbyTasks.containsKey(task.id()) && 
!task.isActive()) {
                 updateInputPartitionsAndResume(task, 
standbyTasks.get(task.id()));
                 standbyTasksToCreate.remove(task.id());
-                // check for tasks that were owned previously but have changed 
active/standby status
             } else if (activeTasks.containsKey(task.id()) || 
standbyTasks.containsKey(task.id())) {
+                // check for tasks that were owned previously but have changed 
active/standby status
                 tasksToRecycle.add(task);
             } else {
-                try {
-                    task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format(
-                        "Failed to close task %s cleanly. Attempting to close 
remaining tasks before re-throwing:",
-                        task.id());
-                    log.error(uncleanMessage, e);
-                    taskCloseExceptions.put(task.id(), e);
-                    // We've already recorded the exception (which is the 
point of clean).
-                    // Now, we should go ahead and complete the close because 
a half-closed task is no good to anyone.
-                    dirtyTasks.add(task);
-                }
+                tasksToClose.add(task);
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+        for (final Task task : tasksToClose) {
             try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
+                task.suspend(); // Should be a no-op for active tasks, unless 
we hit an exception during handleRevocation

Review comment:
       Instead of checkpointing, we can check `if clean && commitNeeded & 
checkpoint != null` and then throw an exception on closeClean (which would 
result in calling `closeDirty`)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to