lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1385059304


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) {
     @Override
     public void unsubscribe() {
         fetchBuffer.retainAll(Collections.emptySet());
-        subscriptions.unsubscribe();
+        UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent();
+        applicationEventHandler.add(unsubscribeApplicationEvent);
+        unsubscribeApplicationEvent.future().whenComplete((result, error) -> {
+            if (error != null) {
+                // Callback failed - Keeping same exception message thrown by the legacy consumer
+                throw new KafkaException("User rebalance callback throws an error", error);

Review Comment:
   Agreed that unsubscribe blocks on the callbacks, but this PR does not yet 
include the implementation for how callbacks are going to be executed, so this 
unsubscribe does not use it. That should come in the follow-up PR once we nail 
down the implementation details (depending on how we end up doing it, the 
blocking may happen not here but on the subscribe/unsubscribe events).
   
   As for the exception, it will originate in the application thread, where the 
callback is executed, and it should only be returned to the user when they call 
poll, to maintain the current behaviour. I removed it from here to stay 
consistent and to leave all logic related to callback execution out of this PR. 
I see the confusion it introduced.
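
   To illustrate the "only returned on poll" idea, here is a minimal sketch. It 
is not the implementation planned for the follow-up PR: the class 
`DeferredCallbackErrorSketch`, its `pendingError` field, and the use of a plain 
`RuntimeException` in place of `KafkaException` are all assumptions made so the 
example compiles on its own.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not Kafka code: record a callback failure and surface it on poll().
class DeferredCallbackErrorSketch {
    // Holds the first callback failure until the next poll() (assumed field).
    private final AtomicReference<RuntimeException> pendingError = new AtomicReference<>();

    // Runs a user callback and records a failure instead of throwing it right away.
    void runCallback(Runnable userCallback) {
        try {
            userCallback.run();
        } catch (RuntimeException e) {
            pendingError.compareAndSet(null, e);
        }
    }

    // Stand-in for poll(): rethrows any recorded callback failure to the caller, once.
    void poll() {
        RuntimeException e = pendingError.getAndSet(null);
        if (e != null) {
            // RuntimeException stands in for KafkaException to keep the sketch self-contained.
            throw new RuntimeException("User rebalance callback throws an error", e);
        }
    }

    public static void main(String[] args) {
        DeferredCallbackErrorSketch consumer = new DeferredCallbackErrorSketch();
        consumer.runCallback(() -> { throw new IllegalStateException("callback failed"); });
        try {
            consumer.poll();
        } catch (RuntimeException e) {
            System.out.println("poll surfaced: " + e.getCause().getMessage());
        }
    }
}
```

   The point of the sketch is only the ordering: nothing is thrown where the 
callback runs, and the error reaches the user on the next poll.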
   
   The `whenComplete` should stay because it represents a concept we need, 
unrelated to callbacks: we must update the `subscriptionState` only when the 
Unsubscribe event completes (that is, once the heartbeat (HB) to leave the group 
has been sent to the broker). We need it now to make sure we can run unsubscribe 
with no callbacks: send the leave group request, then clear the subscription 
state after sending it.
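
   As a rough sketch of that ordering, assuming stand-in types rather than the 
real ones: `UnsubscribeEventSketch` mimics only the future of 
`UnsubscribeApplicationEvent`, `subscribedTopics` stands in for the subscription 
state, and clearing only when there is no error is an assumption of this example.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the unsubscribe event; only its future matters here.
class UnsubscribeEventSketch {
    private final CompletableFuture<Void> future = new CompletableFuture<>();

    CompletableFuture<Void> future() {
        return future;
    }
}

// Hypothetical sketch, not Kafka code: clear local state only once the event completes.
class UnsubscribeSketch {
    // Stand-in for the consumer's subscription state (single-threaded demo only).
    final Set<String> subscribedTopics = new HashSet<>();

    UnsubscribeEventSketch unsubscribe() {
        UnsubscribeEventSketch event = new UnsubscribeEventSketch();
        // The real consumer would hand the event to the background thread at this point.
        event.future().whenComplete((result, error) -> {
            // Runs when the event is completed, i.e. after the leave-group request was sent.
            if (error == null) {
                subscribedTopics.clear();
            }
        });
        return event;
    }

    public static void main(String[] args) {
        UnsubscribeSketch consumer = new UnsubscribeSketch();
        consumer.subscribedTopics.add("orders");
        UnsubscribeEventSketch event = consumer.unsubscribe();
        System.out.println("before completion: " + consumer.subscribedTopics);
        event.future().complete(null); // simulates the background thread finishing
        System.out.println("after completion: " + consumer.subscribedTopics);
    }
}
```

   Calling `unsubscribe()` alone leaves the state untouched; it is cleared only 
when the event's future is completed.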
   
   I'm trying to leave it in a consistent state with no callbacks. The follow-up 
PR should introduce the implementation for executing them, blocking 
appropriately on their execution, and throwing the exceptions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
