guozhangwang commented on a change in pull request #8900: URL: https://github.com/apache/kafka/pull/8900#discussion_r443237917
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -679,58 +675,75 @@ private void cleanupTask(final Task task) { void shutdown(final boolean clean) { final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); - final Set<Task> tasksToClose = new HashSet<>(); + final Set<Task> tasksToCloseClean = new HashSet<>(); + final Set<Task> tasksToCloseDirty = new HashSet<>(); final Set<Task> tasksToCommit = new HashSet<>(); final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>(); - for (final Task task : tasks.values()) { - if (clean) { + if (clean) { + for (final Task task : tasks.values()) { try { task.suspend(); if (task.commitNeeded()) { - tasksToCommit.add(task); final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(); + tasksToCommit.add(task); if (task.isActive()) { consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); } } - tasksToClose.add(task); + tasksToCloseClean.add(task); } catch (final TaskMigratedException e) { // just ignore the exception as it doesn't matter during shutdown - closeTaskDirty(task); + tasksToCloseDirty.add(task); } catch (final RuntimeException e) { firstException.compareAndSet(null, e); - closeTaskDirty(task); + tasksToCloseDirty.add(task); } - } else { - closeTaskDirty(task); } - } - try { - if (clean) { - commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); - for (final Task task : tasksToCommit) { - try { - task.postCommit(); - } catch (final RuntimeException e) { - log.error("Exception caught while post-committing task " + task.id(), e); - firstException.compareAndSet(null, e); - } - } + // If any active tasks have to be clsoed dirty and can't be committed, none of them can be + if (!filterActive(tasksToCloseDirty).isEmpty()) { + tasksToCloseClean.removeAll(filterActive(tasksToCommit)); + tasksToCommit.removeAll(filterActive(tasksToCommit)); + tasksToCloseDirty.addAll(activeTaskIterable()); Review comment: You mean `they can be closed clean` right? In that case I'd agree :) Realized that we are still closing standby as clean if one of the active task is causing all other active tasks to be closed dirty. The looping over all tasks above is a bit confusing to me. Maybe a subjective, nit suggestion here is to first loop over active, and then loop over standby and in the second loop we do not need the one-spoils-all logic anymore. Although it is a bit duplicated code it would make logic cleaner. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org