ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r623436340



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,180 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+                partitionsPerTopic, consumerToOwnedPartitions);
+        }
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> assignedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    assignedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and 
we're still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+                numMaxCapacityMembers++;
+                List<TopicPartition> maxQuotaPartitions = 
ownedPartitions.subList(0, maxQuota);
+                consumerAssignment.addAll(maxQuotaPartitions);
+                assignedPartitions.addAll(maxQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, 
ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max 
capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned at least "minQuota" of partitions but we're 
already at the allowed number of max capacity members
+                // so keep "minQuota" of the owned partitions, and revoke the 
rest of the partitions
+                List<TopicPartition> minQuotaPartitions = 
ownedPartitions.subList(0, minQuota);
+                consumerAssignment.addAll(minQuotaPartitions);
+                assignedPartitions.addAll(minQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, 
ownedPartitions.size()));
             }
         }
 
+        List<TopicPartition> unassignedPartitions = 
getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, 
assignedPartitions);
+        assignedPartitions = null;
+
+        if (log.isDebugEnabled()) {
+            log.debug("After reassigning previously owned partitions, unfilled 
members: {}, unassigned partitions: {}, " +
+                "current assignment: {}", unfilledMembers, 
unassignedPartitions, assignment);
+        }
+
         Collections.sort(unfilledMembers);
