showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615228600



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -387,58 +398,121 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
         sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
+        int totalPartitionCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
         balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions,
-            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired);
+            consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired,
+            partitionsPerTopic, totalPartitionCount);
+        
         return currentAssignment;
     }
 
+    /**
+     * Get the unassigned partition list by computing the difference set of sortedPartitions (all partitions)
+     * and toBeRemovedPartitions. We use the two-pointer technique here:
+     *
+     * We loop over sortedPartitions and compare each element with the ith element of the sorted toBeRemovedPartitions (i starts from 0):
+     *   - if not equal to the ith element, add it to unassignedPartitions
+     *   - if equal to the ith element, skip it and move on to the next element of toBeRemovedPartitions
+     *
+     * @param sortedPartitions: all partitions, sorted
+     * @param toBeRemovedPartitions: sorted partitions, all of which are included in sortedPartitions
+     * @return the partitions not assigned to any current consumer
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions,

Review comment:
       refactor 2:
   Previously, we built an ArrayList `unassignedPartitions` holding all sorted partitions (e.g., 1 million), then looped through the current assignment to remove every already-assigned partition (e.g., 999,000 of them), leaving only 1,000. However, `ArrayList.remove` is slow on a huge list: it first searches linearly for the element, then calls `System.arraycopy` to shift every later element one slot to the left. Each removal is O(n), so removing k partitions costs O(n * k). This case is common, because each rebalance usually involves only a small set of changes (e.g., one consumer dropping out), so almost every partition is already assigned and has to be removed. That makes this an important improvement.
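   A minimal sketch of the old pattern, for illustration only: partitions are modeled as sorted `Integer` IDs instead of Kafka's `TopicPartition`, and the names `OldUnassignedPartitions` and `getUnassigned` are hypothetical, not the assignor's actual code. Every `remove` call repeats the linear search and the tail shift described above.

```java
import java.util.ArrayList;
import java.util.List;

public class OldUnassignedPartitions {
    // Hypothetical reconstruction of the pre-refactor approach: copy all
    // sorted partitions, then remove each already-assigned one.
    static List<Integer> getUnassigned(List<Integer> sortedPartitions,
                                       List<Integer> assignedPartitions) {
        List<Integer> unassigned = new ArrayList<>(sortedPartitions);
        for (Integer p : assignedPartitions) {
            // remove(Object): O(n) linear search, then System.arraycopy
            // shifts every later element one slot to the left.
            unassigned.remove(p);
        }
        return unassigned;
    }

    public static void main(String[] args) {
        List<Integer> all = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        List<Integer> assigned = List.of(0, 1, 2, 3, 5, 6, 7, 8);
        System.out.println(getUnassigned(all, assigned)); // [4, 9]
    }
}
```

   Declaring the loop variable as `Integer` matters here: it makes `unassigned.remove(p)` resolve to `remove(Object)` rather than `remove(int index)`.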
   
   To fix this, I used the two-pointer technique to walk the two sorted lists, `sortedPartitions` and `sortedToBeRemovedPartitions`, in a single pass and add only the elements in their difference set. Iterating over an ArrayList and appending to one are both fast (appending is amortized O(1)), so the whole computation becomes one O(n + k) pass, which is a big improvement.
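   The two-pointer difference can be sketched as follows, again using generic elements in place of `TopicPartition`; `UnassignedPartitionsDiff` and `getUnassigned` are hypothetical names. The sketch assumes the same precondition as the Javadoc in the diff: both lists are sorted in the same order, and every element to remove also appears in the full list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class UnassignedPartitionsDiff {
    // Hypothetical helper: returns the elements of sortedAll that do not appear
    // in sortedToRemove, in one pass over both lists (O(n + k)).
    // Assumed preconditions (mirroring the Javadoc in the diff): both lists are
    // sorted in the same order, sortedToRemove is a subset of sortedAll, and
    // neither list contains null.
    static <T> List<T> getUnassigned(List<T> sortedAll, List<T> sortedToRemove) {
        List<T> result = new ArrayList<>();
        Iterator<T> toRemove = sortedToRemove.iterator();
        // Second pointer: the next element to skip, or null once exhausted.
        T nextToRemove = toRemove.hasNext() ? toRemove.next() : null;

        for (T candidate : sortedAll) { // first pointer walks the full list
            if (nextToRemove != null && nextToRemove.equals(candidate)) {
                // Match: drop this element and advance the second pointer.
                nextToRemove = toRemove.hasNext() ? toRemove.next() : null;
            } else {
                // No match: the element is unassigned; append is amortized O(1).
                result.add(candidate);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> all = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        List<Integer> assigned = List.of(0, 1, 2, 3, 5, 6, 7, 8);
        System.out.println(getUnassigned(all, assigned)); // [4, 9]
    }
}
```

   If the precondition is violated (an element to remove is missing from the full list), the second pointer never advances past it, so every later element that should have been removed stays in the result. Callers therefore have to guarantee the subset relationship.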




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

