showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r622064430



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
         if (log.isDebugEnabled()) {
             log.debug("final assignment: " + assignment);
         }
-
+        
         return assignment;
     }
 
-    private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer> 
partitionsPerTopic) {
-        SortedSet<TopicPartition> allPartitions =
-            new 
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
-            String topic = entry.getKey();
-            for (int i = 0; i < entry.getValue(); ++i) {
+    /**
+     * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+     * and sortedToBeRemovedPartitions. We use two pointers technique here:
+     *
+     * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from 
sortedToBeRemovedPartitions
+     *
+     * @param sortedPartitions: sorted all partitions
+     * @param sortedToBeRemovedPartitions: sorted partitions, all are included 
in the sortedPartitions
+     * @return the partitions don't assign to any current consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> 
sortedPartitions,

Review comment:
       > the removed assigned partitions takes almost the same time, but after 
is still faster. (explained below)
   
   I think the reason is:
   1. `sort(toBeRemovedPartitions)` is `C * M * log(C * M)` , that's correct in 
theory! But in our assignment case, usually, the consumer assignment is 
paritition-sorted (due to the consumer leader create the assignment in sorted 
order). So, based on the java doc in `List.sort`:
   
   > This implementation is a stable, adaptive, iterative mergesort that 
requires far fewer than n lg(n) comparisons when the input array is partially 
sorted, while offering the performance of a traditional mergesort when the 
input array is randomly ordered.  If the input array is nearly sorted, the 
implementation requires approximately n comparisons.
   
   That means, the sort should be faster than we thought.
   
   2. The nature of `ArrayList` is a continuous memory, which is 
cache-friendly. That is, when retrieving data, we can get their neighbor data 
at the same time, it improves the data retrieval from memory. So, this nature 
makes the ArrayList` iteration` and `elements addition` (we can copy a M 
continuous memory into another memory directly, it could be O(1)) faster than 
TreeSet.   
   ```
   // ArrayList.addAll()
   public boolean addAll(Collection<? extends E> c) {
           ...
           if (numNew > (elementData = this.elementData).length - (s = size))
               elementData = grow(s + numNew);
           System.arraycopy(a, 0, elementData, s, numNew);
           size = s + numNew;
           return true;
       }
   ```
   
   I don't know if there are other potential reason, but the performance is 
indeed improved by `ArrayList`. 
   
   > an unknown but possibly small performance improvement that uses a lot more 
memory, and a somewhat worse algorithm with only the one data structure and 
slightly cleaner code.
   
   If the memory usage for **after** is a lot higher than the **before**, I'd 
hesitate to say **after** is better. But now, with my above improvement, the 
memory usage is basically the same as **before**. I'd choose my change to make 
the assignor faster. After all, faster assignor, faster rebalance.
   
   That's my 2 cents.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to