ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r621502594



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+                partitionsPerTopic, consumerToOwnedPartitions));
+        }
+
+        List<TopicPartition> sortedAllPartitions = 
getTopicPartitions(partitionsPerTopic);
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = sortedAllPartitions.size();
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    toBeRemovedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {

Review comment:
       nit: don't use `++` inline like this. It makes the code harder to 
understand, and we'll end up incrementing it past 
`numExpectedMaxCapacityMembers`, so its value won't represent the actual number 
of members at max capacity in the end. This might be confusing, e.g. if we want 
to add logging that includes this value later on. Let's increment it in the 
body of this `else if` instead.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+                partitionsPerTopic, consumerToOwnedPartitions));
+        }
+
+        List<TopicPartition> sortedAllPartitions = 
getTopicPartitions(partitionsPerTopic);
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = sortedAllPartitions.size();
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    toBeRemovedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and we 
still under the number of expected max capacity members

Review comment:
       ```suggestion
                   // consumer owned the "maxQuota" of partitions or more, and 
we're still under the number of expected max capacity members
   ```

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+                partitionsPerTopic, consumerToOwnedPartitions));
+        }
+
+        List<TopicPartition> sortedAllPartitions = 
getTopicPartitions(partitionsPerTopic);
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = sortedAllPartitions.size();
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    toBeRemovedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and we 
still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+                consumerAssignment.addAll(ownedPartitions.subList(0, 
maxQuota));
+                toBeRemovedPartitions.addAll(ownedPartitions.subList(0, 
maxQuota));
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, 
ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max 
capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned the "minQuota" of partitions or more

Review comment:
       ```suggestion
                   // consumer owned at least "minQuota" of partitions but 
we're already at the allowed number of max capacity members
   ```

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+                partitionsPerTopic, consumerToOwnedPartitions));
+        }
+
+        List<TopicPartition> sortedAllPartitions = 
getTopicPartitions(partitionsPerTopic);
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = sortedAllPartitions.size();
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    toBeRemovedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and we 
still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+                consumerAssignment.addAll(ownedPartitions.subList(0, 
maxQuota));
+                toBeRemovedPartitions.addAll(ownedPartitions.subList(0, 
maxQuota));
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, 
ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max 
capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned the "minQuota" of partitions or more
+                // so keep "minQuota" of the owned partitions, and revoke the 
rest of the partitions
+                consumerAssignment.addAll(ownedPartitions.subList(0, 
minQuota));
+                toBeRemovedPartitions.addAll(ownedPartitions.subList(0, 
minQuota));
+                allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, 
ownedPartitions.size()));
             }
         }
 
+        List<TopicPartition> unassignedPartitions;
+        if (!toBeRemovedPartitions.isEmpty()) {
+            Collections.sort(toBeRemovedPartitions,
+                
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+            unassignedPartitions = 
getUnassignedPartitions(sortedAllPartitions, toBeRemovedPartitions);
+            sortedAllPartitions = null;
+        } else {
+            unassignedPartitions = sortedAllPartitions;
+        }
+        toBeRemovedPartitions = null;
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format(
+                "After reassigning previously owned partitions, unfilled 
members: %s, unassigned partitions: %s, " +
+                    "current assignment: %s", unfilledMembers, 
unassignedPartitions, assignment));
+        }
+
         Collections.sort(unfilledMembers);
         Iterator<TopicPartition> unassignedPartitionsIter = 
unassignedPartitions.iterator();
 
