mjsax commented on code in PR #19164:
URL: https://github.com/apache/kafka/pull/19164#discussion_r1986165537


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1153,8 +1157,7 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
         prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask);

Review Comment:
   Hmmm... Thinking about this again, it seems the `if` below should actually
   only apply to the EOSv2 case? I believe we have been including some tasks
   unnecessarily in the ALOS (and older EOSv1) case?
   
   However, changing the code below as follows breaks two tests:
   ```
   if (processingMode == EXACTLY_ONCE_V2 && revokedTasksNeedCommit) {
       prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
   }
   ```
   Tests:
   ```
   TaskManagerTest#shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithAlos()
   TaskManagerTest#shouldCommitAllNeededTasksOnHandleRevocation()
   ```
   At least the second test assumes we commit everything for ALOS, too. It was
   added when we added EOSv2 and unified the commit logic
   (https://github.com/apache/kafka/pull/8318), but I cannot remember why we did
   it this way... cc @guozhangwang @ableegoldman do you remember?
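
To make the question concrete, here is a minimal standalone sketch of the two behaviors being compared. It is not the actual `TaskManager` code: the class `RevocationCommitSketch`, the method `tasksToCommit`, the `gateOnEosV2` flag, and the use of plain task-id strings are all hypothetical stand-ins. It also assumes that `commitNeededActiveTasks` means the non-revoked active tasks that need a commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the decision in handleRevocation(); not Kafka code.
class RevocationCommitSketch {

    // Simplified stand-in for the processing mode (EOSv1 omitted for brevity).
    enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE_V2 }

    /**
     * Returns the ids of the tasks whose offsets would be committed on revocation.
     *
     * @param mode                    the processing mode of the application
     * @param revokedNeedingCommit    revoked active tasks that need a commit
     * @param commitNeededActiveTasks other (non-revoked) active tasks that need a commit
     * @param gateOnEosV2             false models the unconditional `if (revokedTasksNeedCommit)`,
     *                                true models the proposed EOSv2-only condition
     */
    static List<String> tasksToCommit(final ProcessingMode mode,
                                      final List<String> revokedNeedingCommit,
                                      final List<String> commitNeededActiveTasks,
                                      final boolean gateOnEosV2) {
        // Revoked tasks that need a commit are always included.
        final List<String> toCommit = new ArrayList<>(revokedNeedingCommit);
        final boolean revokedTasksNeedCommit = !revokedNeedingCommit.isEmpty();

        // The non-revoked tasks are only added if at least one revoked task commits,
        // and, with the proposed change, only under EOSv2.
        final boolean includeOthers = revokedTasksNeedCommit
            && (!gateOnEosV2 || mode == ProcessingMode.EXACTLY_ONCE_V2);
        if (includeOthers) {
            toCommit.addAll(commitNeededActiveTasks);
        }
        return toCommit;
    }

    public static void main(final String[] args) {
        final List<String> revoked = List.of("0_0");
        final List<String> others = List.of("0_1");
        System.out.println("ALOS, unconditional: "
            + tasksToCommit(ProcessingMode.AT_LEAST_ONCE, revoked, others, false));
        System.out.println("ALOS, EOSv2-only:    "
            + tasksToCommit(ProcessingMode.AT_LEAST_ONCE, revoked, others, true));
        System.out.println("EOSv2, EOSv2-only:   "
            + tasksToCommit(ProcessingMode.EXACTLY_ONCE_V2, revoked, others, true));
    }
}
```

Under these assumptions, the only case where the two variants differ is ALOS with at least one revoked task needing a commit, which seems to be the case `shouldCommitAllNeededTasksOnHandleRevocation()` asserts on.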



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
