showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615227866
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -313,26 +312,24 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
prepopulateCurrentAssignments(subscriptions, currentAssignment, prevAssignment);
- // a mapping of all topic partitions to all consumers that can be assigned to them
- final Map<TopicPartition, List<String>> partition2AllPotentialConsumers = new HashMap<>();
- // a mapping of all consumers to all potential topic partitions that can be assigned to them
- final Map<String, List<TopicPartition>> consumer2AllPotentialPartitions = new HashMap<>();
+ // a mapping of all topics to all consumers that can be assigned to them
+ final Map<String, List<String>> topic2AllPotentialConsumers = new HashMap<>(partitionsPerTopic.keySet().size());
+ // a mapping of all consumers to all potential topics that can be assigned to them
+ final Map<String, List<String>> consumer2AllPotentialTopics = new HashMap<>(subscriptions.keySet().size());
- // initialize partition2AllPotentialConsumers and consumer2AllPotentialPartitions in the following two for loops
+ // initialize topic2AllPotentialConsumers and consumer2AllPotentialTopics in the following two for loops
for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
for (int i = 0; i < entry.getValue(); ++i)
- partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<>());
+ topic2AllPotentialConsumers.put(entry.getKey(), new ArrayList<>());
}
for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
String consumerId = entry.getKey();
- consumer2AllPotentialPartitions.put(consumerId, new ArrayList<>());
+ List<String> subscribedTopics = new ArrayList<>(entry.getValue().topics().size());
+ consumer2AllPotentialTopics.put(consumerId, subscribedTopics);
entry.getValue().topics().stream().filter(topic -> partitionsPerTopic.get(topic) != null).forEach(topic -> {
- for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
- TopicPartition topicPartition = new TopicPartition(topic, i);
- consumer2AllPotentialPartitions.get(consumerId).add(topicPartition);
- partition2AllPotentialConsumers.get(topicPartition).add(consumerId);
Review comment:
refactor 1:
We used to have two maps, `consumer2AllPotentialPartitions` and `partition2AllPotentialConsumers`, but they need a lot of memory. For example, with 1 million partitions and 2,000 consumers that all subscribe to every topic, `consumer2AllPotentialPartitions` holds 2,000 lists of 1 million partitions each, and `partition2AllPotentialConsumers` holds 1 million lists of 2,000 consumers each. We actually only need to store the topics of each consumer (and the consumers of each topic), and can expand topics into partitions through `partitionsPerTopic` when needed. So I replaced them with `topic2AllPotentialConsumers` and `consumer2AllPotentialTopics`, which saves both memory and time.
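A minimal sketch of the topic-level bookkeeping described above, assuming Java 16+ and plain `String` consumer IDs and topic names. `TopicLevelMaps`, the helper methods, and `TopicPartitionSketch` (a hypothetical stand-in for Kafka's `TopicPartition`) are illustrative names, not the PR's code. Both maps are keyed at topic granularity, and a consumer's partitions are expanded from `partitionsPerTopic` only on demand.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopicLevelMaps {

    /** Hypothetical stand-in for org.apache.kafka.common.TopicPartition. */
    record TopicPartitionSketch(String topic, int partition) {}

    /** consumer -> subscribed topics, skipping topics with no partition metadata. */
    static Map<String, List<String>> consumer2AllPotentialTopics(
            Map<String, Integer> partitionsPerTopic,
            Map<String, List<String>> subscribedTopicsPerConsumer) {
        Map<String, List<String>> result = new HashMap<>(subscribedTopicsPerConsumer.size());
        for (Map.Entry<String, List<String>> entry : subscribedTopicsPerConsumer.entrySet()) {
            List<String> topics = new ArrayList<>(entry.getValue().size());
            for (String topic : entry.getValue()) {
                if (partitionsPerTopic.get(topic) != null) {
                    topics.add(topic);
                }
            }
            result.put(entry.getKey(), topics);
        }
        return result;
    }

    /** topic -> consumers: one entry per topic rather than one per partition. */
    static Map<String, List<String>> topic2AllPotentialConsumers(
            Map<String, Integer> partitionsPerTopic,
            Map<String, List<String>> consumer2AllPotentialTopics) {
        Map<String, List<String>> result = new HashMap<>(partitionsPerTopic.size());
        for (String topic : partitionsPerTopic.keySet()) {
            result.put(topic, new ArrayList<>());
        }
        for (Map.Entry<String, List<String>> entry : consumer2AllPotentialTopics.entrySet()) {
            for (String topic : entry.getValue()) {
                result.get(topic).add(entry.getKey());
            }
        }
        return result;
    }

    /** Expand a consumer's topics into partitions only when a caller needs them. */
    static List<TopicPartitionSketch> potentialPartitions(
            String consumerId,
            Map<String, Integer> partitionsPerTopic,
            Map<String, List<String>> consumer2AllPotentialTopics) {
        List<TopicPartitionSketch> partitions = new ArrayList<>();
        for (String topic : consumer2AllPotentialTopics.getOrDefault(consumerId, Collections.emptyList())) {
            for (int i = 0; i < partitionsPerTopic.get(topic); i++) {
                partitions.add(new TopicPartitionSketch(topic, i));
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitionsPerTopic = Map.of("t1", 3, "t2", 2);
        Map<String, List<String>> subscriptions = Map.of(
                "c1", List.of("t1", "t2"),
                "c2", List.of("t2", "unknown"));

        Map<String, List<String>> c2t = consumer2AllPotentialTopics(partitionsPerTopic, subscriptions);
        Map<String, List<String>> t2c = topic2AllPotentialConsumers(partitionsPerTopic, c2t);

        System.out.println(c2t.get("c2"));                 // [t2]
        System.out.println(t2c.get("t2").size());          // 2
        System.out.println(potentialPartitions("c1", partitionsPerTopic, c2t).size()); // 5
    }
}
```

In this sketch the stored entries grow with consumers × subscribed topics instead of consumers × partitions; the trade-off is that partitions are regenerated from `partitionsPerTopic` each time they are needed.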