-        Iterator<TopicPartition> unassignedPartitionsIter = 
unassignedPartitions.iterator();
-
-        // Fill remaining members up to minQuota
-        while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
-            Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
-
-            while (unfilledConsumerIter.hasNext()) {
-                String consumer = unfilledConsumerIter.next();
-                List<TopicPartition> consumerAssignment = 
assignment.get(consumer);
-
-                if (unassignedPartitionsIter.hasNext()) {
-                    TopicPartition tp = unassignedPartitionsIter.next();
-                    consumerAssignment.add(tp);
-                    unassignedPartitionsIter.remove();
-                    // We already assigned all possible ownedPartitions, so we 
know this must be newly to this consumer
-                    if (allRevokedPartitions.contains(tp))
-                        partitionsTransferringOwnership.put(tp, consumer);
-                } else {
-                    break;
-                }
 
-                if (consumerAssignment.size() == minQuota) {
-                    minCapacityMembers.add(consumer);
-                    unfilledConsumerIter.remove();
+        Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+        // Round-Robin filling remaining members up to the expected numbers of 
maxQuota, otherwise, to minQuota
+        for (TopicPartition unassignedPartition : unassignedPartitions) {
+            if (!unfilledConsumerIter.hasNext()) {
+                if (unfilledMembers.isEmpty()) {
+                    // Should not enter here since we have calculated the 
exact number to assign to each consumer
+                    log.warn("No more unfilled consumers to be assigned.");
+                    break;
                 }
+                unfilledConsumerIter = unfilledMembers.iterator();
             }
-        }
-
-        // If we ran out of unassigned partitions before filling all 
consumers, we need to start stealing partitions
-        // from the over-full consumers at max capacity
-        for (String consumer : unfilledMembers) {
+            String consumer = unfilledConsumerIter.next();
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int remainingCapacity = minQuota - consumerAssignment.size();
-            while (remainingCapacity > 0) {
-                String overloadedConsumer = maxCapacityMembers.poll();
-                if (overloadedConsumer == null) {
-                    throw new IllegalStateException("Some consumers are under 
capacity but all partitions have been assigned");
+            consumerAssignment.add(unassignedPartition);
+
+            // We already assigned all possible ownedPartitions, so we know 
this must be newly to this consumer

Review comment:
       nit: 
   ```suggestion
               // We already assigned all possible ownedPartitions, so we know 
this must be newly assigned to this consumer
   ```

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,180 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+                partitionsPerTopic, consumerToOwnedPartitions);
+        }
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> assignedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    assignedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and 
we're still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+                numMaxCapacityMembers++;
+                List<TopicPartition> maxQuotaPartitions = 
ownedPartitions.subList(0, maxQuota);
+                consumerAssignment.addAll(maxQuotaPartitions);
+                assignedPartitions.addAll(maxQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, 
ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max 
capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned at least "minQuota" of partitions but we're 
already at the allowed number of max capacity members
+                // so keep "minQuota" of the owned partitions, and revoke the 
rest of the partitions
+                List<TopicPartition> minQuotaPartitions = 
ownedPartitions.subList(0, minQuota);
+                consumerAssignment.addAll(minQuotaPartitions);
+                assignedPartitions.addAll(minQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, 
ownedPartitions.size()));
             }
         }
 
+        List<TopicPartition> unassignedPartitions = 
getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, 
assignedPartitions);
+        assignedPartitions = null;
+
+        if (log.isDebugEnabled()) {
+            log.debug("After reassigning previously owned partitions, unfilled 
members: {}, unassigned partitions: {}, " +
+                "current assignment: {}", unfilledMembers, 
unassignedPartitions, assignment);
+        }
+
         Collections.sort(unfilledMembers);
-        Iterator<TopicPartition> unassignedPartitionsIter = 
unassignedPartitions.iterator();
-
-        // Fill remaining members up to minQuota
-        while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
-            Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
-
-            while (unfilledConsumerIter.hasNext()) {
-                String consumer = unfilledConsumerIter.next();
-                List<TopicPartition> consumerAssignment = 
assignment.get(consumer);
-
-                if (unassignedPartitionsIter.hasNext()) {
-                    TopicPartition tp = unassignedPartitionsIter.next();
-                    consumerAssignment.add(tp);
-                    unassignedPartitionsIter.remove();
-                    // We already assigned all possible ownedPartitions, so we 
know this must be newly to this consumer
-                    if (allRevokedPartitions.contains(tp))
-                        partitionsTransferringOwnership.put(tp, consumer);
-                } else {
-                    break;
-                }
 
-                if (consumerAssignment.size() == minQuota) {
-                    minCapacityMembers.add(consumer);
-                    unfilledConsumerIter.remove();
+        Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+        // Round-Robin filling remaining members up to the expected numbers of 
maxQuota, otherwise, to minQuota
+        for (TopicPartition unassignedPartition : unassignedPartitions) {
+            if (!unfilledConsumerIter.hasNext()) {
+                if (unfilledMembers.isEmpty()) {
+                    // Should not enter here since we have calculated the 
exact number to assign to each consumer
+                    log.warn("No more unfilled consumers to be assigned.");
+                    break;
                 }
+                unfilledConsumerIter = unfilledMembers.iterator();
             }
-        }
-
-        // If we ran out of unassigned partitions before filling all 
consumers, we need to start stealing partitions
-        // from the over-full consumers at max capacity
-        for (String consumer : unfilledMembers) {
+            String consumer = unfilledConsumerIter.next();
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int remainingCapacity = minQuota - consumerAssignment.size();
-            while (remainingCapacity > 0) {
-                String overloadedConsumer = maxCapacityMembers.poll();
-                if (overloadedConsumer == null) {
-                    throw new IllegalStateException("Some consumers are under 
capacity but all partitions have been assigned");
+            consumerAssignment.add(unassignedPartition);
+
+            // We already assigned all possible ownedPartitions, so we know 
this must be newly to this consumer
+            if (allRevokedPartitions.contains(unassignedPartition))
+                partitionsTransferringOwnership.put(unassignedPartition, 
consumer);
+
+            int currentAssignedCount = consumerAssignment.size();
+            int expectedAssignedCount = numMaxCapacityMembers < 
numExpectedMaxCapacityMembers ? maxQuota : minQuota;
+            if (currentAssignedCount == expectedAssignedCount) {
+                if (currentAssignedCount == maxQuota) {
+                    numMaxCapacityMembers++;
                 }
-                TopicPartition swappedPartition = 
assignment.get(overloadedConsumer).remove(0);
-                consumerAssignment.add(swappedPartition);
-                --remainingCapacity;
-                // This partition is by definition transferring ownership, the 
swapped partition must have come from
-                // the max capacity member's owned partitions since it can 
only reach max capacity with owned partitions
-                partitionsTransferringOwnership.put(swappedPartition, 
consumer);
+                unfilledConsumerIter.remove();
             }
-            minCapacityMembers.add(consumer);
         }
 
-        // Otherwise we may have run out of unfilled consumers before 
assigning all partitions, in which case we
-        // should just distribute one partition each to all consumers at min 
capacity
-        for (TopicPartition unassignedPartition : unassignedPartitions) {
-            String underCapacityConsumer = minCapacityMembers.poll();
-            if (underCapacityConsumer == null) {
-                throw new IllegalStateException("Some partitions are 
unassigned but all consumers are at maximum capacity");
-            }
-            // We can skip the bookkeeping of unassignedPartitions and 
maxCapacityMembers here since we are at the end
-            assignment.get(underCapacityConsumer).add(unassignedPartition);
+        if (!unfilledMembers.isEmpty()) {
+            // Should not enter here since we have calculated the exact number 
to assign to each consumer
+            log.warn("No more partitions to be assigned to unfilled consumers: 
{}", unfilledMembers);
+        }
 
-            if (allRevokedPartitions.contains(unassignedPartition))
-                partitionsTransferringOwnership.put(unassignedPartition, 
underCapacityConsumer);
+        if (log.isDebugEnabled()) {
+            log.debug("Final assignment of partitions to consumers: \n{}", 
assignment);
         }
 
         return assignment;
     }
 
-    private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer> 
partitionsPerTopic) {
-        SortedSet<TopicPartition> allPartitions =
-            new 
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
-            String topic = entry.getKey();
-            for (int i = 0; i < entry.getValue(); ++i) {
+    /**
+     * get the unassigned partition list by computing the difference set of 
all sorted partitions
+     * and sortedAssignedPartitions. If no assigned partitions, we'll just 
return all topic partitions.
+     *
+     * To compute the difference set, we use two pointers technique here:
+     *
+     * We loop through the all sorted topics, and then iterate all partitions 
the topic has,
+     * compared with the ith element in sortedAssignedPartitions(i starts from 
0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from 
sortedAssignedPartitions
+     *
+     * @param totalPartitionsCount  all partitions counts in this assignment
+     * @param partitionsPerTopic  The number of partitions for each subscribed 
topic.
+     * @param sortedAssignedPartitions  sorted partitions, all are included in 
the sortedPartitions
+     * @return the partitions don't assign to any current consumers

Review comment:
       ```suggestion
        * @return the partitions not yet assigned to any consumers
   ```
   
   Also, nit: can you align the spacing so that all param/return descriptions 
start in the same column?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -82,6 +80,8 @@ public MemberData(List<TopicPartition> partitions, 
Optional<Integer> generation)
             log.debug("Detected that all not consumers were subscribed to same 
set of topics, falling back to the "
                           + "general case assignment algorithm");
             partitionsTransferringOwnership = null;
+            // we don't need consumerToOwnedPartitions in general assign case
+            consumerToOwnedPartitions = null;

Review comment:
       I feel like the JVM _should_ know this is eligible for garbage collection 
since it's unused after this point, but you just never know. Better to be sure.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,180 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+                partitionsPerTopic, consumerToOwnedPartitions);
+        }
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> assignedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    assignedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and 
we're still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+                numMaxCapacityMembers++;
+                List<TopicPartition> maxQuotaPartitions = 
ownedPartitions.subList(0, maxQuota);
+                consumerAssignment.addAll(maxQuotaPartitions);
+                assignedPartitions.addAll(maxQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, 
ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max 
capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned at least "minQuota" of partitions but we're 
already at the allowed number of max capacity members
+                // so keep "minQuota" of the owned partitions, and revoke the 
rest of the partitions
+                List<TopicPartition> minQuotaPartitions = 
ownedPartitions.subList(0, minQuota);
+                consumerAssignment.addAll(minQuotaPartitions);
+                assignedPartitions.addAll(minQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, 
ownedPartitions.size()));
             }
         }
 
+        List<TopicPartition> unassignedPartitions = 
getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, 
assignedPartitions);
+        assignedPartitions = null;
+
+        if (log.isDebugEnabled()) {
+            log.debug("After reassigning previously owned partitions, unfilled 
members: {}, unassigned partitions: {}, " +
+                "current assignment: {}", unfilledMembers, 
unassignedPartitions, assignment);
+        }
+
         Collections.sort(unfilledMembers);
-        Iterator<TopicPartition> unassignedPartitionsIter = 
unassignedPartitions.iterator();
-
-        // Fill remaining members up to minQuota
-        while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
-            Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
-
-            while (unfilledConsumerIter.hasNext()) {
-                String consumer = unfilledConsumerIter.next();
-                List<TopicPartition> consumerAssignment = 
assignment.get(consumer);
-
-                if (unassignedPartitionsIter.hasNext()) {
-                    TopicPartition tp = unassignedPartitionsIter.next();
-                    consumerAssignment.add(tp);
-                    unassignedPartitionsIter.remove();
-                    // We already assigned all possible ownedPartitions, so we 
know this must be newly to this consumer
-                    if (allRevokedPartitions.contains(tp))
-                        partitionsTransferringOwnership.put(tp, consumer);
-                } else {
-                    break;
-                }
 
-                if (consumerAssignment.size() == minQuota) {
-                    minCapacityMembers.add(consumer);
-                    unfilledConsumerIter.remove();
+        Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+        // Round-Robin filling remaining members up to the expected numbers of 
maxQuota, otherwise, to minQuota
+        for (TopicPartition unassignedPartition : unassignedPartitions) {
+            if (!unfilledConsumerIter.hasNext()) {
+                if (unfilledMembers.isEmpty()) {
+                    // Should not enter here since we have calculated the 
exact number to assign to each consumer
+                    log.warn("No more unfilled consumers to be assigned.");
+                    break;
                 }
+                unfilledConsumerIter = unfilledMembers.iterator();
             }
-        }
-
-        // If we ran out of unassigned partitions before filling all 
consumers, we need to start stealing partitions
-        // from the over-full consumers at max capacity
-        for (String consumer : unfilledMembers) {
+            String consumer = unfilledConsumerIter.next();
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int remainingCapacity = minQuota - consumerAssignment.size();
-            while (remainingCapacity > 0) {
-                String overloadedConsumer = maxCapacityMembers.poll();
-                if (overloadedConsumer == null) {
-                    throw new IllegalStateException("Some consumers are under 
capacity but all partitions have been assigned");
+            consumerAssignment.add(unassignedPartition);
+
+            // We already assigned all possible ownedPartitions, so we know 
this must be newly to this consumer
+            if (allRevokedPartitions.contains(unassignedPartition))
+                partitionsTransferringOwnership.put(unassignedPartition, 
consumer);
+
+            int currentAssignedCount = consumerAssignment.size();
+            int expectedAssignedCount = numMaxCapacityMembers < 
numExpectedMaxCapacityMembers ? maxQuota : minQuota;
+            if (currentAssignedCount == expectedAssignedCount) {
+                if (currentAssignedCount == maxQuota) {
+                    numMaxCapacityMembers++;
                 }
-                TopicPartition swappedPartition = 
assignment.get(overloadedConsumer).remove(0);
-                consumerAssignment.add(swappedPartition);
-                --remainingCapacity;
-                // This partition is by definition transferring ownership, the 
swapped partition must have come from
-                // the max capacity member's owned partitions since it can 
only reach max capacity with owned partitions
-                partitionsTransferringOwnership.put(swappedPartition, 
consumer);
+                unfilledConsumerIter.remove();
             }
-            minCapacityMembers.add(consumer);
         }
 
-        // Otherwise we may have run out of unfilled consumers before 
assigning all partitions, in which case we
-        // should just distribute one partition each to all consumers at min 
capacity
-        for (TopicPartition unassignedPartition : unassignedPartitions) {
-            String underCapacityConsumer = minCapacityMembers.poll();
-            if (underCapacityConsumer == null) {
-                throw new IllegalStateException("Some partitions are 
unassigned but all consumers are at maximum capacity");
-            }
-            // We can skip the bookkeeping of unassignedPartitions and 
maxCapacityMembers here since we are at the end
-            assignment.get(underCapacityConsumer).add(unassignedPartition);
+        if (!unfilledMembers.isEmpty()) {
+            // Should not enter here since we have calculated the exact number 
to assign to each consumer
+            log.warn("No more partitions to be assigned to unfilled consumers: 
{}", unfilledMembers);
+        }
 
-            if (allRevokedPartitions.contains(unassignedPartition))
-                partitionsTransferringOwnership.put(unassignedPartition, 
underCapacityConsumer);
+        if (log.isDebugEnabled()) {
+            log.debug("Final assignment of partitions to consumers: \n{}", 
assignment);
         }
 
         return assignment;
     }
 
-    private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer> 
partitionsPerTopic) {
-        SortedSet<TopicPartition> allPartitions =
-            new 
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
-            String topic = entry.getKey();
-            for (int i = 0; i < entry.getValue(); ++i) {
+    /**
+     * get the unassigned partition list by computing the difference set of 
all sorted partitions
+     * and sortedAssignedPartitions. If no assigned partitions, we'll just 
return all topic partitions.
+     *
+     * To compute the difference set, we use two pointers technique here:
+     *
+     * We loop through the all sorted topics, and then iterate all partitions 
the topic has,
+     * compared with the ith element in sortedAssignedPartitions(i starts from 
0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from 
sortedAssignedPartitions
+     *
+     * @param totalPartitionsCount  all partitions counts in this assignment
+     * @param partitionsPerTopic  The number of partitions for each subscribed 
topic.
+     * @param sortedAssignedPartitions  sorted partitions, all are included in 
the sortedPartitions
+     * @return the partitions don't assign to any current consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(int 
totalPartitionsCount,
+                                                         Map<String, Integer> 
partitionsPerTopic,
+                                                         List<TopicPartition> 
sortedAssignedPartitions) {
+        List<String> sortedAllTopics = new 
ArrayList<>(partitionsPerTopic.keySet());
+        // sort all topics first, then we can have sorted all topic partitions 
by adding partitions starting from 0
+        Collections.sort(sortedAllTopics);
+
+        if (sortedAssignedPartitions.isEmpty()) {
+            // no assigned partitions means all partitions are unassigned 
partitions
+            return getAllTopicPartitions(partitionsPerTopic, sortedAllTopics, 
totalPartitionsCount);
+        }
+
+        List<TopicPartition> unassignedPartitions = new 
ArrayList<>(totalPartitionsCount - sortedAssignedPartitions.size());
+
+        Collections.sort(sortedAssignedPartitions, 
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+
+        boolean shouldAddDirectly = false;
+        Iterator<TopicPartition> sortedAssignedPartitionsIter = 
sortedAssignedPartitions.iterator();
+        TopicPartition nextPartition = sortedAssignedPartitionsIter.next();

Review comment:
       nit: rename to `nextAssignedPartition`

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,180 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+                partitionsPerTopic, consumerToOwnedPartitions);
+        }
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> assignedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    assignedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and 
we're still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+                numMaxCapacityMembers++;
+                List<TopicPartition> maxQuotaPartitions = 
ownedPartitions.subList(0, maxQuota);
+                consumerAssignment.addAll(maxQuotaPartitions);
+                assignedPartitions.addAll(maxQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, 
ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max 
capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned at least "minQuota" of partitions but we're 
already at the allowed number of max capacity members
+                // so keep "minQuota" of the owned partitions, and revoke the 
rest of the partitions
+                List<TopicPartition> minQuotaPartitions = 
ownedPartitions.subList(0, minQuota);
+                consumerAssignment.addAll(minQuotaPartitions);
+                assignedPartitions.addAll(minQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, 
ownedPartitions.size()));
             }
         }
 
+        List<TopicPartition> unassignedPartitions = 
getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, 
assignedPartitions);
+        assignedPartitions = null;
+
+        if (log.isDebugEnabled()) {
+            log.debug("After reassigning previously owned partitions, unfilled 
members: {}, unassigned partitions: {}, " +
+                "current assignment: {}", unfilledMembers, 
unassignedPartitions, assignment);
+        }
+
         Collections.sort(unfilledMembers);
-        Iterator<TopicPartition> unassignedPartitionsIter = 
unassignedPartitions.iterator();
-
-        // Fill remaining members up to minQuota
-        while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
-            Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
-
-            while (unfilledConsumerIter.hasNext()) {
-                String consumer = unfilledConsumerIter.next();
-                List<TopicPartition> consumerAssignment = 
assignment.get(consumer);
-
-                if (unassignedPartitionsIter.hasNext()) {
-                    TopicPartition tp = unassignedPartitionsIter.next();
-                    consumerAssignment.add(tp);
-                    unassignedPartitionsIter.remove();
-                    // We already assigned all possible ownedPartitions, so we 
know this must be newly to this consumer
-                    if (allRevokedPartitions.contains(tp))
-                        partitionsTransferringOwnership.put(tp, consumer);
-                } else {
-                    break;
-                }
 
-                if (consumerAssignment.size() == minQuota) {
-                    minCapacityMembers.add(consumer);
-                    unfilledConsumerIter.remove();
+        Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+        // Round-Robin filling remaining members up to the expected numbers of 
maxQuota, otherwise, to minQuota
+        for (TopicPartition unassignedPartition : unassignedPartitions) {
+            if (!unfilledConsumerIter.hasNext()) {
+                if (unfilledMembers.isEmpty()) {
+                    // Should not enter here since we have calculated the 
exact number to assign to each consumer
+                    log.warn("No more unfilled consumers to be assigned.");
+                    break;
                 }
+                unfilledConsumerIter = unfilledMembers.iterator();
             }
-        }
-
-        // If we ran out of unassigned partitions before filling all 
consumers, we need to start stealing partitions
-        // from the over-full consumers at max capacity
-        for (String consumer : unfilledMembers) {
+            String consumer = unfilledConsumerIter.next();
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int remainingCapacity = minQuota - consumerAssignment.size();
-            while (remainingCapacity > 0) {
-                String overloadedConsumer = maxCapacityMembers.poll();
-                if (overloadedConsumer == null) {
-                    throw new IllegalStateException("Some consumers are under 
capacity but all partitions have been assigned");
+            consumerAssignment.add(unassignedPartition);
+
+            // We already assigned all possible ownedPartitions, so we know 
this must be newly to this consumer
+            if (allRevokedPartitions.contains(unassignedPartition))
+                partitionsTransferringOwnership.put(unassignedPartition, 
consumer);
+
+            int currentAssignedCount = consumerAssignment.size();
+            int expectedAssignedCount = numMaxCapacityMembers < 
numExpectedMaxCapacityMembers ? maxQuota : minQuota;
+            if (currentAssignedCount == expectedAssignedCount) {
+                if (currentAssignedCount == maxQuota) {
+                    numMaxCapacityMembers++;
                 }
-                TopicPartition swappedPartition = 
assignment.get(overloadedConsumer).remove(0);
-                consumerAssignment.add(swappedPartition);
-                --remainingCapacity;
-                // This partition is by definition transferring ownership, the 
swapped partition must have come from
-                // the max capacity member's owned partitions since it can 
only reach max capacity with owned partitions
-                partitionsTransferringOwnership.put(swappedPartition, 
consumer);
+                unfilledConsumerIter.remove();
             }
-            minCapacityMembers.add(consumer);
         }
 
-        // Otherwise we may have run out of unfilled consumers before 
assigning all partitions, in which case we
-        // should just distribute one partition each to all consumers at min 
capacity
-        for (TopicPartition unassignedPartition : unassignedPartitions) {
-            String underCapacityConsumer = minCapacityMembers.poll();
-            if (underCapacityConsumer == null) {
-                throw new IllegalStateException("Some partitions are 
unassigned but all consumers are at maximum capacity");
-            }
-            // We can skip the bookkeeping of unassignedPartitions and 
maxCapacityMembers here since we are at the end
-            assignment.get(underCapacityConsumer).add(unassignedPartition);
+        if (!unfilledMembers.isEmpty()) {
+            // Should not enter here since we have calculated the exact number 
to assign to each consumer
+            log.warn("No more partitions to be assigned to unfilled consumers: 
{}", unfilledMembers);

Review comment:
       This one seems ok to leave as a warning, vs the other way around (i.e. 
not all partitions being assigned). But it could be bumped to ERROR as well.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,180 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+                partitionsPerTopic, consumerToOwnedPartitions);
+        }
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> assignedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    assignedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and 
we're still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+                numMaxCapacityMembers++;
+                List<TopicPartition> maxQuotaPartitions = 
ownedPartitions.subList(0, maxQuota);
+                consumerAssignment.addAll(maxQuotaPartitions);
+                assignedPartitions.addAll(maxQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, 
ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max 
capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned at least "minQuota" of partitions but we're 
already at the allowed number of max capacity members

Review comment:
       I wonder if we need to handle one more case? I'm thinking about the case 
where the consumer had exactly `minQuota` previously owned partitions but we're 
not yet at `numExpectedMaxCapacityMembers`. Should we consider the consumer to 
be "unfilled", since it may eventually need to get to `maxQuota`?
   
   I tried to think up an example where the current logic would break, and I 
couldn't. It's possible that there isn't one just by mathematical coincidence 
-- e.g. if there's just one consumer which had `minQuota` previously owned 
partitions, then in theory it might need to have one more assigned, but in 
reality if there's only one consumer then min and max quota will always be 
equal. Could you give this some thought and make sure we're not missing 
something here?
   
   (Even if not, we should update the comment here since it's a bit misleading 
-- we don't _know_ that we're at the allowed number of max capacity members 
yet. Maybe we can instead leave a comment explaining why we think the above 
case is not possible, assuming you do come to that same conclusion.)

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,180 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+                partitionsPerTopic, consumerToOwnedPartitions);
+        }
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> assignedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    assignedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and 
we're still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+                numMaxCapacityMembers++;
+                List<TopicPartition> maxQuotaPartitions = 
ownedPartitions.subList(0, maxQuota);
+                consumerAssignment.addAll(maxQuotaPartitions);
+                assignedPartitions.addAll(maxQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, 
ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max 
capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned at least "minQuota" of partitions but we're 
already at the allowed number of max capacity members
+                // so keep "minQuota" of the owned partitions, and revoke the 
rest of the partitions
+                List<TopicPartition> minQuotaPartitions = 
ownedPartitions.subList(0, minQuota);
+                consumerAssignment.addAll(minQuotaPartitions);
+                assignedPartitions.addAll(minQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, 
ownedPartitions.size()));
             }
         }
 
+        List<TopicPartition> unassignedPartitions = 
getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, 
assignedPartitions);
+        assignedPartitions = null;
+
+        if (log.isDebugEnabled()) {
+            log.debug("After reassigning previously owned partitions, unfilled 
members: {}, unassigned partitions: {}, " +
+                "current assignment: {}", unfilledMembers, 
unassignedPartitions, assignment);
+        }
+
         Collections.sort(unfilledMembers);
-        Iterator<TopicPartition> unassignedPartitionsIter = 
unassignedPartitions.iterator();
-
-        // Fill remaining members up to minQuota
-        while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
-            Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
-
-            while (unfilledConsumerIter.hasNext()) {
-                String consumer = unfilledConsumerIter.next();
-                List<TopicPartition> consumerAssignment = 
assignment.get(consumer);
-
-                if (unassignedPartitionsIter.hasNext()) {
-                    TopicPartition tp = unassignedPartitionsIter.next();
-                    consumerAssignment.add(tp);
-                    unassignedPartitionsIter.remove();
-                    // We already assigned all possible ownedPartitions, so we 
know this must be newly to this consumer
-                    if (allRevokedPartitions.contains(tp))
-                        partitionsTransferringOwnership.put(tp, consumer);
-                } else {
-                    break;
-                }
 
-                if (consumerAssignment.size() == minQuota) {
-                    minCapacityMembers.add(consumer);
-                    unfilledConsumerIter.remove();
+        Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+        // Round-Robin filling remaining members up to the expected numbers of 
maxQuota, otherwise, to minQuota
+        for (TopicPartition unassignedPartition : unassignedPartitions) {
+            if (!unfilledConsumerIter.hasNext()) {
+                if (unfilledMembers.isEmpty()) {
+                    // Should not enter here since we have calculated the 
exact number to assign to each consumer
+                    log.warn("No more unfilled consumers to be assigned.");

Review comment:
       I think we should take this case more seriously, because it means some 
partitions don't get assigned. Either we log it as an error and throw an 
exception, or we try to handle it gracefully by continuing through the loop to 
collect all remaining unassigned partitions and then distributing them to 
whichever consumers have the fewest assigned partitions. IMO we should just 
throw an exception, since hitting this might mean there's a more critical bug 
in the algorithm, at which point who knows if there are other effects, like 
assigning the same partition to two owners, etc.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to