mjsax commented on code in PR #19164:
URL: https://github.com/apache/kafka/pull/19164#discussion_r1986158617


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1163,10 +1166,12 @@ void handleRevocation(final Collection<TopicPartition> 
revokedPartitions) {
         // as such we just need to skip those dirty tasks in the checkpoint
         final Set<Task> dirtyTasks = new HashSet<>();
         try {
-            // in handleRevocation we must call commitOffsetsOrTransaction() 
directly rather than
-            // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make 
sure we don't skip the
-            // offset commit because we are in a rebalance
-            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+            if (revokedTasksNeedCommit) {

Review Comment:
   Not sure if I can follow?
   
   Your proposal would say, we stop committing if a TX is in-flight, but we do 
want to commit for this case, right? Even if offset-map is empty.
   
   And moving it to the outer-most context seems not to be "correct", because 
checking if a TX is inflight for the ALOS case seems unnecessary (guess it 
would not be wrong, because the call would just always return `false` so not 
really changing anything effectively, but it seems unnecessary to change the 
code) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to