lucasbru commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1450646214


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -760,14 +765,19 @@ public void commitAsync(Map<TopicPartition, 
OffsetAndMetadata> offsets, OffsetCo
             // waiting for a response.
             CompletableFuture<Void> future = commit(offsets, false, 
Optional.empty());
             future.whenComplete((r, t) -> {
+
+                if (t == null) {
+                    
offsetCommitCallbackInvoker.submitCommitInterceptors(offsets);
+                }
+
                 if (callback == null) {
                     if (t != null) {
                         log.error("Offset commit with offsets {} failed", 
offsets, t);
                     }
                     return;
                 }
 
-                invoker.submit(new OffsetCommitCallbackTask(callback, offsets, 
(Exception) t));
+                offsetCommitCallbackInvoker.submitUserCallback(callback, 
offsets, (Exception) t);

Review Comment:
   Refactored so that `OffsetCommitCallbackTask` can be an private class inside 
the invoker.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1902,65 +1913,14 @@ private void maybeThrowFencedInstanceException() {
     }
 
     private void maybeInvokeCommitCallbacks() {
-        if (callbacks() > 0) {
-            invoker.executeCallbacks();
+        if (offsetCommitCallbackInvoker.executeCallbacks()) {
+            isFenced = true;
         }
     }
 
-    // Visible for testing
-    int callbacks() {
-        return invoker.callbackQueue.size();
-    }
-
     // Visible for testing
     SubscriptionState subscriptions() {
         return subscriptions;
     }
 
-    /**
-     * Utility class that helps the application thread to invoke user 
registered {@link OffsetCommitCallback}. This is
-     * achieved by having the background thread register a {@link 
OffsetCommitCallbackTask} to the invoker upon the
-     * future completion, and execute the callbacks when user 
polls/commits/closes the consumer.
-     */
-    private class OffsetCommitCallbackInvoker {

Review Comment:
   Moved to a separate file since it's going to be shared across threads.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to