lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1949663649


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -267,7 +268,7 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCom
     public void maybeAutoCommitAsync() {
         if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
             OffsetCommitRequestState requestState = createOffsetCommitRequest(
-                subscriptions.allConsumed(),
+                latestPartitionOffsets,

Review Comment:
   I guess here we could have the same situation @junrao described for the 
commit before revocation right below:
   
   > However, when we get here, it's possible that a batch of records have just 
been returned to the application thread before the first step, but those 
records haven't been processed yet. So latestPartitionOffsets is not up to date 
yet. We need to wait for the next setLatestPartitionOffsets() call to happen. 
At that point, we know any record returned to the application will have been 
processed and no more records can be given to the application. So, it's safe to 
commit the offset at that point.
   
   I wonder if we should fix this here the same way we're tackling the commit 
before revocation: by fixing **when** we auto-commit, not **what** we 
auto-commit.
   
   Before this PR, both auto-commits (on the interval and before revocation) 
were triggered freely in the background thread when polling the managers 
(opening the door to a race with fetch). If we instead trigger those commits 
before fetching (when processing `PollEvent`), wouldn't that solve the 
correctness/race issue without needing to keep any snapshot of 
`subscriptions.allConsumed`? (The commit sync/async calls triggered from the 
app would handle the situation just by getting the allConsumed before 
returning control to the app thread, like you already did with the 
`awaitOffsetsReady` approach.)
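   
   To make the ordering concrete, here is a rough, self-contained sketch of 
the idea. It is a toy model, not the real consumer code: `Positions`, 
`Background`, `onPollEvent` and the `topic-0` partition name are all 
hypothetical stand-ins I made up for illustration, and it assumes positions 
only advance after the poll event has been handled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model: none of these types are the real Kafka classes.
public class PollOrderingSketch {

    // Stand-in for the consumed positions tracked by the subscription state.
    static class Positions {
        private final Map<String, Long> consumed = new HashMap<>();

        void advance(String partition, long nextOffset) {
            consumed.put(partition, nextOffset);
        }

        Map<String, Long> allConsumed() {
            return new HashMap<>(consumed);
        }
    }

    // Stand-in for the background thread's handling of a poll event.
    static class Background {
        private final Positions positions;
        private final long intervalMs;
        private long nextAutoCommitMs;
        final List<Map<String, Long>> commits = new ArrayList<>();

        Background(Positions positions, long intervalMs) {
            this.positions = positions;
            this.intervalMs = intervalMs;
            this.nextAutoCommitMs = intervalMs;
        }

        // Runs before any records are handed out for this poll, so everything
        // reflected in allConsumed() was returned by an earlier poll.
        void onPollEvent(long nowMs) {
            if (nowMs >= nextAutoCommitMs) {
                commits.add(positions.allConsumed());
                nextAutoCommitMs = nowMs + intervalMs;
            }
        }
    }

    // One application poll(): signal the poll first, then hand out records.
    static void poll(Background bg, Positions positions, long nowMs, long fetchedUpTo) {
        bg.onPollEvent(nowMs);
        positions.advance("topic-0", fetchedUpTo);
    }

    static List<Map<String, Long>> simulate() {
        Positions positions = new Positions();
        Background bg = new Background(positions, 100L);
        poll(bg, positions, 0L, 5L);    // interval not elapsed: no commit, hands out offsets 0..4
        poll(bg, positions, 150L, 10L); // commits position 5 first, then hands out offsets 5..9
        return bg.commits;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints [{topic-0=5}]
    }
}
```

   In this model the second poll commits position 5, not 10: the commit is 
taken while handling the poll event, before the new batch is handed out, so 
it only covers records returned by the previous poll. Whether the real 
`PollEvent` processing gives the same guarantee is what we would need to 
confirm.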



