philipnee commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383786483


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) {
     @Override
     public void unsubscribe() {
         fetchBuffer.retainAll(Collections.emptySet());
-        subscriptions.unsubscribe();
+        UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent();
+        applicationEventHandler.add(unsubscribeApplicationEvent);
+        unsubscribeApplicationEvent.future().whenComplete((result, error) -> {
+            if (error != null) {
+                // Callback failed - Keeping same exception message thrown by the legacy consumer
+                throw new KafkaException("User rebalance callback throws an error", error);

Review Comment:
   Looking at the current consumer, I think this can get quite complicated in
this implementation. Maybe what we should do is invoke the listener on the spot,
then send an event to the background thread to leave the group.
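
A minimal sketch of that ordering, to make the suggestion concrete. Everything here is hypothetical and stands in for the real classes: `UnsubscribeSketch`, `RevocationListener`, `LeaveGroupEvent`, and the plain queue are illustration only, not the actual `PrototypeAsyncConsumer` API, and a plain `RuntimeException` replaces `KafkaException` so the example compiles without Kafka on the classpath. It also assumes the leave-group event should still be sent when the callback fails, which is a design choice rather than something settled in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class UnsubscribeSketch {
    /** Hypothetical stand-in for the user's rebalance listener. */
    interface RevocationListener {
        void onPartitionsRevoked(List<String> partitions);
    }

    /** Hypothetical event that the background thread would handle by leaving the group. */
    static final class LeaveGroupEvent { }

    // Stand-in for the application event queue drained by the background thread.
    private final BlockingQueue<Object> applicationEvents = new LinkedBlockingQueue<>();
    private final List<String> assignment = new ArrayList<>();
    private final RevocationListener listener;

    UnsubscribeSketch(RevocationListener listener, List<String> initialAssignment) {
        this.listener = listener;
        this.assignment.addAll(initialAssignment);
    }

    public void unsubscribe() {
        List<String> revoked = new ArrayList<>(assignment);
        RuntimeException callbackFailure = null;
        try {
            // Step 1: invoke the listener on the spot, on the caller's thread.
            listener.onPartitionsRevoked(revoked);
        } catch (RuntimeException e) {
            callbackFailure = e;
        }
        assignment.clear();
        // Step 2: hand the leave-group work to the background thread.
        applicationEvents.add(new LeaveGroupEvent());
        // A callback failure surfaces directly to the caller of unsubscribe().
        if (callbackFailure != null) {
            throw new RuntimeException("User rebalance callback throws an error", callbackFailure);
        }
    }

    int pendingEvents() {
        return applicationEvents.size();
    }

    List<String> assignment() {
        return assignment;
    }
}
```

Because the listener runs synchronously inside `unsubscribe()`, an exception from it is thrown to the caller directly, rather than from inside a `whenComplete` callback on the event's future.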