-        // Fill remaining members up to minQuota
+        // fill remaining members up to the expected numbers of maxQuota, 
otherwise, to minQuota
         while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
             Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
 
             while (unfilledConsumerIter.hasNext()) {
                 String consumer = unfilledConsumerIter.next();
                 List<TopicPartition> consumerAssignment = 
assignment.get(consumer);
 
+                int expectedAssignedCount = numMaxCapacityMembers < 
numExpectedMaxCapacityMembers ? maxQuota : minQuota;
+                int currentAssignedCount = consumerAssignment.size();
                 if (unassignedPartitionsIter.hasNext()) {
                     TopicPartition tp = unassignedPartitionsIter.next();
                     consumerAssignment.add(tp);
-                    unassignedPartitionsIter.remove();
+                    currentAssignedCount++;
                     // We already assigned all possible ownedPartitions, so we 
know this must be newly to this consumer
                     if (allRevokedPartitions.contains(tp))
                         partitionsTransferringOwnership.put(tp, consumer);
                 } else {
+                    // This will only happen when current consumer has 
minQuota of partitions, and in previous round,
+                    // the expectedAssignedCount is maxQuota, so, still in 
unfilledMembers list.
+                    // But now, expectedAssignedCount is minQuota, we can 
remove it.
+                    if (currentAssignedCount != minQuota) {
+                        // Should not enter here since we have calculated the 
exact number to assign to each consumer
+                        log.warn(String.format(
+                            "No more partitions to be assigned. consumer: [%s] 
with current size: %d, but expected size is %d",
+                            consumer, currentAssignedCount, 
expectedAssignedCount));
+                    }
+                    unfilledConsumerIter.remove();
                     break;
                 }
 
-                if (consumerAssignment.size() == minQuota) {
-                    minCapacityMembers.add(consumer);
+                if (currentAssignedCount == expectedAssignedCount) {
+                    if (currentAssignedCount == maxQuota) {
+                        numMaxCapacityMembers++;
+                    }
                     unfilledConsumerIter.remove();
                 }
             }
         }
 
