ableegoldman commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r738872895
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -292,12 +327,20 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         final Set<Task> tasksToRecycle = new TreeSet<>(byId);
         final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
         final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
+        boolean commitAssignedActiveTasks = false;
+        final Set<Task> activeTasksNeedCommit = new HashSet<>();
         // first rectify all existing tasks
         for (final Task task : tasks.allTasks()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 tasks.updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
                 activeTasksToCreate.remove(task.id());
+                if (task.state() == State.RESTORING) {
Review comment:
Note that this loop only goes through the _existing_ tasks, but we want to
commit only if there are _new_ tasks that will need restoring. Unfortunately,
because every task passes through the `RESTORING` phase before going into
`RUNNING`, it's nontrivial to figure out whether we will actually spend any
time restoring new tasks. As a first pass (so we can get some kind of fix into
3.1), we can just set `commitAssignedActiveTasks = true` if there are any
newly added active tasks at all.
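
A rough sketch of that first-pass check, just to make the idea concrete. The
class and method names here are made up for illustration, and plain strings
stand in for `TaskId` and `TopicPartition`; this is not the actual
`TaskManager` code. It assumes the check boils down to "is anything left to
create once the existing active tasks are removed from the assignment":

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka Streams.
final class FirstPassCommitCheck {

    // Returns true if the new assignment contains at least one active task id
    // that is not already owned as an active task. Strings stand in for
    // TaskId (keys) and TopicPartition (values).
    static boolean hasNewlyAddedActiveTasks(final Set<String> existingActiveTaskIds,
                                            final Map<String, Set<String>> assignedActiveTasks) {
        // Copy the key set so the caller's assignment map is not modified.
        final Set<String> toCreate = new HashSet<>(assignedActiveTasks.keySet());
        toCreate.removeAll(existingActiveTaskIds);
        return !toCreate.isEmpty();
    }
}
```

In `handleAssignment` this would roughly correspond to checking whether
`activeTasksToCreate` is non-empty after the loop over the existing tasks.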
I think that's fine for now, but if you're interested in how we could check
whether any of the new active tasks actually need restoring, check out the
`StoreChangelogReader` class. It always does at least one initial pass to
confirm that any new active tasks are caught up, so one idea would be to check
whether any changelogs still need to be restored after that pass. Actually,
you may not even need to worry about this "first pass": you can just do the
commit if there are any active tasks that need to be committed and some tasks
are still in `RESTORING` at the end of
`TaskManager#tryToCompleteRestoration`. Does that make sense?
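
Here is a minimal sketch of that last idea, under a lot of assumptions.
`TaskInfo` and `shouldCommitBeforeRestoring` are hypothetical stand-ins, not
Kafka classes, and the sketch assumes "needs to be committed" means a running
active task with a pending commit. The real change would work on `Task`
objects at the end of `tryToCompleteRestoration`:

```java
import java.util.Collection;

// Hypothetical stand-ins for illustration only; not the Kafka Streams types.
final class RestorationCommitCheck {

    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    // Minimal view of a task: whether it is active, its lifecycle state,
    // and whether it has uncommitted progress.
    static final class TaskInfo {
        final boolean active;
        final State state;
        final boolean commitNeeded;

        TaskInfo(final boolean active, final State state, final boolean commitNeeded) {
            this.active = active;
            this.state = state;
            this.commitNeeded = commitNeeded;
        }
    }

    // Commit only when both conditions hold: some task is still RESTORING
    // (so restoration will actually take time) and some running active task
    // has something to commit.
    static boolean shouldCommitBeforeRestoring(final Collection<TaskInfo> tasks) {
        boolean anyStillRestoring = false;
        boolean anyActiveNeedsCommit = false;
        for (final TaskInfo task : tasks) {
            if (task.state == State.RESTORING) {
                anyStillRestoring = true;
            }
            if (task.active && task.state == State.RUNNING && task.commitNeeded) {
                anyActiveNeedsCommit = true;
            }
        }
        return anyStillRestoring && anyActiveNeedsCommit;
    }
}
```

The point of checking both flags is to skip the extra commit when every new
task turned out to be caught up already, or when there is nothing to commit.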