frankvicky commented on code in PR #22282:
URL: https://github.com/apache/kafka/pull/22282#discussion_r3247219830
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1057,81 +1057,91 @@ void handleRevocation(final Collection<TopicPartition>
revokedPartitions) {
"have been cleaned up by the handleAssignment
callback.", remainingRevokedPartitions);
}
- if (revokedTasksNeedCommit) {
- prepareCommitAndAddOffsetsToMap(revokedActiveTasks,
consumedOffsetsPerTask);
- // if we need to commit any revoking task then we just commit all
of those needed committing together
- prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks,
consumedOffsetsPerTask);
- }
-
- // even if commit failed, we should still continue and complete
suspending those tasks, so we would capture
- // any exception and rethrow it at the end. some exceptions may be
handled immediately and then swallowed,
- // as such we just need to skip those dirty tasks in the checkpoint
+ // even if prepare, commit, or postCommit failed, we must still
suspend revoked tasks and unlock,
+ // so we use try-finally to guarantee that. Exceptions are captured
and rethrown at the end.
final Set<Task> dirtyTasks = new
TreeSet<>(Comparator.comparing(Task::id));
+ boolean prepareCommitSucceeded = false;
try {
if (revokedTasksNeedCommit) {
- // in handleRevocation we must call
commitOffsetsOrTransaction() directly rather than
- // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to
make sure we don't skip the
- // offset commit because we are in a rebalance
-
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
- }
- } catch (final TaskCorruptedException e) {
- log.warn("Some tasks were corrupted when trying to commit offsets,
these will be cleaned and revived: {}",
- e.corruptedTasks());
-
- // If we hit a TaskCorruptedException it must be EOS, just handle
the cleanup for those corrupted tasks right here
- dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
- closeDirtyAndRevive(dirtyTasks, true);
- } catch (final TimeoutException e) {
- log.warn("Timed out while trying to commit all tasks during
revocation, these will be cleaned and revived");
-
- // If we hit a TimeoutException it must be ALOS, just close dirty
and revive without wiping the state
- dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
- closeDirtyAndRevive(dirtyTasks, false);
- } catch (final RuntimeException e) {
- log.error("Exception caught while committing those revoked tasks "
+ revokedActiveTasks, e);
- firstException.compareAndSet(null, e);
- dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
- }
-
- // we enforce checkpointing upon suspending a task: if it is resumed
later we just proceed normally, if it is
- // going to be closed we would checkpoint by then
- for (final Task task : revokedActiveTasks) {
- if (!dirtyTasks.contains(task)) {
try {
- task.postCommit(true);
+ prepareCommitAndAddOffsetsToMap(revokedActiveTasks,
consumedOffsetsPerTask);
+ // if we need to commit any revoking task then we just
commit all of those needed committing together
+ prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks,
consumedOffsetsPerTask);
+ prepareCommitSucceeded = true;
} catch (final RuntimeException e) {
- log.error("Exception caught while post-committing task " +
task.id(), e);
- maybeSetFirstException(false, maybeWrapTaskException(e,
task.id()), firstException);
+ log.error("Exception caught while preparing to commit
revoked tasks " + revokedActiveTasks, e);
Review Comment:
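Please replace the string concatenation in this `log.error` call with a parameterized message, i.e. use a `{}` placeholder for `revokedActiveTasks` and keep the exception `e` as the last argument.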
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1057,81 +1057,91 @@ void handleRevocation(final Collection<TopicPartition>
revokedPartitions) {
"have been cleaned up by the handleAssignment
callback.", remainingRevokedPartitions);
}
- if (revokedTasksNeedCommit) {
- prepareCommitAndAddOffsetsToMap(revokedActiveTasks,
consumedOffsetsPerTask);
- // if we need to commit any revoking task then we just commit all
of those needed committing together
- prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks,
consumedOffsetsPerTask);
- }
-
- // even if commit failed, we should still continue and complete
suspending those tasks, so we would capture
- // any exception and rethrow it at the end. some exceptions may be
handled immediately and then swallowed,
- // as such we just need to skip those dirty tasks in the checkpoint
+ // even if prepare, commit, or postCommit failed, we must still
suspend revoked tasks and unlock,
+ // so we use try-finally to guarantee that. Exceptions are captured
and rethrown at the end.
final Set<Task> dirtyTasks = new
TreeSet<>(Comparator.comparing(Task::id));
+ boolean prepareCommitSucceeded = false;
try {
if (revokedTasksNeedCommit) {
- // in handleRevocation we must call
commitOffsetsOrTransaction() directly rather than
- // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to
make sure we don't skip the
- // offset commit because we are in a rebalance
-
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
- }
- } catch (final TaskCorruptedException e) {
- log.warn("Some tasks were corrupted when trying to commit offsets,
these will be cleaned and revived: {}",
- e.corruptedTasks());
-
- // If we hit a TaskCorruptedException it must be EOS, just handle
the cleanup for those corrupted tasks right here
- dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
- closeDirtyAndRevive(dirtyTasks, true);
- } catch (final TimeoutException e) {
- log.warn("Timed out while trying to commit all tasks during
revocation, these will be cleaned and revived");
-
- // If we hit a TimeoutException it must be ALOS, just close dirty
and revive without wiping the state
- dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
- closeDirtyAndRevive(dirtyTasks, false);
- } catch (final RuntimeException e) {
- log.error("Exception caught while committing those revoked tasks "
+ revokedActiveTasks, e);
- firstException.compareAndSet(null, e);
- dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
- }
-
- // we enforce checkpointing upon suspending a task: if it is resumed
later we just proceed normally, if it is
- // going to be closed we would checkpoint by then
- for (final Task task : revokedActiveTasks) {
- if (!dirtyTasks.contains(task)) {
try {
- task.postCommit(true);
+ prepareCommitAndAddOffsetsToMap(revokedActiveTasks,
consumedOffsetsPerTask);
+ // if we need to commit any revoking task then we just
commit all of those needed committing together
+ prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks,
consumedOffsetsPerTask);
+ prepareCommitSucceeded = true;
} catch (final RuntimeException e) {
- log.error("Exception caught while post-committing task " +
task.id(), e);
- maybeSetFirstException(false, maybeWrapTaskException(e,
task.id()), firstException);
+ log.error("Exception caught while preparing to commit
revoked tasks " + revokedActiveTasks, e);
+ maybeSetFirstException(false, e, firstException);
+ dirtyTasks.addAll(revokedActiveTasks);
+ dirtyTasks.addAll(commitNeededActiveTasks);
}
}
- }
- if (revokedTasksNeedCommit) {
- for (final Task task : commitNeededActiveTasks) {
+ try {
+ if (revokedTasksNeedCommit && prepareCommitSucceeded) {
+ // in handleRevocation we must call
commitOffsetsOrTransaction() directly rather than
+ // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap()
to make sure we don't skip the
+ // offset commit because we are in a rebalance
+
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+ }
+ } catch (final TaskCorruptedException e) {
+ log.warn("Some tasks were corrupted when trying to commit
offsets, these will be cleaned and revived: {}",
+ e.corruptedTasks());
+
+ // If we hit a TaskCorruptedException it must be EOS, just
handle the cleanup for those corrupted tasks right here
+ dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
+ closeDirtyAndRevive(dirtyTasks, true);
+ } catch (final TimeoutException e) {
+ log.warn("Timed out while trying to commit all tasks during
revocation, these will be cleaned and revived");
+
+ // If we hit a TimeoutException it must be ALOS, just close
dirty and revive without wiping the state
+ dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+ closeDirtyAndRevive(dirtyTasks, false);
+ } catch (final RuntimeException e) {
+ log.error("Exception caught while committing those revoked
tasks " + revokedActiveTasks, e);
Review Comment:
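Same as above: please use a parameterized log message (`{}` placeholder for `revokedActiveTasks`, exception as the last argument) instead of string concatenation in this `log.error` call.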
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1057,81 +1057,91 @@ void handleRevocation(final Collection<TopicPartition>
revokedPartitions) {
"have been cleaned up by the handleAssignment
callback.", remainingRevokedPartitions);
}
- if (revokedTasksNeedCommit) {
- prepareCommitAndAddOffsetsToMap(revokedActiveTasks,
consumedOffsetsPerTask);
- // if we need to commit any revoking task then we just commit all
of those needed committing together
- prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks,
consumedOffsetsPerTask);
- }
-
- // even if commit failed, we should still continue and complete
suspending those tasks, so we would capture
- // any exception and rethrow it at the end. some exceptions may be
handled immediately and then swallowed,
- // as such we just need to skip those dirty tasks in the checkpoint
+ // even if prepare, commit, or postCommit failed, we must still
suspend revoked tasks and unlock,
+ // so we use try-finally to guarantee that. Exceptions are captured
and rethrown at the end.
final Set<Task> dirtyTasks = new
TreeSet<>(Comparator.comparing(Task::id));
+ boolean prepareCommitSucceeded = false;
try {
if (revokedTasksNeedCommit) {
- // in handleRevocation we must call
commitOffsetsOrTransaction() directly rather than
- // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to
make sure we don't skip the
- // offset commit because we are in a rebalance
-
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
- }
- } catch (final TaskCorruptedException e) {
- log.warn("Some tasks were corrupted when trying to commit offsets,
these will be cleaned and revived: {}",
- e.corruptedTasks());
-
- // If we hit a TaskCorruptedException it must be EOS, just handle
the cleanup for those corrupted tasks right here
- dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
- closeDirtyAndRevive(dirtyTasks, true);
- } catch (final TimeoutException e) {
- log.warn("Timed out while trying to commit all tasks during
revocation, these will be cleaned and revived");
-
- // If we hit a TimeoutException it must be ALOS, just close dirty
and revive without wiping the state
- dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
- closeDirtyAndRevive(dirtyTasks, false);
- } catch (final RuntimeException e) {
- log.error("Exception caught while committing those revoked tasks "
+ revokedActiveTasks, e);
- firstException.compareAndSet(null, e);
- dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
- }
-
- // we enforce checkpointing upon suspending a task: if it is resumed
later we just proceed normally, if it is
- // going to be closed we would checkpoint by then
- for (final Task task : revokedActiveTasks) {
- if (!dirtyTasks.contains(task)) {
try {
- task.postCommit(true);
+ prepareCommitAndAddOffsetsToMap(revokedActiveTasks,
consumedOffsetsPerTask);
+ // if we need to commit any revoking task then we just
commit all of those needed committing together
+ prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks,
consumedOffsetsPerTask);
+ prepareCommitSucceeded = true;
} catch (final RuntimeException e) {
- log.error("Exception caught while post-committing task " +
task.id(), e);
- maybeSetFirstException(false, maybeWrapTaskException(e,
task.id()), firstException);
+ log.error("Exception caught while preparing to commit
revoked tasks " + revokedActiveTasks, e);
+ maybeSetFirstException(false, e, firstException);
+ dirtyTasks.addAll(revokedActiveTasks);
+ dirtyTasks.addAll(commitNeededActiveTasks);
}
}
- }
- if (revokedTasksNeedCommit) {
- for (final Task task : commitNeededActiveTasks) {
+ try {
+ if (revokedTasksNeedCommit && prepareCommitSucceeded) {
+ // in handleRevocation we must call
commitOffsetsOrTransaction() directly rather than
+ // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap()
to make sure we don't skip the
+ // offset commit because we are in a rebalance
+
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+ }
+ } catch (final TaskCorruptedException e) {
+ log.warn("Some tasks were corrupted when trying to commit
offsets, these will be cleaned and revived: {}",
+ e.corruptedTasks());
+
+ // If we hit a TaskCorruptedException it must be EOS, just
handle the cleanup for those corrupted tasks right here
+ dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
+ closeDirtyAndRevive(dirtyTasks, true);
+ } catch (final TimeoutException e) {
+ log.warn("Timed out while trying to commit all tasks during
revocation, these will be cleaned and revived");
+
+ // If we hit a TimeoutException it must be ALOS, just close
dirty and revive without wiping the state
+ dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+ closeDirtyAndRevive(dirtyTasks, false);
+ } catch (final RuntimeException e) {
+ log.error("Exception caught while committing those revoked
tasks " + revokedActiveTasks, e);
+ maybeSetFirstException(false, e, firstException);
+ dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+ }
+
+ // we enforce checkpointing upon suspending a task: if it is
resumed later we just proceed normally, if it is
+ // going to be closed we would checkpoint by then
+ for (final Task task : revokedActiveTasks) {
if (!dirtyTasks.contains(task)) {
try {
- // for non-revoking active tasks, we should not
enforce checkpoint
- // since if it is EOS enabled, no checkpoint should be
written while
- // the task is in RUNNING tate
- task.postCommit(false);
+ task.postCommit(true);
} catch (final RuntimeException e) {
log.error("Exception caught while post-committing task
" + task.id(), e);
maybeSetFirstException(false,
maybeWrapTaskException(e, task.id()), firstException);
}
}
}
- }
- for (final Task task : revokedActiveTasks) {
- try {
- task.suspend();
- } catch (final RuntimeException e) {
- log.error("Caught the following exception while trying to
suspend revoked task " + task.id(), e);
- maybeSetFirstException(false, maybeWrapTaskException(e,
task.id()), firstException);
+ if (revokedTasksNeedCommit) {
+ for (final Task task : commitNeededActiveTasks) {
+ if (!dirtyTasks.contains(task)) {
+ try {
+ // for non-revoking active tasks, we should not
enforce checkpoint
+ // since if it is EOS enabled, no checkpoint
should be written while
+ // the task is in RUNNING tate
+ task.postCommit(false);
+ } catch (final RuntimeException e) {
+ log.error("Exception caught while post-committing
task " + task.id(), e);
+ maybeSetFirstException(false,
maybeWrapTaskException(e, task.id()), firstException);
+ }
+ }
+ }
+ }
+ } finally {
+ for (final Task task : revokedActiveTasks) {
+ try {
+ task.suspend();
+ } catch (final RuntimeException e) {
+ log.error("Caught the following exception while trying to
suspend revoked task " + task.id(), e);
Review Comment:
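Same as above: please use a parameterized log message (`{}` placeholder for `task.id()`, exception as the last argument) instead of string concatenation in this `log.error` call.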
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1057,81 +1057,91 @@ void handleRevocation(final Collection<TopicPartition>
revokedPartitions) {
"have been cleaned up by the handleAssignment
callback.", remainingRevokedPartitions);
}
- if (revokedTasksNeedCommit) {
- prepareCommitAndAddOffsetsToMap(revokedActiveTasks,
consumedOffsetsPerTask);
- // if we need to commit any revoking task then we just commit all
of those needed committing together
- prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks,
consumedOffsetsPerTask);
- }
-
- // even if commit failed, we should still continue and complete
suspending those tasks, so we would capture
- // any exception and rethrow it at the end. some exceptions may be
handled immediately and then swallowed,
- // as such we just need to skip those dirty tasks in the checkpoint
+ // even if prepare, commit, or postCommit failed, we must still
suspend revoked tasks and unlock,
+ // so we use try-finally to guarantee that. Exceptions are captured
and rethrown at the end.
final Set<Task> dirtyTasks = new
TreeSet<>(Comparator.comparing(Task::id));
+ boolean prepareCommitSucceeded = false;
try {
if (revokedTasksNeedCommit) {
- // in handleRevocation we must call
commitOffsetsOrTransaction() directly rather than
- // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to
make sure we don't skip the
- // offset commit because we are in a rebalance
-
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
- }
- } catch (final TaskCorruptedException e) {
- log.warn("Some tasks were corrupted when trying to commit offsets,
these will be cleaned and revived: {}",
- e.corruptedTasks());
-
- // If we hit a TaskCorruptedException it must be EOS, just handle
the cleanup for those corrupted tasks right here
- dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
- closeDirtyAndRevive(dirtyTasks, true);
- } catch (final TimeoutException e) {
- log.warn("Timed out while trying to commit all tasks during
revocation, these will be cleaned and revived");
-
- // If we hit a TimeoutException it must be ALOS, just close dirty
and revive without wiping the state
- dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
- closeDirtyAndRevive(dirtyTasks, false);
- } catch (final RuntimeException e) {
- log.error("Exception caught while committing those revoked tasks "
+ revokedActiveTasks, e);
- firstException.compareAndSet(null, e);
- dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
- }
-
- // we enforce checkpointing upon suspending a task: if it is resumed
later we just proceed normally, if it is
- // going to be closed we would checkpoint by then
- for (final Task task : revokedActiveTasks) {
- if (!dirtyTasks.contains(task)) {
try {
- task.postCommit(true);
+ prepareCommitAndAddOffsetsToMap(revokedActiveTasks,
consumedOffsetsPerTask);
+ // if we need to commit any revoking task then we just
commit all of those needed committing together
+ prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks,
consumedOffsetsPerTask);
+ prepareCommitSucceeded = true;
} catch (final RuntimeException e) {
- log.error("Exception caught while post-committing task " +
task.id(), e);
- maybeSetFirstException(false, maybeWrapTaskException(e,
task.id()), firstException);
+ log.error("Exception caught while preparing to commit
revoked tasks " + revokedActiveTasks, e);
+ maybeSetFirstException(false, e, firstException);
+ dirtyTasks.addAll(revokedActiveTasks);
+ dirtyTasks.addAll(commitNeededActiveTasks);
}
}
- }
- if (revokedTasksNeedCommit) {
- for (final Task task : commitNeededActiveTasks) {
+ try {
+ if (revokedTasksNeedCommit && prepareCommitSucceeded) {
+ // in handleRevocation we must call
commitOffsetsOrTransaction() directly rather than
+ // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap()
to make sure we don't skip the
+ // offset commit because we are in a rebalance
+
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+ }
+ } catch (final TaskCorruptedException e) {
+ log.warn("Some tasks were corrupted when trying to commit
offsets, these will be cleaned and revived: {}",
+ e.corruptedTasks());
+
+ // If we hit a TaskCorruptedException it must be EOS, just
handle the cleanup for those corrupted tasks right here
+ dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
+ closeDirtyAndRevive(dirtyTasks, true);
+ } catch (final TimeoutException e) {
+ log.warn("Timed out while trying to commit all tasks during
revocation, these will be cleaned and revived");
+
+ // If we hit a TimeoutException it must be ALOS, just close
dirty and revive without wiping the state
+ dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+ closeDirtyAndRevive(dirtyTasks, false);
+ } catch (final RuntimeException e) {
+ log.error("Exception caught while committing those revoked
tasks " + revokedActiveTasks, e);
+ maybeSetFirstException(false, e, firstException);
+ dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+ }
+
+ // we enforce checkpointing upon suspending a task: if it is
resumed later we just proceed normally, if it is
+ // going to be closed we would checkpoint by then
+ for (final Task task : revokedActiveTasks) {
if (!dirtyTasks.contains(task)) {
try {
- // for non-revoking active tasks, we should not
enforce checkpoint
- // since if it is EOS enabled, no checkpoint should be
written while
- // the task is in RUNNING tate
- task.postCommit(false);
+ task.postCommit(true);
} catch (final RuntimeException e) {
log.error("Exception caught while post-committing task
" + task.id(), e);
maybeSetFirstException(false,
maybeWrapTaskException(e, task.id()), firstException);
}
}
}
- }
- for (final Task task : revokedActiveTasks) {
- try {
- task.suspend();
- } catch (final RuntimeException e) {
- log.error("Caught the following exception while trying to
suspend revoked task " + task.id(), e);
- maybeSetFirstException(false, maybeWrapTaskException(e,
task.id()), firstException);
+ if (revokedTasksNeedCommit) {
+ for (final Task task : commitNeededActiveTasks) {
+ if (!dirtyTasks.contains(task)) {
+ try {
+ // for non-revoking active tasks, we should not
enforce checkpoint
+ // since if it is EOS enabled, no checkpoint
should be written while
+ // the task is in RUNNING tate
+ task.postCommit(false);
+ } catch (final RuntimeException e) {
+ log.error("Exception caught while post-committing
task " + task.id(), e);
Review Comment:
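Same as above: please use a parameterized log message (`{}` placeholder for `task.id()`, exception as the last argument) instead of string concatenation in this `log.error` call.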
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]