showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615228815



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -362,23 +360,36 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 // otherwise (the consumer still exists)
                 for (Iterator<TopicPartition> partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) {
                     TopicPartition partition = partitionIter.next();
-                    if (!partition2AllPotentialConsumers.containsKey(partition)) {
+                    if (!topic2AllPotentialConsumers.containsKey(partition.topic())) {
                         // if this topic partition of this consumer no longer exists remove it from currentAssignment of the consumer
                         partitionIter.remove();
                         currentPartitionConsumer.remove(partition);
-                    } else if (!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) {
+                    } else if (!consumerSubscription.topics().contains(partition.topic())) {
                         // if this partition cannot remain assigned to its current consumer because the consumer
                         // is no longer subscribed to its topic remove it from currentAssignment of the consumer
                         partitionIter.remove();
                         revocationRequired = true;
-                    } else
+                    } else {
                         // otherwise, remove the topic partition from those that need to be assigned only if
                         // its current consumer is still subscribed to its topic (because it is already assigned
                         // and we would want to preserve that assignment as much as possible)
-                        unassignedPartitions.remove(partition);
+                        toBeRemovedPartitions.add(partition);
+                    }
                 }
             }
         }
+
+        // all partitions that need to be assigned
+        List<TopicPartition> unassignedPartitions;
+
+        if (!toBeRemovedPartitions.isEmpty()) {
+            Collections.sort(toBeRemovedPartitions, new PartitionComparator(topic2AllPotentialConsumers));
+            unassignedPartitions = getUnassignedPartitions(sortedPartitions, toBeRemovedPartitions);
+        } else {
+            unassignedPartitions = sortedPartitions;

Review comment:
       We use both `unassignedPartitions` and `sortedPartitions` as the base list, 
so when this is a brand-new assignment (nothing to remove), let them refer to 
the same list to save memory.
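
       To illustrate the idea, here is a minimal, self-contained sketch. It is 
not the code in this PR: the class `UnassignedSketch` and the method 
`unassigned` are hypothetical names, plain strings stand in for 
`TopicPartition`, and it assumes both lists are sorted in the same order and 
that the to-be-removed list is a subset of the full list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; strings stand in for TopicPartition.
final class UnassignedSketch {

    // Assumes both lists are sorted in the same order and that
    // sortedToRemove is a subset of sortedAll.
    static List<String> unassigned(List<String> sortedAll, List<String> sortedToRemove) {
        if (sortedToRemove.isEmpty()) {
            // Brand-new assignment: nothing to remove, so reuse the same
            // list object instead of copying it.
            return sortedAll;
        }

        int capacity = Math.max(0, sortedAll.size() - sortedToRemove.size());
        List<String> result = new ArrayList<>(capacity);

        // Walk both sorted lists once, skipping entries that are already assigned.
        int next = 0;
        for (String partition : sortedAll) {
            if (next < sortedToRemove.size() && sortedToRemove.get(next).equals(partition)) {
                next++;
            } else {
                result.add(partition);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> all = List.of("t0-0", "t0-1", "t1-0", "t1-1");
        System.out.println(unassigned(all, List.of("t0-1", "t1-1")));
        System.out.println(unassigned(all, List.of()) == all);
    }
}
```

       One trade-off of the shared reference: in the empty case the two 
variables point at the same list object, so a later mutation through one is 
visible through the other.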





