mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443760619



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during 
shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task 
" + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be clsoed dirty and can't be 
committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());
+                consumedOffsetsAndMetadataPerTask.clear();
             }
-        } catch (final RuntimeException e) {
-            log.error("Exception caught while committing tasks during 
shutdown", e);
-            firstException.compareAndSet(null, e);
-        }
 
-        for (final Task task : tasksToClose) {
             try {
-                completeTaskCloseClean(task);
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
             } catch (final RuntimeException e) {
+                log.error("Exception caught while committing tasks during 
shutdown", e);
                 firstException.compareAndSet(null, e);
-                closeTaskDirty(task);
+
+                // If the commit fails, everyone who participated in it must 
be closed dirty
+                tasksToCloseDirty.addAll(filterActive(tasksToCommit));
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.clear();
+            }
+
+            for (final Task task : tasksToCommit) {

Review comment:
       I guess, technically it's only required for eos-beta, but we to avoid 
too many different cases, we might just want to do the same thing for all cases?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to