showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r629994499



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -469,73 +426,184 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
         TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new 
SubscriptionComparator(currentAssignment));
         sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
-        balance(currentAssignment, prevAssignment, sortedPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
-            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired);
+        balance(currentAssignment, prevAssignment, sortedAllPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
+            consumer2AllPotentialTopics, topic2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired,
+            partitionsPerTopic, totalPartitionsCount);
+
+        if (log.isDebugEnabled()) {
+            log.debug("final assignment: {}", currentAssignment);
+        }
+
         return currentAssignment;
     }
 
+    /**
+     * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+     * and sortedAssignedPartitions. If no assigned partitions, we'll just 
return all sorted topic partitions.
+     * This is used in generalAssign method
+     *
+     * We loop the sortedPartition, and compare the ith element in 
sortedAssignedPartitions(i start from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from 
sortedAssignedPartitions
+     *
+     * @param sortedAllPartitions:          sorted all partitions
+     * @param sortedAssignedPartitions:     sorted partitions, all are 
included in the sortedPartitions
+     * @param topic2AllPotentialConsumers:  topics mapped to all consumers 
that subscribed to it
+     * @return                              the partitions don't assign to any 
current consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> 
sortedAllPartitions,
+                                                         List<TopicPartition> 
sortedAssignedPartitions,
+                                                         Map<String, 
List<String>> topic2AllPotentialConsumers) {
+        if (sortedAssignedPartitions.isEmpty()) {
+            return sortedAllPartitions;
+        }
+
+        List<TopicPartition> unassignedPartitions = new ArrayList<>();
+
+        Collections.sort(sortedAssignedPartitions, new 
PartitionComparator(topic2AllPotentialConsumers));
+
+        boolean shouldAddDirectly = false;
+        Iterator<TopicPartition> sortedAssignedPartitionsIter = 
sortedAssignedPartitions.iterator();
+        TopicPartition nextAssignedPartition = 
sortedAssignedPartitionsIter.next();
+
+        for (TopicPartition topicPartition : sortedAllPartitions) {
+            if (shouldAddDirectly || 
!nextAssignedPartition.equals(topicPartition)) {
+                unassignedPartitions.add(topicPartition);
+            } else {
+                // this partition is in assignedPartitions, don't add to 
unassignedPartitions, just get next assigned partition
+                if (sortedAssignedPartitionsIter.hasNext()) {
+                    nextAssignedPartition = 
sortedAssignedPartitionsIter.next();
+                } else {
+                    // add the remaining directly since there is no more 
sortedAssignedPartitions
+                    shouldAddDirectly = true;
+                }
+            }
+        }
+        return unassignedPartitions;
+    }
+
+    /**
+     * get the unassigned partition list by computing the difference set of 
all sorted partitions
+     * and sortedAssignedPartitions. If no assigned partitions, we'll just 
return all sorted topic partitions.
+     * This is used in constrainedAssign method
+     *
+     * To compute the difference set, we use two pointers technique here:
+     *
+     * We loop through the all sorted topics, and then iterate all partitions 
the topic has,
+     * compared with the ith element in sortedAssignedPartitions(i starts from 
0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from 
sortedAssignedPartitions
+     *
+     * @param totalPartitionsCount      all partitions counts in this 
assignment
+     * @param partitionsPerTopic        the number of partitions for each 
subscribed topic.
+     * @param sortedAssignedPartitions  sorted partitions, all are included in 
the sortedPartitions
+     * @return                          the partitions not yet assigned to any 
consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(int 
totalPartitionsCount,
+                                                         Map<String, Integer> 
partitionsPerTopic,
+                                                         List<TopicPartition> 
sortedAssignedPartitions) {
+        List<String> sortedAllTopics = new 
ArrayList<>(partitionsPerTopic.keySet());
+        // sort all topics first, then we can have sorted all topic partitions 
by adding partitions starting from 0
+        Collections.sort(sortedAllTopics);
+
+        if (sortedAssignedPartitions.isEmpty()) {
+            // no assigned partitions means all partitions are unassigned 
partitions
+            return getAllTopicPartitions(partitionsPerTopic, sortedAllTopics, 
totalPartitionsCount);
+        }
+
+        List<TopicPartition> unassignedPartitions = new 
ArrayList<>(totalPartitionsCount - sortedAssignedPartitions.size());
+
+        Collections.sort(sortedAssignedPartitions, 
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+
+        boolean shouldAddDirectly = false;
+        Iterator<TopicPartition> sortedAssignedPartitionsIter = 
sortedAssignedPartitions.iterator();
+        TopicPartition nextAssignedPartition = 
sortedAssignedPartitionsIter.next();
+
+        for (String topic : sortedAllTopics) {
+            int partitionCount = partitionsPerTopic.get(topic);
+            for (int i = 0; i < partitionCount; i++) {
+                if (shouldAddDirectly || 
!(nextAssignedPartition.topic().equals(topic) && 
nextAssignedPartition.partition() == i)) {
+                    unassignedPartitions.add(new TopicPartition(topic, i));
+                } else {
+                    // this partition is in assignedPartitions, don't add to 
unassignedPartitions, just get next assigned partition
+                    if (sortedAssignedPartitionsIter.hasNext()) {
+                        nextAssignedPartition = 
sortedAssignedPartitionsIter.next();
+                    } else {
+                        // add the remaining directly since there is no more 
sortedAssignedPartitions
+                        shouldAddDirectly = true;
+                    }
+                }
+            }
+        }
+
+        return unassignedPartitions;
+    }
+
+    /**
+     * update the prevAssignment with the partitions, consumer and generation 
in parameters
+     *
+     * @param partitions:       The partitions to be updated the 
prevAssignement
+     * @param consumer:         The consumer Id
+     * @param prevAssignment:   The assignment contains the assignment with 
the 2nd largest generation
+     * @param generation:       The generation of this assignment (partitions)
+     */
+    private void updatePrevAssignment(Map<TopicPartition, 
ConsumerGenerationPair> prevAssignment,
+                                      List<TopicPartition> partitions,
+                                      String consumer,
+                                      int generation) {
+        for (TopicPartition partition: partitions) {
+            if (prevAssignment.containsKey(partition)) {
+                // only keep the latest previous assignment
+                if (generation > prevAssignment.get(partition).generation) {
+                    prevAssignment.put(partition, new 
ConsumerGenerationPair(consumer, generation));
+                }
+            } else {
+                prevAssignment.put(partition, new 
ConsumerGenerationPair(consumer, generation));
+            }
+        }
+    }
+
+    /**
+     * filling in the prevAssignment from the subscriptions.
+     *
+     * @param subscriptions:        Map from the member id to their respective 
topic subscription
+     * @param prevAssignment:       The assignment contains the assignment 
with the 2nd largest generation
+     */
     private void prepopulateCurrentAssignments(Map<String, Subscription> 
subscriptions,
-                                               Map<String, 
List<TopicPartition>> currentAssignment,
                                                Map<TopicPartition, 
ConsumerGenerationPair> prevAssignment) {
         // we need to process subscriptions' user data with each consumer's 
reported generation in mind
         // higher generations overwrite lower generations in case of a conflict
         // note that a conflict could exists only if user data is for 
different generations
 
-        // for each partition we create a sorted map of its consumers by 
generation
-        Map<TopicPartition, TreeMap<Integer, String>> 
sortedPartitionConsumersByGeneration = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry: 
subscriptions.entrySet()) {
             String consumer = subscriptionEntry.getKey();
             MemberData memberData = memberData(subscriptionEntry.getValue());
 
-            for (TopicPartition partition: memberData.partitions) {
-                if 
(sortedPartitionConsumersByGeneration.containsKey(partition)) {
-                    Map<Integer, String> consumers = 
sortedPartitionConsumersByGeneration.get(partition);
-                    if (memberData.generation.isPresent() && 
consumers.containsKey(memberData.generation.get())) {
-                        // same partition is assigned to two consumers during 
the same rebalance.
-                        // log a warning and skip this record
-                        log.warn("Partition '{}' is assigned to multiple 
consumers following sticky assignment generation {}.",
-                            partition, memberData.generation);
-                    } else
-                        
consumers.put(memberData.generation.orElse(DEFAULT_GENERATION), consumer);
-                } else {
-                    TreeMap<Integer, String> sortedConsumers = new TreeMap<>();
-                    
sortedConsumers.put(memberData.generation.orElse(DEFAULT_GENERATION), consumer);
-                    sortedPartitionConsumersByGeneration.put(partition, 
sortedConsumers);
-                }
-            }
-        }
-
-        // prevAssignment holds the prior ConsumerGenerationPair (before 
current) of each partition
-        // current and previous consumers are the last two consumers of each 
partition in the above sorted map
-        for (Map.Entry<TopicPartition, TreeMap<Integer, String>> 
partitionConsumersEntry: sortedPartitionConsumersByGeneration.entrySet()) {
-            TopicPartition partition = partitionConsumersEntry.getKey();
-            TreeMap<Integer, String> consumers = 
partitionConsumersEntry.getValue();
-            Iterator<Integer> it = consumers.descendingKeySet().iterator();
-
-            // let's process the current (most recent) consumer first
-            String consumer = consumers.get(it.next());
-            currentAssignment.computeIfAbsent(consumer, k -> new 
ArrayList<>());
-            currentAssignment.get(consumer).add(partition);
-
-            // now update previous assignment if any
-            if (it.hasNext()) {
-                int generation = it.next();
-                prevAssignment.put(partition, new 
ConsumerGenerationPair(consumers.get(generation), generation));
+            // we already have the maxGeneration info, so just compare the 
current generation of memberData, and put into prevAssignment
+            if (memberData.generation.isPresent() && 
memberData.generation.get() < maxGeneration) {
+                // If the current member's generation is lower than 
maxGeneration, put into prevAssignment if needed
+                updatePrevAssignment(prevAssignment, memberData.partitions, 
consumer, memberData.generation.get());
+            } else if (!memberData.generation.isPresent()) {

Review comment:
       Since we now already have `currentAssignment` from the 
`allSubscriptionsEqual` method, as well as the `maxGeneration` data, we can 
simplify the logic here so that it only populates `prevAssignment`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to