showuon commented on a change in pull request #11218:
URL: https://github.com/apache/kafka/pull/11218#discussion_r689370424



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -597,28 +603,32 @@ private void updateGroupSubscription(Set<String> topics) {
         // own metadata with the newly added topics so that it will not trigger a subsequent rebalance
         // when these topics gets updated from metadata refresh.
         //
+        // We skip the check for in-product assignors since this will not happen in in-product assignors.
+        //
         // TODO: this is a hack and not something we want to support long-term unless we push regex into the protocol
         //       we may need to modify the ConsumerPartitionAssignor API to better support this case.
-        Set<String> assignedTopics = new HashSet<>();
-        for (Assignment assigned : assignments.values()) {
-            for (TopicPartition tp : assigned.partitions())
-                assignedTopics.add(tp.topic());
-        }
+        if (!isInProductAssignor(assignor.name())) {
+            Set<String> assignedTopics = new HashSet<>();
+            for (Assignment assigned : assignments.values()) {
+                for (TopicPartition tp : assigned.partitions())
+                    assignedTopics.add(tp.topic());
+            }
 
-        if (!assignedTopics.containsAll(allSubscribedTopics)) {
-            Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
-            notAssignedTopics.removeAll(assignedTopics);
-            log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics);
-        }
+            if (!assignedTopics.containsAll(allSubscribedTopics)) {
+                Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
+                notAssignedTopics.removeAll(assignedTopics);
+                log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics);
+            }
 
-        if (!allSubscribedTopics.containsAll(assignedTopics)) {
-            Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
-            newlyAddedTopics.removeAll(allSubscribedTopics);
-            log.info("The following not-subscribed topics are assigned, and their metadata will be " +
+            if (!allSubscribedTopics.containsAll(assignedTopics)) {
+                Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
+                newlyAddedTopics.removeAll(allSubscribedTopics);
+                log.info("The following not-subscribed topics are assigned, and their metadata will be " +
                     "fetched from the brokers: {}", newlyAddedTopics);
 
-            allSubscribedTopics.addAll(assignedTopics);
-            updateGroupSubscription(allSubscribedTopics);
+                allSubscribedTopics.addAll(newlyAddedTopics);

Review comment:
       side fix: We can just add `newlyAddedTopics` here because we have already computed it as `assignedTopics - allSubscribedTopics`.
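
To illustrate why the two calls are interchangeable, here is a minimal standalone sketch (not code from this PR). The class name `SubscriptionUnionSketch`, its helper methods, and the topic names are hypothetical; only `java.util` collections are used. It shows that adding the set difference to `allSubscribedTopics` yields the same result as adding all of `assignedTopics`:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; none of these names exist in Kafka.
public class SubscriptionUnionSketch {

    // Previous behaviour: add every assigned topic to the subscribed set.
    static Set<String> addAllAssigned(Set<String> subscribed, Set<String> assigned) {
        Set<String> result = new HashSet<>(subscribed);
        result.addAll(assigned);
        return result;
    }

    // Suggested behaviour: add only the topics that are assigned but not yet subscribed.
    static Set<String> addOnlyNew(Set<String> subscribed, Set<String> assigned) {
        Set<String> newlyAdded = new HashSet<>(assigned);
        newlyAdded.removeAll(subscribed);   // assigned - subscribed
        Set<String> result = new HashSet<>(subscribed);
        result.addAll(newlyAdded);
        return result;
    }

    public static void main(String[] args) {
        Set<String> subscribed = new HashSet<>(Arrays.asList("orders", "payments"));
        Set<String> assigned = new HashSet<>(Arrays.asList("payments", "refunds"));

        // Both approaches produce the union of the two sets.
        System.out.println(addAllAssigned(subscribed, assigned)
                .equals(addOnlyNew(subscribed, assigned)));
    }
}
```

Since a `Set` ignores elements it already contains, the change does not alter behaviour; it only reuses the difference that was already computed for the log message.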




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

