mjsax commented on code in PR #19164: URL: https://github.com/apache/kafka/pull/19164#discussion_r1986165537
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -1153,8 +1157,7 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) { prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask); Review Comment: Hmmm... Thinking about this again, it seems the `if` below should actually only apply to EOSv2 case? I believe we did actually include some task unnecessarily for ALOS (and older version EOSv1) case? However, changing this code below does break two tests... ``` if (processingMode == EXACTLY_ONCE_V2 && revokedTasksNeedCommit) { prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask); } ``` Tests: ``` TaskManagerTest#shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithAlos() TaskManagerTest#shouldCommitAllNeededTasksOnHandleRevocation() ``` At least the second test assumes we commit everything for ALOS, too. I was added when we added EOSv2 and unified commit logic (https://github.com/apache/kafka/pull/8318) -- but I cannot remember why we did it this way... \cc @guozhangwang @ableegoldman do you remember? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org