frankvicky commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1947493357


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -234,7 +239,10 @@ private void process(final AsyncCommitEvent event) {
 
         try {
             CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
manager.commitAsync(event.offsets());
+            Optional<Map<TopicPartition, OffsetAndMetadata>> offsets = 
event.offsets().isEmpty() ?
+                Optional.of(subscriptions.allConsumed()) : event.offsets();
+            event.markOffsetsReady();

Review Comment:
   I introduce the logic here to see if we need to invoke `allCounsumed`. After 
that, it will complete the `offsetsReady` to notify the app thread the offsets 
are up to date.
   
   It's quite nasty here; I will improve it if we agree on this solution.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to