showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r637943535



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -80,9 +80,7 @@ public MemberData(List<TopicPartition> partitions, Optional<Integer> generation)
             log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
                           + "general case assignment algorithm");

Review comment:
       Updated. Thanks.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -469,73 +426,190 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
         sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
-        balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions,
-            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired);
+        balance(currentAssignment, prevAssignment, sortedAllPartitions, unassignedPartitions, sortedCurrentSubscriptions,
+            consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired,
+            partitionsPerTopic, totalPartitionsCount);
+
+        if (log.isDebugEnabled()) {
+            log.debug("final assignment: {}", currentAssignment);
+        }
+
         return currentAssignment;
     }
 
+    /**
+     * get the unassigned partition list by computing the difference set of the sortedPartitions(all partitions)
+     * and sortedAssignedPartitions. If no assigned partitions, we'll just return all sorted topic partitions.
+     * This is used in generalAssign method
+     *
+     * We loop the sortedPartition, and compare the ith element in sortedAssignedPartitions(i start from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from sortedAssignedPartitions
+     *
+     * @param sortedAllPartitions:          sorted all partitions
+     * @param sortedAssignedPartitions:     sorted partitions, all are included in the sortedPartitions
+     * @param topic2AllPotentialConsumers:  topics mapped to all consumers that subscribed to it
+     * @return                              the partitions don't assign to any current consumers

Review comment:
       Updated. Thanks.
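
       For readers following the Javadoc in the hunk above, the sorted-difference walk it describes could be sketched roughly as below. This is only an illustration under assumptions: it uses plain strings instead of TopicPartition, it leaves out the topic2AllPotentialConsumers parameter, and the class and method names (UnassignedPartitionsSketch, unassigned) are hypothetical, not the code in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnassignedPartitionsSketch {

    /**
     * Hypothetical helper: returns the elements of sortedAll that do not appear in sortedAssigned.
     * Assumes both lists are sorted with the same ordering and that every element of
     * sortedAssigned also appears in sortedAll.
     */
    public static <T extends Comparable<T>> List<T> unassigned(List<T> sortedAll, List<T> sortedAssigned) {
        // With nothing assigned, every partition is unassigned.
        if (sortedAssigned.isEmpty()) {
            return new ArrayList<>(sortedAll);
        }
        List<T> result = new ArrayList<>();
        int i = 0; // index of the next assigned element we expect to meet
        for (T partition : sortedAll) {
            if (i < sortedAssigned.size() && partition.compareTo(sortedAssigned.get(i)) == 0) {
                // Matches the ith assigned element: skip it and advance to the next one.
                i++;
            } else {
                // No match: this partition is not assigned to any consumer.
                result.add(partition);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("t0-0", "t0-1", "t0-2", "t1-0");
        List<String> assigned = Arrays.asList("t0-1", "t1-0");
        System.out.println(unassigned(all, assigned)); // prints [t0-0, t0-2]
    }
}
```

       Because both inputs are already sorted, a single pass over all partitions is enough, which avoids a per-element lookup in the assigned list.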