-        // If we ran out of unassigned partitions before filling all 
consumers, we need to start stealing partitions
-        // from the over-full consumers at max capacity
-        for (String consumer : unfilledMembers) {
-            List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int remainingCapacity = minQuota - consumerAssignment.size();
-            while (remainingCapacity > 0) {
-                String overloadedConsumer = maxCapacityMembers.poll();
-                if (overloadedConsumer == null) {
-                    throw new IllegalStateException("Some consumers are under 
capacity but all partitions have been assigned");
+        if (log.isDebugEnabled()) {
+            log.debug("final assignment: " + assignment);

Review comment:
       ```suggestion
               log.debug("Final assignment of partitions to consumers: \n{}", 
assignment);
   ```

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+                partitionsPerTopic, consumerToOwnedPartitions));
+        }
+
+        List<TopicPartition> sortedAllPartitions = 
getTopicPartitions(partitionsPerTopic);
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = sortedAllPartitions.size();
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();

Review comment:
       Can we pick a better name for this? I had trouble understanding what 
this list was for, but after reading all the code it sounds like it's actually 
just the partitions which have been assigned, and which are thus "to be 
removed" from the list of partitions to be assigned -- is that where the name 
comes from? I do see the logic behind that, but it wasn't clear until I'd 
carefully read everything. Maybe just `assignedPartitions` would be clearer? 🤷‍♀️ 
   

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
         if (log.isDebugEnabled()) {
             log.debug("final assignment: " + assignment);
         }
-
+        
         return assignment;
     }
 
-    private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer> 
partitionsPerTopic) {
-        SortedSet<TopicPartition> allPartitions =
-            new 
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
-            String topic = entry.getKey();
-            for (int i = 0; i < entry.getValue(); ++i) {
+    /**
+     * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+     * and sortedToBeRemovedPartitions. We use two pointers technique here:
+     *
+     * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from 
sortedToBeRemovedPartitions
+     *
+     * @param sortedPartitions: sorted all partitions
+     * @param sortedToBeRemovedPartitions: sorted partitions, all are included 
in the sortedPartitions
+     * @return the partitions don't assign to any current consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> 
sortedPartitions,
+                                                         List<TopicPartition> 
sortedToBeRemovedPartitions) {
+        List<TopicPartition> unassignedPartitions = new ArrayList<>(
+            sortedPartitions.size() - sortedToBeRemovedPartitions.size());
+
+        int index = 0;
+        boolean shouldAddDirectly = false;
+        int sizeToBeRemovedPartitions = sortedToBeRemovedPartitions.size();
+        TopicPartition nextPartition = sortedToBeRemovedPartitions.get(index);
+        for (TopicPartition topicPartition : sortedPartitions) {
+            if (shouldAddDirectly || !nextPartition.equals(topicPartition)) {
+                unassignedPartitions.add(topicPartition);
+            } else {
+                // equal case, don't add to unassignedPartitions, just get 
next partition
+                if (index < sizeToBeRemovedPartitions - 1) {
+                    nextPartition = sortedToBeRemovedPartitions.get(++index);
+                } else {
+                    // add the remaining directly since there is no more 
toBeRemovedPartitions
+                    shouldAddDirectly = true;
+                }
+            }
+        }
+        return unassignedPartitions;
+    }
+
+
+    private List<TopicPartition> getTopicPartitions(Map<String, Integer> 
partitionsPerTopic) {
+        List<TopicPartition> allPartitions = new ArrayList<>(
+            partitionsPerTopic.values().stream().reduce(0, Integer::sum));
+
+        List<String> allTopics = new ArrayList<>(partitionsPerTopic.keySet());
+        // sort all topics first, then we can have sorted all topic partitions 
by adding partitions starting from 0
+        Collections.sort(allTopics);

Review comment:
       I did the math and it seems to come down to roughly O(N*logN) vs 
O(2*N), which for N = 1 million (logN is about 20) is a roughly 10x 
improvement. Not bad, of course it is a tradeoff and there are other factors as 
mentioned above.  But still very nice 👍 

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
         if (log.isDebugEnabled()) {
             log.debug("final assignment: " + assignment);
         }
-
+        
         return assignment;
     }
 
-    private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer> 
partitionsPerTopic) {
-        SortedSet<TopicPartition> allPartitions =
-            new 
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
-            String topic = entry.getKey();
-            for (int i = 0; i < entry.getValue(); ++i) {
+    /**
+     * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+     * and sortedToBeRemovedPartitions. We use two pointers technique here:
+     *
+     * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from 
sortedToBeRemovedPartitions
+     *
+     * @param sortedPartitions: sorted all partitions
+     * @param sortedToBeRemovedPartitions: sorted partitions, all are included 
in the sortedPartitions
+     * @return the partitions don't assign to any current consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> 
sortedPartitions,

Review comment:
       Just to be clear, a `removeAll(M partitions)` operation on TreeSet 
should still be only O(M*logN) since each individual remove is only logN. Even 
for N = 1 million, logN is under 20. So it scales more with how many partitions 
are being removed. 
   
   I tried to do the math here and found the time complexity of the original to 
be (slightly) better on paper, but maybe I missed something or the reality is 
just different for certain input parameters (bigO time is not an absolute law 
after all 🙂 ) Maybe you can check my work and then run some tests with and 
without this specific change (but with all other improvements included).
   
   Let's say the number of consumers is C, the number of partitions assigned to 
each consumer is M, and the total number of partitions is N
   
   Before:
   Loop through all consumers and call (TreeSet) 
unassignedPartitions.removeAll(assignedPartitions). This is `C * M * logN` 
(where N will actually decrease down to ~0 by the end of the loop since as you 
pointed out, most partitions should be reassigned in the _sticky_ assignor)
   
   After:
   Loop through all consumers and call (ArrayList) 
toBeRemovedPartitions.addAll(assignedPartitions). Since addAll has to copy all 
M elements, this is C * M. After that we call sort(toBeRemovedPartitions), 
where toBeRemovedPartitions has C * M elements, so this is C * M * log(C * M). 
And then finally we call getUnassignedPartitions, which is O(N). So putting it 
all together we have C * M + C * M * log(C * M) + N
   
   It's a little hard to compare these directly but in general we'll have C * M 
≈ N (since most partitions are reassigned) in which case the before vs after 
runtimes can be reduced to O(N<sup>2</sup>log(N)) vs O(2N + 
N<sup>2</sup>log(N)). The before case is actually better, although they're 
roughly the same for large N. If this analysis holds up in your experiments 
then we should just go with whichever one uses less memory and/or has the 
simplest code. In my (admittedly biased) opinion the original method was the 
easiest to understand in the code, and objectively speaking it also used less 
memory since we only need the one `unassignedPartitions` data structure. ie we 
can just rename `sortedAllPartitions` back to `unassignedPartitions` (or maybe 
partitionsToBeAssigned is better?) and then we can delete both the 
`partitionsToBeRemoved` and the second `unassignedPartitions`. Thoughts?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
         if (log.isDebugEnabled()) {
             log.debug("final assignment: " + assignment);
         }
-
+        
         return assignment;
     }
 
-    private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer> 
partitionsPerTopic) {
-        SortedSet<TopicPartition> allPartitions =
-            new 
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
-            String topic = entry.getKey();
-            for (int i = 0; i < entry.getValue(); ++i) {
+    /**
+     * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+     * and sortedToBeRemovedPartitions. We use two pointers technique here:
+     *
+     * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from 
sortedToBeRemovedPartitions
+     *
+     * @param sortedPartitions: sorted all partitions
+     * @param sortedToBeRemovedPartitions: sorted partitions, all are included 
in the sortedPartitions
+     * @return the partitions don't assign to any current consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> 
sortedPartitions,

Review comment:
       Of course we also need to take into account that going back to the 
TreeSet will mean we lose the improvement in `getTopicPartitions` as well. I 
did the math there and found it came down to O(NlogN) vs O(2N). And since logN 
is about 20 for N = 1 million, this is like a 10x improvement at that scale. 
Adding this all up, the new algorithm does come out _slightly_ on top now:
   
   Before we had O(NlogN + N<sup>2</sup>log(N)) and now we have O(4N + NlogN + 
N<sup>2</sup>log(N)). Taking logN ≈ 20 means a 5x improvement for the part that 
scales as N.
   
     -- which is great, obviously, but the time for `getTopicPartitions` is 
negligible compared to the time complexity of this loop we discussed above, ie 
the N<sup>2</sup>term is going to dominate the N term. So we have a tradeoff 
here between an unknown but possibly small performance improvement that uses a 
lot more memory, and a somewhat worse algorithm with only the one data 
structure and slightly cleaner code.
   
   That's a hard tradeoff to comment on without more data. If you could re-run 
your experiments with all other improvements implemented but without these two 
things (ie the use of `toBeRemovedPartitions` instead of dynamically removing 
from `unassignedPartitions`, and going back to TreeSet in `getTopicPartitions`) 
then we can see what the actual performance characteristics are and also get a 
sense of how much extra memory we're talking. I'd recommend first addressing 
all my other comments here and re-running with/without this stuff in case those 
other comments have an impact, so we can be fair to both sides.
   
   Anyways that's my analysis here, but you've been looking at this more 
carefully, more recently so it's possible I'm missing something here. 
Interested to hear your take on this

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -108,6 +107,8 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 subscribedTopics.addAll(subscription.topics());
             } else if (!(subscription.topics().size() == 
subscribedTopics.size()
                 && subscribedTopics.containsAll(subscription.topics()))) {
+                // we don't need consumerToOwnedPartitions in general assign 
case
+                consumerToOwnedPartitions = null;

Review comment:
       nit: can we set it to null up in `#assign`, before invoking 
`generalAssign`? I feel that's slightly more future-proof, as it's easy to miss 
that this gets cleared when it occurs deep in this boolean check method, in 
case someone might decide they want to use this map in `generalAssign`.
   That could happen, since `generalAssign` builds up basically this exact same 
information in a later loop. As an alternative to nullifying this map, we could 
just pass it in to `generalAssign` to replace the `currentAssignment` map that 
gets filled in via `prepopulateCurrentAssignments`. That won't save us from 
looping through all the assignments entirely, since we also need to populate 
`prevAssignments`, but it will still save some time by cutting out the filling 
in of `currentAssignment`.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+                partitionsPerTopic, consumerToOwnedPartitions));
+        }
+
+        List<TopicPartition> sortedAllPartitions = 
getTopicPartitions(partitionsPerTopic);
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = sortedAllPartitions.size();
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    toBeRemovedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and we 
still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+                consumerAssignment.addAll(ownedPartitions.subList(0, 
maxQuota));
+                toBeRemovedPartitions.addAll(ownedPartitions.subList(0, 
maxQuota));
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, 
ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max 
capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned the "minQuota" of partitions or more
+                // so keep "minQuota" of the owned partitions, and revoke the 
rest of the partitions
+                consumerAssignment.addAll(ownedPartitions.subList(0, 
minQuota));
+                toBeRemovedPartitions.addAll(ownedPartitions.subList(0, 
minQuota));

Review comment:
       Same as above, let's assign `ownedPartitions.subList(0, minQuota)` to a 
variable

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+                partitionsPerTopic, consumerToOwnedPartitions));
+        }
+
+        List<TopicPartition> sortedAllPartitions = 
getTopicPartitions(partitionsPerTopic);
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = sortedAllPartitions.size();
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    toBeRemovedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {

Review comment:
       Also, should it be `numMaxCapacityMembers < 
numExpectedMaxCapacityMembers`? If `numMaxCapacityMembers` is already equal to 
the expected number then that means we can't give this member a `maxQuota` 
number of partitions. I'm guessing this is a typo since you yourself say "we're 
still _under_ the number of expected max capacity members.." in the comment 
below. Or maybe you made it `<=` because of the `++`, but that's a 
post-increment which means the value is only incremented _after_ the variable 
is used in the comparison, so you'd only need `<=` to be correct if it was 
`++numMaxCapacityMembers` instead. This post-/pre-increment stuff is confusing; 
that's exactly why I suggested not doing this inline but just incrementing it 
in the body 🙂 

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -234,23 +245,28 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
                 List<TopicPartition> consumerAssignment = 
assignment.get(consumer);
 
                 int expectedAssignedCount = numMaxCapacityMembers < 
numExpectedMaxCapacityMembers ? maxQuota : minQuota;
+                int currentAssignedCount = consumerAssignment.size();
                 if (unassignedPartitionsIter.hasNext()) {
                     TopicPartition tp = unassignedPartitionsIter.next();
                     consumerAssignment.add(tp);
-                    unassignedPartitionsIter.remove();

Review comment:
       Well, `unassignedPartitions` used to be a TreeSet so removal should 
still be relatively fast (`O(logN)`). But I see you turned this into an 
ArrayList which certainly would be slow. That's a good observation though, 
since we have computed the exact capacity then we can know the remaining number 
of unfilled members exactly and thus we don't need to remove from this list. I 
think we can then simplify things even further, see my comment above

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+                partitionsPerTopic, consumerToOwnedPartitions));
+        }
+
+        List<TopicPartition> sortedAllPartitions = 
getTopicPartitions(partitionsPerTopic);
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = sortedAllPartitions.size();
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    toBeRemovedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and we 
still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+                consumerAssignment.addAll(ownedPartitions.subList(0, 
maxQuota));
+                toBeRemovedPartitions.addAll(ownedPartitions.subList(0, 
maxQuota));
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, 
ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max 
capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned the "minQuota" of partitions or more
+                // so keep "minQuota" of the owned partitions, and revoke the 
rest of the partitions
+                consumerAssignment.addAll(ownedPartitions.subList(0, 
minQuota));
+                toBeRemovedPartitions.addAll(ownedPartitions.subList(0, 
minQuota));
+                allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, 
ownedPartitions.size()));
             }
         }
 
+        List<TopicPartition> unassignedPartitions;
+        if (!toBeRemovedPartitions.isEmpty()) {
+            Collections.sort(toBeRemovedPartitions,
+                
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+            unassignedPartitions = 
getUnassignedPartitions(sortedAllPartitions, toBeRemovedPartitions);
+            sortedAllPartitions = null;
+        } else {
+            unassignedPartitions = sortedAllPartitions;
+        }
+        toBeRemovedPartitions = null;
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format(
+                "After reassigning previously owned partitions, unfilled 
members: %s, unassigned partitions: %s, " +
+                    "current assignment: %s", unfilledMembers, 
unassignedPartitions, assignment));
+        }
+
         Collections.sort(unfilledMembers);
         Iterator<TopicPartition> unassignedPartitionsIter = 
unassignedPartitions.iterator();
 
-        // Fill remaining members up to minQuota
+        // fill remaining members up to the expected numbers of maxQuota, 
otherwise, to minQuota
         while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {

Review comment:
       Now that we're not removing from `unassignedPartitions` there's no 
reason to include it here. But I'd actually go a step further and change this 
to `for (TopicPartition unassignedPartition : unassignedPartitions)` -- that 
way we don't have to worry about breaking, as we should only need to loop over 
these partitions once to assign each of them, and we have one less Iterator to 
deal with.
   
   Then we can check and log an error (or even throw an exception)  if there 
are still consumers in `unfilledMembers`  at the end, or of course if we run 
out of `unfilledMembers` before exiting the for loop

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+                partitionsPerTopic, consumerToOwnedPartitions));
+        }
+
+        List<TopicPartition> sortedAllPartitions = 
getTopicPartitions(partitionsPerTopic);
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = sortedAllPartitions.size();
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    toBeRemovedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and we 
still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+                consumerAssignment.addAll(ownedPartitions.subList(0, 
maxQuota));
+                toBeRemovedPartitions.addAll(ownedPartitions.subList(0, 
maxQuota));

Review comment:
       I'm not sure how much of a difference this will make, but since we're 
using `ownedPartitions.subList(0, maxQuota)` twice, maybe we should assign 
it to a variable rather than computing it twice.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
                                                                 Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+                partitionsPerTopic, consumerToOwnedPartitions));
+        }
+
+        List<TopicPartition> sortedAllPartitions = 
getTopicPartitions(partitionsPerTopic);
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+        int totalPartitionsCount = sortedAllPartitions.size();
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota 
for all members
+        // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota))));
+            
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    toBeRemovedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and we 
still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+                consumerAssignment.addAll(ownedPartitions.subList(0, 
maxQuota));
+                toBeRemovedPartitions.addAll(ownedPartitions.subList(0, 
maxQuota));
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, 
ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max 
capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned the "minQuota" of partitions or more
+                // so keep "minQuota" of the owned partitions, and revoke the 
rest of the partitions
+                consumerAssignment.addAll(ownedPartitions.subList(0, 
minQuota));
+                toBeRemovedPartitions.addAll(ownedPartitions.subList(0, 
minQuota));
+                allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, 
ownedPartitions.size()));
             }
         }
 
+        List<TopicPartition> unassignedPartitions;
+        if (!toBeRemovedPartitions.isEmpty()) {
+            Collections.sort(toBeRemovedPartitions,
+                
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+            unassignedPartitions = 
getUnassignedPartitions(sortedAllPartitions, toBeRemovedPartitions);
+            sortedAllPartitions = null;

Review comment:
       I think `sortedAllPartitions` can be cleared in both cases, just like 
`toBeRemovedPartitions`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to