guozhangwang commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r625409879



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -307,32 +306,35 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> generalAssign(Map<String, 
Integer> partitionsPerTopic,
                                                             Map<String, 
Subscription> subscriptions) {
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("performing general assign. 
partitionsPerTopic: %s, subscriptions: %s",
+                partitionsPerTopic, subscriptions));
+        }
+
         Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
         Map<TopicPartition, ConsumerGenerationPair> prevAssignment = new 
HashMap<>();
         partitionMovements = new PartitionMovements();
 
         prepopulateCurrentAssignments(subscriptions, currentAssignment, 
prevAssignment);
 
-        // a mapping of all topic partitions to all consumers that can be 
assigned to them
-        final Map<TopicPartition, List<String>> 
partition2AllPotentialConsumers = new HashMap<>();
-        // a mapping of all consumers to all potential topic partitions that 
can be assigned to them
-        final Map<String, List<TopicPartition>> 
consumer2AllPotentialPartitions = new HashMap<>();
+        // a mapping of all topics to all consumers that can be assigned to 
them
+        final Map<String, List<String>> topic2AllPotentialConsumers = new 
HashMap<>(partitionsPerTopic.keySet().size());
+        // a mapping of all consumers to all potential topics that can be 
assigned to them
+        final Map<String, List<String>> consumer2AllPotentialTopics = new 
HashMap<>(subscriptions.keySet().size());
 
-        // initialize partition2AllPotentialConsumers and 
consumer2AllPotentialPartitions in the following two for loops
+        // initialize topic2AllPotentialConsumers and 
consumer2AllPotentialTopics in the following two for loops
         for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
             for (int i = 0; i < entry.getValue(); ++i)
-                partition2AllPotentialConsumers.put(new 
TopicPartition(entry.getKey(), i), new ArrayList<>());
+                topic2AllPotentialConsumers.put(entry.getKey(), new 
ArrayList<>());

Review comment:
       Hmm, is this right? Since this `put` is inside the per-partition loop, 
wouldn't we overwrite the same topic key with a new empty list N times (once per partition)?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -387,58 +403,125 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
         TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new 
SubscriptionComparator(currentAssignment));
         sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
+        int totalPartitionCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
         balance(currentAssignment, prevAssignment, sortedPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
-            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired);
+            consumer2AllPotentialTopics, topic2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired,
+            partitionsPerTopic, totalPartitionCount);
+
+        if (log.isDebugEnabled()) {
+            log.debug("final assignment: " + currentAssignment);
+        }
+
         return currentAssignment;
     }
 
