showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r629079021



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -307,32 +306,35 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> generalAssign(Map<String, 
Integer> partitionsPerTopic,
                                                             Map<String, 
Subscription> subscriptions) {
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("performing general assign. 
partitionsPerTopic: %s, subscriptions: %s",
+                partitionsPerTopic, subscriptions));
+        }
+
         Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
         Map<TopicPartition, ConsumerGenerationPair> prevAssignment = new 
HashMap<>();
         partitionMovements = new PartitionMovements();
 
         prepopulateCurrentAssignments(subscriptions, currentAssignment, 
prevAssignment);
 
-        // a mapping of all topic partitions to all consumers that can be 
assigned to them
-        final Map<TopicPartition, List<String>> 
partition2AllPotentialConsumers = new HashMap<>();
-        // a mapping of all consumers to all potential topic partitions that 
can be assigned to them
-        final Map<String, List<TopicPartition>> 
consumer2AllPotentialPartitions = new HashMap<>();
+        // a mapping of all topics to all consumers that can be assigned to 
them
+        final Map<String, List<String>> topic2AllPotentialConsumers = new 
HashMap<>(partitionsPerTopic.keySet().size());
+        // a mapping of all consumers to all potential topics that can be 
assigned to them
+        final Map<String, List<String>> consumer2AllPotentialTopics = new 
HashMap<>(subscriptions.keySet().size());
 
-        // initialize partition2AllPotentialConsumers and 
consumer2AllPotentialPartitions in the following two for loops
+        // initialize topic2AllPotentialConsumers and 
consumer2AllPotentialTopics in the following two for loops
         for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
             for (int i = 0; i < entry.getValue(); ++i)
-                partition2AllPotentialConsumers.put(new 
TopicPartition(entry.getKey(), i), new ArrayList<>());
+                topic2AllPotentialConsumers.put(entry.getKey(), new 
ArrayList<>());

Review comment:
       Nice catch! Updated. Thank you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to