lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1550178486


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optional<ConsumerRebalanceListen
             throwIfNoAssignorsConfigured();
             log.info("Subscribed to pattern: '{}'", pattern);
             subscriptions.subscribe(pattern, listener);
-            updatePatternSubscription(metadata.fetch());
             metadata.requestUpdateForNewTopics();
+            Cluster cache = metadata.fetch();
+
+            while (cache == metadata.fetch()) {
+                log.info("Waiting for new metadata update");
+            }

Review Comment:
   hey @Phuc-Hong-Tran, when we call the `updatePatternSubscription`, we are 
passing the latest metadata already, so if we make it into the if 
[here](https://github.com/apache/kafka/blob/cf2874a2db845daaed80808ce42b50fc05584fdb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1466)
 it means that, with the latest metadata, we discovered new topics that made 
the subscription change. So only at that point we need to create the 
`SubscriptionChangeEvent` I would say (and yes, then we also call 
`requestUpdateForNewTopics`)
   
   So echoing my first comment on this thread, seems to me that we shouldn't 
rely on any metadata object/version check here as it could not be accurate 
(removing 
[this](https://github.com/apache/kafka/blob/cf2874a2db845daaed80808ce42b50fc05584fdb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1757-L1761)),
 and we should just make sure that we send the `SubscriptionChangeEvent` only 
when we know that the subscription changed, which is inside the 
`updatePatternSubscription` 
[if](https://github.com/apache/kafka/blob/cf2874a2db845daaed80808ce42b50fc05584fdb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1465).
   
   Makes sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to