Guozhang Wang created KAFKA-14847:
-------------------------------------

             Summary: Separate the callers of commitAllTasks v.s. commitTasks 
for EOS(-v2) and ALOS
                 Key: KAFKA-14847
                 URL: https://issues.apache.org/jira/browse/KAFKA-14847
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Guozhang Wang


Today, EOS-v2/v1 and ALOS shares the same internal callpath inside 
TaskManager/TaskExecutor for committing tasks from various scenarios, the call 
path {{commitTasksAndMaybeUpdateCommitableOffsets}} -> 
{{commitOffsetsOrTransaction}} takes in a list of tasks as its input, which can 
be a subset of the tasks that thread / task manager owns. For EOS-v1 / ALOS, 
this is fine to commit just a subset of the tasks; however for EOS-v1, since 
all tasks participate in the same txn it could lead to dangerous violations, 
and today we are relying on all the callers of the commit function to make sure 
that the list of tasks they passed in, under EOS-v2, would still not violate 
the semantics. As summarized today (thanks to Matthias), today that callee 
could be triggered in the following cases:

1) Inside handleRevocation() -- this is a clean path, an we add all non-revoked 
tasks with commitNeeded() flag set to the commit -- so this seems to be fine.
2) tryCloseCleanAllActiveTasks() -- here we only call it, if 
tasksToCloseDirty.isEmpty() -- so it seems fine, too.
3) commit() with a list of task handed in -- we call commit() inside the TM 
three time
3.a) inside commitAll() as commit(tasks.values()) (passing in all tasks)
3.b) inside maybeCommitActiveTasksPerUserRequested as 
commit(activeTaskIterable()); (passing in all tasks)
3.c) inside handleCorruption() -- here, we only consider RUNNING and RESTORING 
tasks, which are not corrupted -- note we only throw a TaskCorruptedException 
during restore state initialization, thus, corrupted tasks did not process 
anything yet, and all other tasks should be clean to be committed.
3.d) commitSuccessfullyProcessedTasks() -- under EOS-v2, as we just commit a 
subset of tasks' source offsets while at the same time we still commit those 
unsuccessful task's outgoing records if there are any.

Just going through this list of callers itself, as demonstrated above, is 
already pretty complex, and very vulnerable to bugs. It's better to not rely on 
the callers, but the callees to make sure that's the case. More concretely, I 
think we can introduce a new function called {{commitAllTasks}} such that under 
EOS-v2, the caller always call {{commitAllTasks}} instead, and if there are 
some tasks that should not be committed because we know they have not processed 
any data, the {{commitAllTasks}} callee itself would do some clever filtering 
internally.

Given its scope, I think it's better to do this refactoring after EOS-v1 is 
removed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to