frankvicky commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1958983469


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -748,9 +748,14 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
             }
 
             do {
-
+                PollEvent event = new PollEvent(timer.currentTimeMs());
                 // Make sure to let the background thread know that we are still polling.
-                applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
+                // This will trigger async auto-commits of consumed positions when hitting
+                // the interval time or reconciling new assignments
+                applicationEventHandler.add(event);
+                // Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests
+                // retrieve the positions to commit before proceeding with fetching new records
+                ConsumerUtils.getResult(event.reconcileAndAutoCommit());

Review Comment:
   Makes sense. 
   Previously, I thought it was crucial for the app thread to wait for 
reconciliation and auto-commit completion, but I didn't consider the 
possibility of a faulty background thread.
   Given that, we also need to apply this timeout to `offsetsReady`, right?
   
https://github.com/apache/kafka/blob/dee389dc88e35c99e22d9e4bb26d55df14b7df7b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L856
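
To illustrate the concern, here is a minimal sketch of a bounded wait on a future, written against `java.util.concurrent` only. `BoundedWait` and `awaitWithin` are hypothetical names for illustration, not Kafka APIs, and the real change would presumably reuse the consumer's existing timer rather than a raw `Duration`.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWait {

    /**
     * Hypothetical helper: waits for the future for at most `remaining`.
     * Returns true if the future completed in time, false if the wait timed out,
     * so the caller (the app thread) never blocks forever on a stuck background thread.
     */
    static boolean awaitWithin(CompletableFuture<Void> future, Duration remaining)
            throws InterruptedException {
        try {
            future.get(remaining.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // The background side did not complete the future in time; give up waiting.
            return false;
        } catch (ExecutionException e) {
            // The future completed exceptionally; surface the underlying cause.
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulates a faulty background thread that never completes the future.
        CompletableFuture<Void> stuck = new CompletableFuture<>();
        System.out.println(awaitWithin(stuck, Duration.ofMillis(50)));

        // Simulates the normal case where the work is already done.
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        System.out.println(awaitWithin(done, Duration.ofMillis(50)));
    }
}
```

With an unbounded `get()`, the first call in `main` would never return; with the bounded variant, the caller regains control once the remaining time is used up and can decide how to proceed.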



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
