guozhangwang commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r625403618



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,180 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
                                                                 Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug("performing constrained assign. partitionsPerTopic: {}, consumerToOwnedPartitions: {}",
+                partitionsPerTopic, consumerToOwnedPartitions);
+        }
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers);
+        int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota for all members
+        // initialize the assignment map with an empty array of size maxQuota for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
+            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> assignedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    assignedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
+                numMaxCapacityMembers++;
+                List<TopicPartition> maxQuotaPartitions = ownedPartitions.subList(0, maxQuota);
+                consumerAssignment.addAll(maxQuotaPartitions);
+                assignedPartitions.addAll(maxQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned at least "minQuota" of partitions but we're already at the allowed number of max capacity members
+                // so keep "minQuota" of the owned partitions, and revoke the rest of the partitions
+                List<TopicPartition> minQuotaPartitions = ownedPartitions.subList(0, minQuota);
+                consumerAssignment.addAll(minQuotaPartitions);
+                assignedPartitions.addAll(minQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
             }
         }
 
+        List<TopicPartition> unassignedPartitions = getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, assignedPartitions);
+        assignedPartitions = null;
+
+        if (log.isDebugEnabled()) {
+            log.debug("After reassigning previously owned partitions, unfilled members: {}, unassigned partitions: {}, " +
+                "current assignment: {}", unfilledMembers, unassignedPartitions, assignment);
+        }
+
         Collections.sort(unfilledMembers);
-        Iterator<TopicPartition> unassignedPartitionsIter = unassignedPartitions.iterator();
-
-        // Fill remaining members up to minQuota
-        while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
-            Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
-
-            while (unfilledConsumerIter.hasNext()) {
-                String consumer = unfilledConsumerIter.next();
-                List<TopicPartition> consumerAssignment = assignment.get(consumer);
-
-                if (unassignedPartitionsIter.hasNext()) {
-                    TopicPartition tp = unassignedPartitionsIter.next();
-                    consumerAssignment.add(tp);
-                    unassignedPartitionsIter.remove();
-                    // We already assigned all possible ownedPartitions, so we know this must be newly to this consumer
-                    if (allRevokedPartitions.contains(tp))
-                        partitionsTransferringOwnership.put(tp, consumer);
-                } else {
-                    break;
-                }
 
-                if (consumerAssignment.size() == minQuota) {
-                    minCapacityMembers.add(consumer);
-                    unfilledConsumerIter.remove();
+        Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+        // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
+        for (TopicPartition unassignedPartition : unassignedPartitions) {
+            if (!unfilledConsumerIter.hasNext()) {
+                if (unfilledMembers.isEmpty()) {
+                    // Should not enter here since we have calculated the exact number to assign to each consumer
+                    log.warn("No more unfilled consumers to be assigned.");

Review comment:
       This should never happen either, so if it does, we can treat it as a bug and throw an `IllegalStateException`.
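A minimal sketch of what the reviewer suggests for the round-robin step: running out of unfilled consumers while partitions remain is treated as a bug and raises `IllegalStateException`, instead of logging a warning and breaking out of the loop. It assumes consumers and partitions are plain `String`s rather than Kafka's `TopicPartition`, and `RoundRobinFillSketch.fill` is a hypothetical helper, not the PR's code. The sketch also drops a member that is already full for the current target (for example one sitting at `minQuota` after the `maxQuota` slots are used up), so such a member is never handed an extra partition.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: consumers and partitions are plain Strings, not Kafka's TopicPartition.
final class RoundRobinFillSketch {

    /**
     * Hands out unassigned partitions round-robin over the (already sorted) unfilled members.
     * A member leaves the rotation once it reaches maxQuota while maxQuota slots remain, or
     * minQuota once those slots are used up. Returns the updated count of members at maxQuota.
     */
    static int fill(List<String> unassignedPartitions,
                    List<String> unfilledMembers,
                    Map<String, List<String>> assignment,
                    int minQuota,
                    int maxQuota,
                    int numExpectedMaxCapacityMembers,
                    int numMaxCapacityMembers) {
        Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
        for (String partition : unassignedPartitions) {
            String consumer = null;
            while (consumer == null) {
                if (!unfilledConsumerIter.hasNext()) {
                    if (unfilledMembers.isEmpty()) {
                        // The quotas should make this unreachable, so treat it as a bug.
                        throw new IllegalStateException(
                            "No more unfilled consumers to be assigned partition " + partition);
                    }
                    // Start the next round over the members that are still unfilled.
                    unfilledConsumerIter = unfilledMembers.iterator();
                }
                String candidate = unfilledConsumerIter.next();
                int target = numMaxCapacityMembers < numExpectedMaxCapacityMembers ? maxQuota : minQuota;
                if (assignment.get(candidate).size() >= target) {
                    // Already full for the current target: drop it instead of over-filling it.
                    unfilledConsumerIter.remove();
                } else {
                    consumer = candidate;
                }
            }

            List<String> consumerAssignment = assignment.get(consumer);
            consumerAssignment.add(partition);

            int expectedAssignedCount =
                numMaxCapacityMembers < numExpectedMaxCapacityMembers ? maxQuota : minQuota;
            if (consumerAssignment.size() == expectedAssignedCount) {
                if (consumerAssignment.size() == maxQuota) {
                    numMaxCapacityMembers++;
                }
                // This member is done; remove it from the rotation.
                unfilledConsumerIter.remove();
            }
        }
        return numMaxCapacityMembers;
    }
}
```

Dropping full members lazily, at the moment the iterator reaches them, keeps the fill a single pass over the unassigned partitions without rescanning the member list when the `maxQuota` slots run out.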

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,180 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
                                                                 Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug("performing constrained assign. partitionsPerTopic: {}, consumerToOwnedPartitions: {}",
+                partitionsPerTopic, consumerToOwnedPartitions);
+        }
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers);
+        int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota for all members
+        // initialize the assignment map with an empty array of size maxQuota for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
+            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> assignedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    assignedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
+                numMaxCapacityMembers++;
+                List<TopicPartition> maxQuotaPartitions = ownedPartitions.subList(0, maxQuota);
+                consumerAssignment.addAll(maxQuotaPartitions);
+                assignedPartitions.addAll(maxQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned at least "minQuota" of partitions but we're already at the allowed number of max capacity members

Review comment:
       I think this is actually possible even with more than one consumer, since both num.partitions AND num.consumers can change. When that happens, relying on previously assigned partitions would not guarantee that stickiness alone is sufficient to achieve the end goal.
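To make this point concrete, a hedged worked example of the first step in isolation, assuming a hypothetical `StickyRetentionSketch` that works on owned-partition counts rather than `TopicPartition` lists. Two consumers that each owned 3 of 6 partitions meet a rebalance where one topic has grown to 7 partitions in total and a third consumer has joined: `minQuota` = 2, `maxQuota` = 3, and only 7 % 3 = 1 member may stay at `maxQuota`, so the second consumer falls into the final `else` branch and keeps only `minQuota`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of step 1 only: how many previously owned partitions each consumer keeps.
final class StickyRetentionSketch {

    static Map<String, Integer> keptOwnedCounts(Map<String, Integer> ownedCounts, int totalPartitionsCount) {
        int numberOfConsumers = ownedCounts.size();
        // Integer floor/ceil; same values as Math.floor/Math.ceil on these non-negative counts.
        int minQuota = totalPartitionsCount / numberOfConsumers;
        int maxQuota = (totalPartitionsCount + numberOfConsumers - 1) / numberOfConsumers;
        int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers;
        int numMaxCapacityMembers = 0;

        Map<String, Integer> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : ownedCounts.entrySet()) {
            int owned = entry.getValue();
            if (owned < minQuota) {
                // Keep everything; this consumer will be topped up later.
                kept.put(entry.getKey(), owned);
            } else if (owned >= maxQuota && numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
                // A maxQuota slot is still free: keep maxQuota, revoke the rest.
                numMaxCapacityMembers++;
                kept.put(entry.getKey(), maxQuota);
            } else {
                // No maxQuota slot left (or fewer than maxQuota owned): keep minQuota, revoke the rest.
                kept.put(entry.getKey(), minQuota);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Before: C1 and C2 each owned 3 of 6 partitions.
        // Now: one topic gained a partition (7 in total) and C3 joined.
        Map<String, Integer> owned = new LinkedHashMap<>();
        owned.put("C1", 3);
        owned.put("C2", 3);
        owned.put("C3", 0);
        System.out.println(keptOwnedCounts(owned, 7)); // prints {C1=3, C2=2, C3=0}
    }
}
```

Which consumer gets the single `maxQuota` slot depends on the iteration order of the input map; the sketch uses a `LinkedHashMap` only so that the result is deterministic.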

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,180 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
                                                                 Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug("performing constrained assign. partitionsPerTopic: {}, consumerToOwnedPartitions: {}",
+                partitionsPerTopic, consumerToOwnedPartitions);
+        }
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers);
+        int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota for all members
+        // initialize the assignment map with an empty array of size maxQuota for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
+            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> assignedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    assignedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
+                numMaxCapacityMembers++;
+                List<TopicPartition> maxQuotaPartitions = ownedPartitions.subList(0, maxQuota);
+                consumerAssignment.addAll(maxQuotaPartitions);
+                assignedPartitions.addAll(maxQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned at least "minQuota" of partitions but we're already at the allowed number of max capacity members
+                // so keep "minQuota" of the owned partitions, and revoke the rest of the partitions
+                List<TopicPartition> minQuotaPartitions = ownedPartitions.subList(0, minQuota);
+                consumerAssignment.addAll(minQuotaPartitions);
+                assignedPartitions.addAll(minQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
             }
         }
 
+        List<TopicPartition> unassignedPartitions = getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, assignedPartitions);
+        assignedPartitions = null;
+
+        if (log.isDebugEnabled()) {
+            log.debug("After reassigning previously owned partitions, unfilled members: {}, unassigned partitions: {}, " +
+                "current assignment: {}", unfilledMembers, unassignedPartitions, assignment);
+        }
+
         Collections.sort(unfilledMembers);
-        Iterator<TopicPartition> unassignedPartitionsIter = unassignedPartitions.iterator();
-
-        // Fill remaining members up to minQuota
-        while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
-            Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
-
-            while (unfilledConsumerIter.hasNext()) {
-                String consumer = unfilledConsumerIter.next();
-                List<TopicPartition> consumerAssignment = assignment.get(consumer);
-
-                if (unassignedPartitionsIter.hasNext()) {
-                    TopicPartition tp = unassignedPartitionsIter.next();
-                    consumerAssignment.add(tp);
-                    unassignedPartitionsIter.remove();
-                    // We already assigned all possible ownedPartitions, so we know this must be newly to this consumer
-                    if (allRevokedPartitions.contains(tp))
-                        partitionsTransferringOwnership.put(tp, consumer);
-                } else {
-                    break;
-                }
 
-                if (consumerAssignment.size() == minQuota) {
-                    minCapacityMembers.add(consumer);
-                    unfilledConsumerIter.remove();
+        Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+        // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
+        for (TopicPartition unassignedPartition : unassignedPartitions) {
+            if (!unfilledConsumerIter.hasNext()) {
+                if (unfilledMembers.isEmpty()) {
+                    // Should not enter here since we have calculated the exact number to assign to each consumer
+                    log.warn("No more unfilled consumers to be assigned.");
+                    break;
                 }
+                unfilledConsumerIter = unfilledMembers.iterator();
             }
-        }
-
-        // If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions
-        // from the over-full consumers at max capacity
-        for (String consumer : unfilledMembers) {
+            String consumer = unfilledConsumerIter.next();
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int remainingCapacity = minQuota - consumerAssignment.size();
-            while (remainingCapacity > 0) {
-                String overloadedConsumer = maxCapacityMembers.poll();
-                if (overloadedConsumer == null) {
-                    throw new IllegalStateException("Some consumers are under capacity but all partitions have been assigned");
+            consumerAssignment.add(unassignedPartition);
+
+            // We already assigned all possible ownedPartitions, so we know this must be newly to this consumer
+            if (allRevokedPartitions.contains(unassignedPartition))
+                partitionsTransferringOwnership.put(unassignedPartition, consumer);
+
+            int currentAssignedCount = consumerAssignment.size();
+            int expectedAssignedCount = numMaxCapacityMembers < numExpectedMaxCapacityMembers ? maxQuota : minQuota;
+            if (currentAssignedCount == expectedAssignedCount) {
+                if (currentAssignedCount == maxQuota) {
+                    numMaxCapacityMembers++;
                 }
-                TopicPartition swappedPartition = assignment.get(overloadedConsumer).remove(0);
-                consumerAssignment.add(swappedPartition);
-                --remainingCapacity;
-                // This partition is by definition transferring ownership, the swapped partition must have come from
-                // the max capacity member's owned partitions since it can only reach max capacity with owned partitions
-                partitionsTransferringOwnership.put(swappedPartition, consumer);
+                unfilledConsumerIter.remove();
             }
-            minCapacityMembers.add(consumer);
         }
 
-        // Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we
-        // should just distribute one partition each to all consumers at min capacity
-        for (TopicPartition unassignedPartition : unassignedPartitions) {
-            String underCapacityConsumer = minCapacityMembers.poll();
-            if (underCapacityConsumer == null) {
-                throw new IllegalStateException("Some partitions are unassigned but all consumers are at maximum capacity");
-            }
-            // We can skip the bookkeeping of unassignedPartitions and maxCapacityMembers here since we are at the end
-            assignment.get(underCapacityConsumer).add(unassignedPartition);
+        if (!unfilledMembers.isEmpty()) {
+            // Should not enter here since we have calculated the exact number to assign to each consumer
+            log.warn("No more partitions to be assigned to unfilled consumers: {}", unfilledMembers);

Review comment:
       Personally I'd prefer to throw an `IllegalStateException` here too, if we believe we should never reach this point. I agree its impact would be smaller if it ever happened, but in either case it indicates a bug in our code :)
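Along the lines of this suggestion, a hedged sketch of a post-condition that fails loudly instead of logging. It checks the final assignment itself against the quotas rather than what is left in `unfilledMembers`, so the check does not depend on that list's bookkeeping. `AssignmentInvariantSketch` and `validateBalanced` are hypothetical names, not part of the PR or of `AbstractStickyAssignor`, and partitions are modeled as plain `String`s.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical post-condition check; partitions are plain Strings here.
final class AssignmentInvariantSketch {

    static void validateBalanced(Map<String, List<String>> assignment, int totalPartitionsCount) {
        int numberOfConsumers = assignment.size();
        if (numberOfConsumers == 0) {
            throw new IllegalStateException("No consumers to validate");
        }
        int minQuota = totalPartitionsCount / numberOfConsumers;
        int maxQuota = (totalPartitionsCount + numberOfConsumers - 1) / numberOfConsumers;

        Set<String> seen = new HashSet<>();
        int assignedCount = 0;
        for (Map.Entry<String, List<String>> entry : assignment.entrySet()) {
            int size = entry.getValue().size();
            // Every consumer must end up with either minQuota or maxQuota partitions.
            if (size != minQuota && size != maxQuota) {
                throw new IllegalStateException("Consumer " + entry.getKey() + " has " + size
                    + " partitions, expected " + minQuota + " or " + maxQuota);
            }
            assignedCount += size;
            seen.addAll(entry.getValue());
        }
        if (seen.size() != assignedCount) {
            throw new IllegalStateException("Some partitions are assigned to more than one consumer");
        }
        // With every size in {minQuota, maxQuota} and the totals matching, exactly
        // totalPartitionsCount % numberOfConsumers consumers must be at maxQuota.
        if (assignedCount != totalPartitionsCount) {
            throw new IllegalStateException("Assigned " + assignedCount + " partitions but expected "
                + totalPartitionsCount);
        }
    }
}
```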

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -149,12 +149,8 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
      * This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics.
      * The method includes the following steps:
      *
-     * 1. Reassign as many previously owned partitions as possible, up to the maxQuota
-     * 2. Fill remaining members up to minQuota
-     * 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions
-     *    from the over-full consumers at max capacity
-     * 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we
-     *    should just distribute one partition each to all consumers at min capacity
+     * 1. Reassign as many previously owned partitions as possible, up to the expected number of maxQuota, otherwise, minQuota

Review comment:
       nit: it seems the first line should just say `up to the expected number of maxQuota`? Also, the second line seems incomplete: we need to explain which condition the `otherwise` applies to.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

