lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1385059304
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) { @Override public void unsubscribe() { fetchBuffer.retainAll(Collections.emptySet()); - subscriptions.unsubscribe(); + UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent(); + applicationEventHandler.add(unsubscribeApplicationEvent); + unsubscribeApplicationEvent.future().whenComplete((result, error) -> { + if (error != null) { + // Callback failed - Keeping same exception message thrown by the legacy consumer + throw new KafkaException("User rebalance callback throws an error", error); Review Comment: Agree that the unsubscribe blocks on the callbacks, but since we don't have the implementation for how callbacks are going to be executed on this PR this unsubscribe is still not using it. It should come when we nail the implementation details on the follow-up PR (will depend on how we end up doing it, maybe blocking not here, but on the subscribe/unsubscribe events) As for the exception, it will originate in the Application thread, where the callback is executed, and it should only be returned to the user when it calls poll, to maintain the current behaviour, so I removed it from here to stay consistent and leave all logic related to the callback execution out, I see the confusion introduced. The when complete should stay because it represents a concept we need, un-related to callbacks: we do need to update the `subscriptionState` only when the Unsubscribe event completes (HB to leave group sent to the broker). We need it know to make sure we are able to run unsubscribe, no callbacks, sending leave group, and clearing up the subscription state after sending the request. Trying to leave it in a consistent state with no callbacks. Follow-up PR should introduce implementation for executing them, blocking appropriately on the execution, and throwing the exceptions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org