guozhangwang commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r442997760



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch 
since transaction was aborted")) {

Review comment:
       I think we should not add this handling for now based on the conclusion 
that after one task caused `abortTxn` is called, no other tasks should ever 
call recordCollector#flush/send/close anymore right?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -522,7 +522,7 @@ private void close(final boolean clean) {
         if (clean && commitNeeded) {
             log.debug("Tried to close clean but there was pending uncommitted 
data, this means we failed to"
                           + " commit and should close as dirty instead");
-            throw new StreamsException("Tried to close dirty task as clean");

Review comment:
       This comment is worthy to be a comment on the code itself :)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -69,7 +68,6 @@
     private final ChangelogReader changelogReader;
     private final UUID processId;
     private final String logPrefix;
-    private final StreamsMetricsImpl streamsMetrics;

Review comment:
       Hmm, it is used in `streamsMetrics.removeAllTaskLevelSensors(threadId, 
task.id().toString());` in `cleanupTask`?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during 
shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task 
" + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be clsoed dirty and can't be 
committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());

Review comment:
       Why only add active tasks here, not standby tasks?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to