mjsax commented on a change in pull request #9835:
URL: https://github.com/apache/kafka/pull/9835#discussion_r552878708
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -390,71 +384,28 @@ private void handleCloseAndRecycle(final Set<Task>
tasksToRecycle,
}
tasksToRecycle.removeAll(tasksToCloseDirty);
- for (final Task task : tasksToRecycle) {
+ for (final Task oldTask : tasksToRecycle) {
final Task newTask;
try {
- if (task.isActive()) {
- final Set<TopicPartition> partitions =
standbyTasksToCreate.remove(task.id());
- newTask =
standbyTaskCreator.createStandbyTaskFromActive((StreamTask) task, partitions);
- cleanUpTaskProducer(task, taskCloseExceptions);
+ if (oldTask.isActive()) {
+ final Set<TopicPartition> partitions =
standbyTasksToCreate.remove(oldTask.id());
+ tasks.convertActiveToStandby((StreamTask) oldTask,
partitions, taskCloseExceptions);
} else {
- final Set<TopicPartition> partitions =
activeTasksToCreate.remove(task.id());
- newTask =
activeTaskCreator.createActiveTaskFromStandby((StandbyTask) task, partitions,
mainConsumer);
+ final Set<TopicPartition> partitions =
activeTasksToCreate.remove(oldTask.id());
+ tasks.convertStandbyToActive((StandbyTask) oldTask,
partitions);
}
- tasks.remove(task.id());
- addNewTask(newTask);
} catch (final RuntimeException e) {
- final String uncleanMessage = String.format("Failed to recycle
task %s cleanly. Attempting to close remaining tasks before re-throwing:",
task.id());
+ final String uncleanMessage = String.format("Failed to recycle
task %s cleanly. Attempting to close remaining tasks before re-throwing:",
oldTask.id());
log.error(uncleanMessage, e);
- taskCloseExceptions.putIfAbsent(task.id(), e);
- tasksToCloseDirty.add(task);
+ taskCloseExceptions.putIfAbsent(oldTask.id(), e);
+ tasksToCloseDirty.add(oldTask);
}
}
// for tasks that cannot be cleanly closed or recycled, close them
dirty
for (final Task task : tasksToCloseDirty) {
closeTaskDirty(task);
- cleanUpTaskProducer(task, taskCloseExceptions);
- tasks.remove(task.id());
- }
- }
-
- private void cleanUpTaskProducer(final Task task,
Review comment:
Moved those methods to the new `Tasks` container class to descope what
`TaskManager` does -- will move more methods in a follow-up PR. (Just tried to
keep this PR small.)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org