guozhangwang commented on a change in pull request #8900: URL: https://github.com/apache/kafka/pull/8900#discussion_r443238726
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -679,58 +675,75 @@ private void cleanupTask(final Task task) { void shutdown(final boolean clean) { final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); - final Set<Task> tasksToClose = new HashSet<>(); + final Set<Task> tasksToCloseClean = new HashSet<>(); + final Set<Task> tasksToCloseDirty = new HashSet<>(); final Set<Task> tasksToCommit = new HashSet<>(); final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>(); - for (final Task task : tasks.values()) { - if (clean) { + if (clean) { + for (final Task task : tasks.values()) { try { task.suspend(); if (task.commitNeeded()) { - tasksToCommit.add(task); final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(); + tasksToCommit.add(task); if (task.isActive()) { consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); } } - tasksToClose.add(task); + tasksToCloseClean.add(task); } catch (final TaskMigratedException e) { // just ignore the exception as it doesn't matter during shutdown - closeTaskDirty(task); + tasksToCloseDirty.add(task); } catch (final RuntimeException e) { firstException.compareAndSet(null, e); - closeTaskDirty(task); + tasksToCloseDirty.add(task); } - } else { - closeTaskDirty(task); } - } - try { - if (clean) { - commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); - for (final Task task : tasksToCommit) { - try { - task.postCommit(); - } catch (final RuntimeException e) { - log.error("Exception caught while post-committing task " + task.id(), e); - firstException.compareAndSet(null, e); - } - } + // If any active tasks have to be closed dirty and can't be committed, none of them can be + if (!filterActive(tasksToCloseDirty).isEmpty()) { + tasksToCloseClean.removeAll(filterActive(tasksToCommit)); + 
tasksToCommit.removeAll(filterActive(tasksToCommit)); + tasksToCloseDirty.addAll(activeTaskIterable()); + consumedOffsetsAndMetadataPerTask.clear(); } - } catch (final RuntimeException e) { - log.error("Exception caught while committing tasks during shutdown", e); - firstException.compareAndSet(null, e); - } - for (final Task task : tasksToClose) { try { - completeTaskCloseClean(task); + commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); } catch (final RuntimeException e) { + log.error("Exception caught while committing tasks during shutdown", e); firstException.compareAndSet(null, e); - closeTaskDirty(task); + + // If the commit fails, everyone who participated in it must be closed dirty + tasksToCloseDirty.addAll(filterActive(tasksToCommit)); + tasksToCloseClean.removeAll(filterActive(tasksToCommit)); + tasksToCommit.clear(); + } + + for (final Task task : tasksToCommit) { Review comment: Regarding `If any active tasks have to be closed dirty and can't be committed, none of them can be`, is that only for eos-beta? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org