frankvicky commented on code in PR #22282:
URL: https://github.com/apache/kafka/pull/22282#discussion_r3247219830


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1057,81 +1057,91 @@ void handleRevocation(final Collection<TopicPartition> 
revokedPartitions) {
                          "have been cleaned up by the handleAssignment 
callback.", remainingRevokedPartitions);
         }
 
-        if (revokedTasksNeedCommit) {
-            prepareCommitAndAddOffsetsToMap(revokedActiveTasks, 
consumedOffsetsPerTask);
-            // if we need to commit any revoking task then we just commit all 
of those needed committing together
-            prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, 
consumedOffsetsPerTask);
-        }
-
-        // even if commit failed, we should still continue and complete 
suspending those tasks, so we would capture
-        // any exception and rethrow it at the end. some exceptions may be 
handled immediately and then swallowed,
-        // as such we just need to skip those dirty tasks in the checkpoint
+        // even if prepare, commit, or postCommit failed, we must still 
suspend revoked tasks and unlock,
+        // so we use try-finally to guarantee that. Exceptions are captured 
and rethrown at the end.
         final Set<Task> dirtyTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
+        boolean prepareCommitSucceeded = false;
         try {
             if (revokedTasksNeedCommit) {
-                // in handleRevocation we must call 
commitOffsetsOrTransaction() directly rather than
-                // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to 
make sure we don't skip the
-                // offset commit because we are in a rebalance
-                
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
-            }
-        } catch (final TaskCorruptedException e) {
-            log.warn("Some tasks were corrupted when trying to commit offsets, 
these will be cleaned and revived: {}",
-                     e.corruptedTasks());
-
-            // If we hit a TaskCorruptedException it must be EOS, just handle 
the cleanup for those corrupted tasks right here
-            dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
-            closeDirtyAndRevive(dirtyTasks, true);
-        } catch (final TimeoutException e) {
-            log.warn("Timed out while trying to commit all tasks during 
revocation, these will be cleaned and revived");
-
-            // If we hit a TimeoutException it must be ALOS, just close dirty 
and revive without wiping the state
-            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
-            closeDirtyAndRevive(dirtyTasks, false);
-        } catch (final RuntimeException e) {
-            log.error("Exception caught while committing those revoked tasks " 
+ revokedActiveTasks, e);
-            firstException.compareAndSet(null, e);
-            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
-        }
-
-        // we enforce checkpointing upon suspending a task: if it is resumed 
later we just proceed normally, if it is
-        // going to be closed we would checkpoint by then
-        for (final Task task : revokedActiveTasks) {
-            if (!dirtyTasks.contains(task)) {
                 try {
-                    task.postCommit(true);
+                    prepareCommitAndAddOffsetsToMap(revokedActiveTasks, 
consumedOffsetsPerTask);
+                    // if we need to commit any revoking task then we just 
commit all of those needed committing together
+                    prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, 
consumedOffsetsPerTask);
+                    prepareCommitSucceeded = true;
                 } catch (final RuntimeException e) {
-                    log.error("Exception caught while post-committing task " + 
task.id(), e);
-                    maybeSetFirstException(false, maybeWrapTaskException(e, 
task.id()), firstException);
+                    log.error("Exception caught while preparing to commit 
revoked tasks " + revokedActiveTasks, e);

Review Comment:
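   Please replace the string concatenation in this log call with a
   parameterised message, e.g.
   `log.error("Exception caught while preparing to commit revoked tasks {}", revokedActiveTasks, e);`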



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1057,81 +1057,91 @@ void handleRevocation(final Collection<TopicPartition> 
revokedPartitions) {
                          "have been cleaned up by the handleAssignment 
callback.", remainingRevokedPartitions);
         }
 
-        if (revokedTasksNeedCommit) {
-            prepareCommitAndAddOffsetsToMap(revokedActiveTasks, 
consumedOffsetsPerTask);
-            // if we need to commit any revoking task then we just commit all 
of those needed committing together
-            prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, 
consumedOffsetsPerTask);
-        }
-
-        // even if commit failed, we should still continue and complete 
suspending those tasks, so we would capture
-        // any exception and rethrow it at the end. some exceptions may be 
handled immediately and then swallowed,
-        // as such we just need to skip those dirty tasks in the checkpoint
+        // even if prepare, commit, or postCommit failed, we must still 
suspend revoked tasks and unlock,
+        // so we use try-finally to guarantee that. Exceptions are captured 
and rethrown at the end.
         final Set<Task> dirtyTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
+        boolean prepareCommitSucceeded = false;
         try {
             if (revokedTasksNeedCommit) {
-                // in handleRevocation we must call 
commitOffsetsOrTransaction() directly rather than
-                // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to 
make sure we don't skip the
-                // offset commit because we are in a rebalance
-                
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
-            }
-        } catch (final TaskCorruptedException e) {
-            log.warn("Some tasks were corrupted when trying to commit offsets, 
these will be cleaned and revived: {}",
-                     e.corruptedTasks());
-
-            // If we hit a TaskCorruptedException it must be EOS, just handle 
the cleanup for those corrupted tasks right here
-            dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
-            closeDirtyAndRevive(dirtyTasks, true);
-        } catch (final TimeoutException e) {
-            log.warn("Timed out while trying to commit all tasks during 
revocation, these will be cleaned and revived");
-
-            // If we hit a TimeoutException it must be ALOS, just close dirty 
and revive without wiping the state
-            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
-            closeDirtyAndRevive(dirtyTasks, false);
-        } catch (final RuntimeException e) {
-            log.error("Exception caught while committing those revoked tasks " 
+ revokedActiveTasks, e);
-            firstException.compareAndSet(null, e);
-            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
-        }
-
-        // we enforce checkpointing upon suspending a task: if it is resumed 
later we just proceed normally, if it is
-        // going to be closed we would checkpoint by then
-        for (final Task task : revokedActiveTasks) {
-            if (!dirtyTasks.contains(task)) {
                 try {
-                    task.postCommit(true);
+                    prepareCommitAndAddOffsetsToMap(revokedActiveTasks, 
consumedOffsetsPerTask);
+                    // if we need to commit any revoking task then we just 
commit all of those needed committing together
+                    prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, 
consumedOffsetsPerTask);
+                    prepareCommitSucceeded = true;
                 } catch (final RuntimeException e) {
-                    log.error("Exception caught while post-committing task " + 
task.id(), e);
-                    maybeSetFirstException(false, maybeWrapTaskException(e, 
task.id()), firstException);
+                    log.error("Exception caught while preparing to commit 
revoked tasks " + revokedActiveTasks, e);
+                    maybeSetFirstException(false, e, firstException);
+                    dirtyTasks.addAll(revokedActiveTasks);
+                    dirtyTasks.addAll(commitNeededActiveTasks);
                 }
             }
-        }
 
-        if (revokedTasksNeedCommit) {
-            for (final Task task : commitNeededActiveTasks) {
+            try {
+                if (revokedTasksNeedCommit && prepareCommitSucceeded) {
+                    // in handleRevocation we must call 
commitOffsetsOrTransaction() directly rather than
+                    // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() 
to make sure we don't skip the
+                    // offset commit because we are in a rebalance
+                    
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+                }
+            } catch (final TaskCorruptedException e) {
+                log.warn("Some tasks were corrupted when trying to commit 
offsets, these will be cleaned and revived: {}",
+                         e.corruptedTasks());
+
+                // If we hit a TaskCorruptedException it must be EOS, just 
handle the cleanup for those corrupted tasks right here
+                dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
+                closeDirtyAndRevive(dirtyTasks, true);
+            } catch (final TimeoutException e) {
+                log.warn("Timed out while trying to commit all tasks during 
revocation, these will be cleaned and revived");
+
+                // If we hit a TimeoutException it must be ALOS, just close 
dirty and revive without wiping the state
+                dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+                closeDirtyAndRevive(dirtyTasks, false);
+            } catch (final RuntimeException e) {
+                log.error("Exception caught while committing those revoked 
tasks " + revokedActiveTasks, e);

Review Comment:
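   Ditto: please use a parameterised log message here instead of string
   concatenation, e.g.
   `log.error("Exception caught while committing those revoked tasks {}", revokedActiveTasks, e);`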



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1057,81 +1057,91 @@ void handleRevocation(final Collection<TopicPartition> 
revokedPartitions) {
                          "have been cleaned up by the handleAssignment 
callback.", remainingRevokedPartitions);
         }
 
-        if (revokedTasksNeedCommit) {
-            prepareCommitAndAddOffsetsToMap(revokedActiveTasks, 
consumedOffsetsPerTask);
-            // if we need to commit any revoking task then we just commit all 
of those needed committing together
-            prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, 
consumedOffsetsPerTask);
-        }
-
-        // even if commit failed, we should still continue and complete 
suspending those tasks, so we would capture
-        // any exception and rethrow it at the end. some exceptions may be 
handled immediately and then swallowed,
-        // as such we just need to skip those dirty tasks in the checkpoint
+        // even if prepare, commit, or postCommit failed, we must still 
suspend revoked tasks and unlock,
+        // so we use try-finally to guarantee that. Exceptions are captured 
and rethrown at the end.
         final Set<Task> dirtyTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
+        boolean prepareCommitSucceeded = false;
         try {
             if (revokedTasksNeedCommit) {
-                // in handleRevocation we must call 
commitOffsetsOrTransaction() directly rather than
-                // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to 
make sure we don't skip the
-                // offset commit because we are in a rebalance
-                
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
-            }
-        } catch (final TaskCorruptedException e) {
-            log.warn("Some tasks were corrupted when trying to commit offsets, 
these will be cleaned and revived: {}",
-                     e.corruptedTasks());
-
-            // If we hit a TaskCorruptedException it must be EOS, just handle 
the cleanup for those corrupted tasks right here
-            dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
-            closeDirtyAndRevive(dirtyTasks, true);
-        } catch (final TimeoutException e) {
-            log.warn("Timed out while trying to commit all tasks during 
revocation, these will be cleaned and revived");
-
-            // If we hit a TimeoutException it must be ALOS, just close dirty 
and revive without wiping the state
-            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
-            closeDirtyAndRevive(dirtyTasks, false);
-        } catch (final RuntimeException e) {
-            log.error("Exception caught while committing those revoked tasks " 
+ revokedActiveTasks, e);
-            firstException.compareAndSet(null, e);
-            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
-        }
-
-        // we enforce checkpointing upon suspending a task: if it is resumed 
later we just proceed normally, if it is
-        // going to be closed we would checkpoint by then
-        for (final Task task : revokedActiveTasks) {
-            if (!dirtyTasks.contains(task)) {
                 try {
-                    task.postCommit(true);
+                    prepareCommitAndAddOffsetsToMap(revokedActiveTasks, 
consumedOffsetsPerTask);
+                    // if we need to commit any revoking task then we just 
commit all of those needed committing together
+                    prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, 
consumedOffsetsPerTask);
+                    prepareCommitSucceeded = true;
                 } catch (final RuntimeException e) {
-                    log.error("Exception caught while post-committing task " + 
task.id(), e);
-                    maybeSetFirstException(false, maybeWrapTaskException(e, 
task.id()), firstException);
+                    log.error("Exception caught while preparing to commit 
revoked tasks " + revokedActiveTasks, e);
+                    maybeSetFirstException(false, e, firstException);
+                    dirtyTasks.addAll(revokedActiveTasks);
+                    dirtyTasks.addAll(commitNeededActiveTasks);
                 }
             }
-        }
 
-        if (revokedTasksNeedCommit) {
-            for (final Task task : commitNeededActiveTasks) {
+            try {
+                if (revokedTasksNeedCommit && prepareCommitSucceeded) {
+                    // in handleRevocation we must call 
commitOffsetsOrTransaction() directly rather than
+                    // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() 
to make sure we don't skip the
+                    // offset commit because we are in a rebalance
+                    
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+                }
+            } catch (final TaskCorruptedException e) {
+                log.warn("Some tasks were corrupted when trying to commit 
offsets, these will be cleaned and revived: {}",
+                         e.corruptedTasks());
+
+                // If we hit a TaskCorruptedException it must be EOS, just 
handle the cleanup for those corrupted tasks right here
+                dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
+                closeDirtyAndRevive(dirtyTasks, true);
+            } catch (final TimeoutException e) {
+                log.warn("Timed out while trying to commit all tasks during 
revocation, these will be cleaned and revived");
+
+                // If we hit a TimeoutException it must be ALOS, just close 
dirty and revive without wiping the state
+                dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+                closeDirtyAndRevive(dirtyTasks, false);
+            } catch (final RuntimeException e) {
+                log.error("Exception caught while committing those revoked 
tasks " + revokedActiveTasks, e);
+                maybeSetFirstException(false, e, firstException);
+                dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+            }
+
+            // we enforce checkpointing upon suspending a task: if it is 
resumed later we just proceed normally, if it is
+            // going to be closed we would checkpoint by then
+            for (final Task task : revokedActiveTasks) {
                 if (!dirtyTasks.contains(task)) {
                     try {
-                        // for non-revoking active tasks, we should not 
enforce checkpoint
-                        // since if it is EOS enabled, no checkpoint should be 
written while
-                        // the task is in RUNNING tate
-                        task.postCommit(false);
+                        task.postCommit(true);
                     } catch (final RuntimeException e) {
                         log.error("Exception caught while post-committing task 
" + task.id(), e);
                         maybeSetFirstException(false, 
maybeWrapTaskException(e, task.id()), firstException);
                     }
                 }
             }
-        }
 
-        for (final Task task : revokedActiveTasks) {
-            try {
-                task.suspend();
-            } catch (final RuntimeException e) {
-                log.error("Caught the following exception while trying to 
suspend revoked task " + task.id(), e);
-                maybeSetFirstException(false, maybeWrapTaskException(e, 
task.id()), firstException);
+            if (revokedTasksNeedCommit) {
+                for (final Task task : commitNeededActiveTasks) {
+                    if (!dirtyTasks.contains(task)) {
+                        try {
+                            // for non-revoking active tasks, we should not 
enforce checkpoint
+                            // since if it is EOS enabled, no checkpoint 
should be written while
+                            // the task is in RUNNING tate
+                            task.postCommit(false);
+                        } catch (final RuntimeException e) {
+                            log.error("Exception caught while post-committing 
task " + task.id(), e);
+                            maybeSetFirstException(false, 
maybeWrapTaskException(e, task.id()), firstException);
+                        }
+                    }
+                }
+            }
+        } finally {
+            for (final Task task : revokedActiveTasks) {
+                try {
+                    task.suspend();
+                } catch (final RuntimeException e) {
+                    log.error("Caught the following exception while trying to 
suspend revoked task " + task.id(), e);

Review Comment:
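   Ditto: please use a parameterised log message here instead of string
   concatenation, e.g.
   `log.error("Caught the following exception while trying to suspend revoked task {}", task.id(), e);`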



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1057,81 +1057,91 @@ void handleRevocation(final Collection<TopicPartition> 
revokedPartitions) {
                          "have been cleaned up by the handleAssignment 
callback.", remainingRevokedPartitions);
         }
 
-        if (revokedTasksNeedCommit) {
-            prepareCommitAndAddOffsetsToMap(revokedActiveTasks, 
consumedOffsetsPerTask);
-            // if we need to commit any revoking task then we just commit all 
of those needed committing together
-            prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, 
consumedOffsetsPerTask);
-        }
-
-        // even if commit failed, we should still continue and complete 
suspending those tasks, so we would capture
-        // any exception and rethrow it at the end. some exceptions may be 
handled immediately and then swallowed,
-        // as such we just need to skip those dirty tasks in the checkpoint
+        // even if prepare, commit, or postCommit failed, we must still 
suspend revoked tasks and unlock,
+        // so we use try-finally to guarantee that. Exceptions are captured 
and rethrown at the end.
         final Set<Task> dirtyTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
+        boolean prepareCommitSucceeded = false;
         try {
             if (revokedTasksNeedCommit) {
-                // in handleRevocation we must call 
commitOffsetsOrTransaction() directly rather than
-                // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to 
make sure we don't skip the
-                // offset commit because we are in a rebalance
-                
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
-            }
-        } catch (final TaskCorruptedException e) {
-            log.warn("Some tasks were corrupted when trying to commit offsets, 
these will be cleaned and revived: {}",
-                     e.corruptedTasks());
-
-            // If we hit a TaskCorruptedException it must be EOS, just handle 
the cleanup for those corrupted tasks right here
-            dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
-            closeDirtyAndRevive(dirtyTasks, true);
-        } catch (final TimeoutException e) {
-            log.warn("Timed out while trying to commit all tasks during 
revocation, these will be cleaned and revived");
-
-            // If we hit a TimeoutException it must be ALOS, just close dirty 
and revive without wiping the state
-            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
-            closeDirtyAndRevive(dirtyTasks, false);
-        } catch (final RuntimeException e) {
-            log.error("Exception caught while committing those revoked tasks " 
+ revokedActiveTasks, e);
-            firstException.compareAndSet(null, e);
-            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
-        }
-
-        // we enforce checkpointing upon suspending a task: if it is resumed 
later we just proceed normally, if it is
-        // going to be closed we would checkpoint by then
-        for (final Task task : revokedActiveTasks) {
-            if (!dirtyTasks.contains(task)) {
                 try {
-                    task.postCommit(true);
+                    prepareCommitAndAddOffsetsToMap(revokedActiveTasks, 
consumedOffsetsPerTask);
+                    // if we need to commit any revoking task then we just 
commit all of those needed committing together
+                    prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, 
consumedOffsetsPerTask);
+                    prepareCommitSucceeded = true;
                 } catch (final RuntimeException e) {
-                    log.error("Exception caught while post-committing task " + 
task.id(), e);
-                    maybeSetFirstException(false, maybeWrapTaskException(e, 
task.id()), firstException);
+                    log.error("Exception caught while preparing to commit 
revoked tasks " + revokedActiveTasks, e);
+                    maybeSetFirstException(false, e, firstException);
+                    dirtyTasks.addAll(revokedActiveTasks);
+                    dirtyTasks.addAll(commitNeededActiveTasks);
                 }
             }
-        }
 
-        if (revokedTasksNeedCommit) {
-            for (final Task task : commitNeededActiveTasks) {
+            try {
+                if (revokedTasksNeedCommit && prepareCommitSucceeded) {
+                    // in handleRevocation we must call 
commitOffsetsOrTransaction() directly rather than
+                    // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() 
to make sure we don't skip the
+                    // offset commit because we are in a rebalance
+                    
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+                }
+            } catch (final TaskCorruptedException e) {
+                log.warn("Some tasks were corrupted when trying to commit 
offsets, these will be cleaned and revived: {}",
+                         e.corruptedTasks());
+
+                // If we hit a TaskCorruptedException it must be EOS, just 
handle the cleanup for those corrupted tasks right here
+                dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
+                closeDirtyAndRevive(dirtyTasks, true);
+            } catch (final TimeoutException e) {
+                log.warn("Timed out while trying to commit all tasks during 
revocation, these will be cleaned and revived");
+
+                // If we hit a TimeoutException it must be ALOS, just close 
dirty and revive without wiping the state
+                dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+                closeDirtyAndRevive(dirtyTasks, false);
+            } catch (final RuntimeException e) {
+                log.error("Exception caught while committing those revoked 
tasks " + revokedActiveTasks, e);
+                maybeSetFirstException(false, e, firstException);
+                dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+            }
+
+            // we enforce checkpointing upon suspending a task: if it is 
resumed later we just proceed normally, if it is
+            // going to be closed we would checkpoint by then
+            for (final Task task : revokedActiveTasks) {
                 if (!dirtyTasks.contains(task)) {
                     try {
-                        // for non-revoking active tasks, we should not 
enforce checkpoint
-                        // since if it is EOS enabled, no checkpoint should be 
written while
-                        // the task is in RUNNING tate
-                        task.postCommit(false);
+                        task.postCommit(true);
                     } catch (final RuntimeException e) {
                         log.error("Exception caught while post-committing task 
" + task.id(), e);
                         maybeSetFirstException(false, 
maybeWrapTaskException(e, task.id()), firstException);
                     }
                 }
             }
-        }
 
-        for (final Task task : revokedActiveTasks) {
-            try {
-                task.suspend();
-            } catch (final RuntimeException e) {
-                log.error("Caught the following exception while trying to 
suspend revoked task " + task.id(), e);
-                maybeSetFirstException(false, maybeWrapTaskException(e, 
task.id()), firstException);
+            if (revokedTasksNeedCommit) {
+                for (final Task task : commitNeededActiveTasks) {
+                    if (!dirtyTasks.contains(task)) {
+                        try {
+                            // for non-revoking active tasks, we should not 
enforce checkpoint
+                            // since if it is EOS enabled, no checkpoint 
should be written while
+                            // the task is in RUNNING tate
+                            task.postCommit(false);
+                        } catch (final RuntimeException e) {
+                            log.error("Exception caught while post-committing 
task " + task.id(), e);

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to