showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615228815
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -362,23 +360,36 @@ private boolean allSubscriptionsEqual(Set<String>
allTopics,
// otherwise (the consumer still exists)
for (Iterator<TopicPartition> partitionIter =
entry.getValue().iterator(); partitionIter.hasNext();) {
TopicPartition partition = partitionIter.next();
- if
(!partition2AllPotentialConsumers.containsKey(partition)) {
+ if
(!topic2AllPotentialConsumers.containsKey(partition.topic())) {
// if this topic partition of this consumer no longer
exists remove it from currentAssignment of the consumer
partitionIter.remove();
currentPartitionConsumer.remove(partition);
- } else if
(!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) {
+ } else if
(!consumerSubscription.topics().contains(partition.topic())) {
// if this partition cannot remain assigned to its
current consumer because the consumer
// is no longer subscribed to its topic remove it from
currentAssignment of the consumer
partitionIter.remove();
revocationRequired = true;
- } else
+ } else {
// otherwise, remove the topic partition from those
that need to be assigned only if
// its current consumer is still subscribed to its
topic (because it is already assigned
// and we would want to preserve that assignment as
much as possible)
- unassignedPartitions.remove(partition);
+ toBeRemovedPartitions.add(partition);
+ }
}
}
}
+
+ // all partitions that need to be assigned
+ List<TopicPartition> unassignedPartitions;
+
+ if (!toBeRemovedPartitions.isEmpty()) {
+ Collections.sort(toBeRemovedPartitions, new
PartitionComparator(topic2AllPotentialConsumers));
+ unassignedPartitions = getUnassignedPartitions(sortedPartitions,
toBeRemovedPartitions);
+ } else {
+ unassignedPartitions = sortedPartitions;
Review comment:
We use `unassignedPartitions` and `sortedPartitions` as the base list,
so make them refer to the same list to save memory when brand-new assignment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]