+    /**
+     * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+     * and sortedToBeRemovedPartitions. We use two pointers technique here:
+     *
+     * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from 
sortedToBeRemovedPartitions
+     *
+     * @param sortedPartitions: sorted all partitions
+     * @param sortedToBeRemovedPartitions: sorted partitions, all are included 
in the sortedPartitions
+     * @return the partitions don't assign to any current consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> 
sortedPartitions,
+                                                         List<TopicPartition> 
sortedToBeRemovedPartitions) {
+        List<TopicPartition> unassignedPartitions = new ArrayList<>();
+
+        int index = 0;
+        boolean shouldAddDirectly = false;
+        int sizeToBeRemovedPartitions = sortedToBeRemovedPartitions.size();
+        TopicPartition nextPartition = sortedToBeRemovedPartitions.get(index);
+        for (TopicPartition topicPartition : sortedPartitions) {
+            if (shouldAddDirectly || !nextPartition.equals(topicPartition)) {
+                unassignedPartitions.add(topicPartition);
+            } else {
+                // equal case, don't add to unassignedPartitions, just get 
next partition
+                if (index < sizeToBeRemovedPartitions - 1) {
+                    nextPartition = sortedToBeRemovedPartitions.get(++index);
+                } else {
+                    // add the remaining directly since there is no more 
toBeRemovedPartitions
+                    shouldAddDirectly = true;
+                }
+            }
+        }
+        return unassignedPartitions;
+    }
+
+    /**
+     * update the prevAssignment with the partitions, consumer and generation 
in parameters
+     *
+     * @param partitions: The partitions to be updated the prevAssignement
+     * @param consumer: The consumer Id
+     * @param prevAssignment: The assignment contains the assignment with the 
2nd largest generation
+     * @param generation: The generation of this assignment (partitions)
+     */
+    private void updatePrevAssignment(Map<TopicPartition, 
ConsumerGenerationPair> prevAssignment,
+                                      List<TopicPartition> partitions,
+                                      String consumer,
+                                      int generation) {
+        for (TopicPartition partition: partitions) {
+            ConsumerGenerationPair consumerGeneration = 
prevAssignment.get(partition);
+            if (consumerGeneration != null) {
+                // only keep the latest previous assignment
+                if (generation > consumerGeneration.generation)
+                    prevAssignment.put(partition, new 
ConsumerGenerationPair(consumer, generation));
+            } else {
+                prevAssignment.put(partition, new 
ConsumerGenerationPair(consumer, generation));
+            }
+        }
+    }
+
+    /**
+     * filling in the currentAssignment and prevAssignment from the 
subscriptions.
+     *
+     * @param subscriptions: Map from the member id to their respective topic 
subscription
+     * @param currentAssignment: The assignment contains the assignments with 
the largest generation
+     * @param prevAssignment: The assignment contains the assignment with the 
2nd largest generation
+     */
     private void prepopulateCurrentAssignments(Map<String, Subscription> 
subscriptions,
                                                Map<String, 
List<TopicPartition>> currentAssignment,
                                                Map<TopicPartition, 
ConsumerGenerationPair> prevAssignment) {
         // we need to process subscriptions' user data with each consumer's 
reported generation in mind
         // higher generations overwrite lower generations in case of a conflict
         // note that a conflict could exists only if user data is for 
different generations
 
-        // for each partition we create a sorted map of its consumers by 
generation
-        Map<TopicPartition, TreeMap<Integer, String>> 
sortedPartitionConsumersByGeneration = new HashMap<>();
+        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
+        int maxGeneration = DEFAULT_GENERATION;
+
         for (Map.Entry<String, Subscription> subscriptionEntry: 
subscriptions.entrySet()) {
             String consumer = subscriptionEntry.getKey();
             MemberData memberData = memberData(subscriptionEntry.getValue());
 
-            for (TopicPartition partition: memberData.partitions) {
-                if 
(sortedPartitionConsumersByGeneration.containsKey(partition)) {
-                    Map<Integer, String> consumers = 
sortedPartitionConsumersByGeneration.get(partition);
-                    if (memberData.generation.isPresent() && 
consumers.containsKey(memberData.generation.get())) {
-                        // same partition is assigned to two consumers during 
the same rebalance.
-                        // log a warning and skip this record
-                        log.warn("Partition '{}' is assigned to multiple 
consumers following sticky assignment generation {}.",
-                            partition, memberData.generation);
-                    } else
-                        
consumers.put(memberData.generation.orElse(DEFAULT_GENERATION), consumer);
-                } else {
-                    TreeMap<Integer, String> sortedConsumers = new TreeMap<>();
-                    
sortedConsumers.put(memberData.generation.orElse(DEFAULT_GENERATION), consumer);
-                    sortedPartitionConsumersByGeneration.put(partition, 
sortedConsumers);
-                }
-            }
-        }
+            List<TopicPartition> ownedPartitions = new ArrayList<>();
+            currentAssignment.put(consumer, ownedPartitions);
 
-        // prevAssignment holds the prior ConsumerGenerationPair (before 
current) of each partition
-        // current and previous consumers are the last two consumers of each 
partition in the above sorted map
-        for (Map.Entry<TopicPartition, TreeMap<Integer, String>> 
partitionConsumersEntry: sortedPartitionConsumersByGeneration.entrySet()) {
-            TopicPartition partition = partitionConsumersEntry.getKey();
-            TreeMap<Integer, String> consumers = 
partitionConsumersEntry.getValue();
-            Iterator<Integer> it = consumers.descendingKeySet().iterator();
+            // Only consider this consumer's owned partitions as valid if it 
is a member of the current highest
+            // generation, or it's generation is not present but we have not 
seen any known generation so far
+            if (memberData.generation.isPresent() && 
memberData.generation.get() >= maxGeneration
+                || !memberData.generation.isPresent() && maxGeneration == 
DEFAULT_GENERATION) {
 
-            // let's process the current (most recent) consumer first
-            String consumer = consumers.get(it.next());
-            currentAssignment.computeIfAbsent(consumer, k -> new 
ArrayList<>());
-            currentAssignment.get(consumer).add(partition);
+                // If the current member's generation is higher, all the 
previously owned partitions are invalid
+                if (memberData.generation.isPresent() && 
memberData.generation.get() > maxGeneration) {
+                    for (String member: membersOfCurrentHighestGeneration) {
+                        List<TopicPartition> oldGenerationPartitions = 
currentAssignment.get(member);
+                        updatePrevAssignment(prevAssignment, 
oldGenerationPartitions, member, maxGeneration);
+                        oldGenerationPartitions.clear();
+                    }
+                    membersOfCurrentHighestGeneration.clear();
+                    maxGeneration = memberData.generation.get();
+                }
 
-            // now update previous assignment if any
-            if (it.hasNext()) {
-                int generation = it.next();
-                prevAssignment.put(partition, new 
ConsumerGenerationPair(consumers.get(generation), generation));
+                membersOfCurrentHighestGeneration.add(consumer);
+                ownedPartitions.addAll(memberData.partitions);
+            } else if (!memberData.generation.isPresent()) {
+                // current maxGeneration is larger than DEFAULT_GENERATION,
+                // put all partitions as DEFAULT_GENERATION into provAssignment

Review comment:
       typo: `provAssignment` should be `prevAssignment`.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -362,23 +365,36 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
                 // otherwise (the consumer still exists)
                 for (Iterator<TopicPartition> partitionIter = 
entry.getValue().iterator(); partitionIter.hasNext();) {
                     TopicPartition partition = partitionIter.next();
-                    if 
(!partition2AllPotentialConsumers.containsKey(partition)) {
+                    if 
(!topic2AllPotentialConsumers.containsKey(partition.topic())) {

Review comment:
       nit: the following comment needs to be updated as well.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -527,12 +615,23 @@ private int getBalanceScore(Map<String, 
List<TopicPartition>> assignment) {
      * Sort valid partitions so they are processed in the potential 
reassignment phase in the proper order
      * that causes minimal partition movement among consumers (hence honoring 
maximal stickiness)
      *
-     * @param partition2AllPotentialConsumers a mapping of partitions to their 
potential consumers
+     * @param topic2AllPotentialConsumers a mapping of partitions to their 
potential consumers
+     * @param partitionsPerTopic The number of partitions for each subscribed 
topic
      * @return  an ascending sorted list of topic partitions based on how many 
consumers can potentially use them
      */
-    private List<TopicPartition> sortPartitions(Map<TopicPartition, 
List<String>> partition2AllPotentialConsumers) {
-        List<TopicPartition> sortedPartitions = new 
ArrayList<>(partition2AllPotentialConsumers.keySet());
-        Collections.sort(sortedPartitions, new 
PartitionComparator(partition2AllPotentialConsumers));
+    private List<TopicPartition> sortPartitions(Map<String, List<String>> 
topic2AllPotentialConsumers,
+                                                Map<String, Integer> 
partitionsPerTopic) {
+        List<TopicPartition> sortedPartitions = new ArrayList<>();
+        List<String> allTopics = new 
ArrayList<>(topic2AllPotentialConsumers.keySet());
+        Collections.sort(allTopics, new 
TopicComparator(topic2AllPotentialConsumers));
+
+        // since allTopics are sorted, we can loop through allTopics to create 
the sortedPartitions

Review comment:
       SGTM.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -387,58 +398,121 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
         TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new 
SubscriptionComparator(currentAssignment));
         sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
+        int totalPartitionCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
         balance(currentAssignment, prevAssignment, sortedPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
-            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired);
+            consumer2AllPotentialTopics, topic2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired,
+            partitionsPerTopic, totalPartitionCount);
+        
         return currentAssignment;
     }
 
+    /**
+     * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+     * and toBeRemovedPartitions. We use two pointers technique here:
+     *
+     * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from 
toBeRemovedPartitions
+     *
+     * @param sortedPartitions: sorted all partitions
+     * @param toBeRemovedPartitions: sorted partitions, all are included in 
the sortedPartitions
+     * @return the partitions don't assign to any current consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> 
sortedPartitions,

Review comment:
       Sounds great!

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -387,58 +403,125 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
         TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new 
SubscriptionComparator(currentAssignment));
         sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
+        int totalPartitionCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
         balance(currentAssignment, prevAssignment, sortedPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
-            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired);
+            consumer2AllPotentialTopics, topic2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired,
+            partitionsPerTopic, totalPartitionCount);
+
+        if (log.isDebugEnabled()) {
+            log.debug("final assignment: " + currentAssignment);
+        }
+
         return currentAssignment;
     }
 
+    /**
+     * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+     * and sortedToBeRemovedPartitions. We use two pointers technique here:
+     *
+     * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from 
sortedToBeRemovedPartitions
+     *
+     * @param sortedPartitions: sorted all partitions
+     * @param sortedToBeRemovedPartitions: sorted partitions, all are included 
in the sortedPartitions
+     * @return the partitions don't assign to any current consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> 
sortedPartitions,
+                                                         List<TopicPartition> 
sortedToBeRemovedPartitions) {
+        List<TopicPartition> unassignedPartitions = new ArrayList<>();
+
+        int index = 0;
+        boolean shouldAddDirectly = false;
+        int sizeToBeRemovedPartitions = sortedToBeRemovedPartitions.size();
+        TopicPartition nextPartition = sortedToBeRemovedPartitions.get(index);
+        for (TopicPartition topicPartition : sortedPartitions) {
+            if (shouldAddDirectly || !nextPartition.equals(topicPartition)) {
+                unassignedPartitions.add(topicPartition);
+            } else {
+                // equal case, don't add to unassignedPartitions, just get 
next partition
+                if (index < sizeToBeRemovedPartitions - 1) {
+                    nextPartition = sortedToBeRemovedPartitions.get(++index);
+                } else {
+                    // add the remaining directly since there is no more 
toBeRemovedPartitions
+                    shouldAddDirectly = true;
+                }
+            }
+        }
+        return unassignedPartitions;
+    }
+
+    /**
+     * update the prevAssignment with the partitions, consumer and generation 
in parameters
+     *
+     * @param partitions: The partitions to be updated the prevAssignement
+     * @param consumer: The consumer Id
+     * @param prevAssignment: The assignment contains the assignment with the 
2nd largest generation
+     * @param generation: The generation of this assignment (partitions)
+     */
+    private void updatePrevAssignment(Map<TopicPartition, 
ConsumerGenerationPair> prevAssignment,
+                                      List<TopicPartition> partitions,
+                                      String consumer,
+                                      int generation) {
+        for (TopicPartition partition: partitions) {
+            ConsumerGenerationPair consumerGeneration = 
prevAssignment.get(partition);
+            if (consumerGeneration != null) {
+                // only keep the latest previous assignment
+                if (generation > consumerGeneration.generation)
+                    prevAssignment.put(partition, new 
ConsumerGenerationPair(consumer, generation));
+            } else {
+                prevAssignment.put(partition, new 
ConsumerGenerationPair(consumer, generation));
+            }
+        }
+    }
+
+    /**
+     * filling in the currentAssignment and prevAssignment from the 
subscriptions.
+     *
+     * @param subscriptions: Map from the member id to their respective topic 
subscription
+     * @param currentAssignment: The assignment contains the assignments with 
the largest generation
+     * @param prevAssignment: The assignment contains the assignment with the 
2nd largest generation
+     */
     private void prepopulateCurrentAssignments(Map<String, Subscription> 
subscriptions,
                                                Map<String, 
List<TopicPartition>> currentAssignment,
                                                Map<TopicPartition, 
ConsumerGenerationPair> prevAssignment) {
         // we need to process subscriptions' user data with each consumer's 
reported generation in mind
         // higher generations overwrite lower generations in case of a conflict
         // note that a conflict could exists only if user data is for 
different generations
 
-        // for each partition we create a sorted map of its consumers by 
generation
-        Map<TopicPartition, TreeMap<Integer, String>> 
sortedPartitionConsumersByGeneration = new HashMap<>();
+        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
+        int maxGeneration = DEFAULT_GENERATION;
+
         for (Map.Entry<String, Subscription> subscriptionEntry: 
subscriptions.entrySet()) {
             String consumer = subscriptionEntry.getKey();
             MemberData memberData = memberData(subscriptionEntry.getValue());
 
-            for (TopicPartition partition: memberData.partitions) {
-                if 
(sortedPartitionConsumersByGeneration.containsKey(partition)) {
-                    Map<Integer, String> consumers = 
sortedPartitionConsumersByGeneration.get(partition);
-                    if (memberData.generation.isPresent() && 
consumers.containsKey(memberData.generation.get())) {
-                        // same partition is assigned to two consumers during 
the same rebalance.
-                        // log a warning and skip this record
-                        log.warn("Partition '{}' is assigned to multiple 
consumers following sticky assignment generation {}.",
-                            partition, memberData.generation);
-                    } else
-                        
consumers.put(memberData.generation.orElse(DEFAULT_GENERATION), consumer);
-                } else {
-                    TreeMap<Integer, String> sortedConsumers = new TreeMap<>();
-                    
sortedConsumers.put(memberData.generation.orElse(DEFAULT_GENERATION), consumer);
-                    sortedPartitionConsumersByGeneration.put(partition, 
sortedConsumers);
-                }
-            }
-        }
+            List<TopicPartition> ownedPartitions = new ArrayList<>();
+            currentAssignment.put(consumer, ownedPartitions);
 
-        // prevAssignment holds the prior ConsumerGenerationPair (before 
current) of each partition
-        // current and previous consumers are the last two consumers of each 
partition in the above sorted map
-        for (Map.Entry<TopicPartition, TreeMap<Integer, String>> 
partitionConsumersEntry: sortedPartitionConsumersByGeneration.entrySet()) {
-            TopicPartition partition = partitionConsumersEntry.getKey();
-            TreeMap<Integer, String> consumers = 
partitionConsumersEntry.getValue();
-            Iterator<Integer> it = consumers.descendingKeySet().iterator();
+            // Only consider this consumer's owned partitions as valid if it 
is a member of the current highest
+            // generation, or it's generation is not present but we have not 
seen any known generation so far
+            if (memberData.generation.isPresent() && 
memberData.generation.get() >= maxGeneration
+                || !memberData.generation.isPresent() && maxGeneration == 
DEFAULT_GENERATION) {
 
-            // let's process the current (most recent) consumer first
-            String consumer = consumers.get(it.next());
-            currentAssignment.computeIfAbsent(consumer, k -> new 
ArrayList<>());
-            currentAssignment.get(consumer).add(partition);
+                // If the current member's generation is higher, all the 
previously owned partitions are invalid
+                if (memberData.generation.isPresent() && 
memberData.generation.get() > maxGeneration) {
+                    for (String member: membersOfCurrentHighestGeneration) {
+                        List<TopicPartition> oldGenerationPartitions = 
currentAssignment.get(member);
+                        updatePrevAssignment(prevAssignment, 
oldGenerationPartitions, member, maxGeneration);
+                        oldGenerationPartitions.clear();
+                    }
+                    membersOfCurrentHighestGeneration.clear();
+                    maxGeneration = memberData.generation.get();
+                }
 
-            // now update previous assignment if any
-            if (it.hasNext()) {
-                int generation = it.next();
-                prevAssignment.put(partition, new 
ConsumerGenerationPair(consumers.get(generation), generation));
+                membersOfCurrentHighestGeneration.add(consumer);
+                ownedPartitions.addAll(memberData.partitions);
+            } else if (!memberData.generation.isPresent()) {
+                // current maxGeneration is larger than DEFAULT_GENERATION,
+                // put all partitions as DEFAULT_GENERATION into provAssignment

Review comment:
       I'm not 100% sure the refactored code has exactly the same logic as 
the old code, since its branching conditions have changed substantially. E.g. do 
we still detect when a partition is assigned to multiple consumers within the 
same generation?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to