squah-confluent commented on code in PR #17385:
URL: https://github.com/apache/kafka/pull/17385#discussion_r1799117092


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java:
##########
@@ -157,716 +240,779 @@ public GroupAssignment build() {
 
         assignStickyPartitions();
 
-        unassignedPartitionsAssignment();
+        computeUnassignedPartitions(unassignedPartitions);

Review Comment:
   It's a half-done refactor from the previous implementation where 
unassignedPartitions was used in more than one place. I think it should be 
turned into a local variable here.
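A minimal sketch of that suggestion follows. It assumes `computeUnassignedPartitions` is changed to return its result instead of filling a field or an out-parameter. The class name, the `String` topic ids and the `-1` "no owner" sentinel in the owner arrays are illustrative assumptions, not the PR's actual types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for the builder. Topic ids are Strings and
// each topic maps to an owner array indexed by partition; -1 (an assumed
// sentinel) means the partition has no owner in the target assignment.
final class UnassignedPartitionsSketch {
    private final Map<String, int[]> targetAssignmentPartitionOwners;

    UnassignedPartitionsSketch(Map<String, int[]> targetAssignmentPartitionOwners) {
        this.targetAssignmentPartitionOwners = targetAssignmentPartitionOwners;
    }

    // Returns a fresh map instead of filling a field, so build() can hold it
    // in a local variable:
    //     Map<Uuid, List<Integer>> unassignedPartitions = computeUnassignedPartitions();
    //     unassignedPartitionsAssignment(unassignedPartitions);
    Map<String, List<Integer>> computeUnassignedPartitions() {
        // TreeMap only makes the example's output order stable; topics are
        // sorted separately before assignment anyway.
        Map<String, List<Integer>> unassigned = new TreeMap<>();
        for (Map.Entry<String, int[]> entry : targetAssignmentPartitionOwners.entrySet()) {
            int[] owners = entry.getValue();
            for (int partition = 0; partition < owners.length; partition++) {
                if (owners[partition] == -1) {
                    unassigned.computeIfAbsent(entry.getKey(), topic -> new ArrayList<>()).add(partition);
                }
            }
        }
        return unassigned;
    }
}
```

Scoping the map to `build()` makes it clear that nothing else reads it once the unassigned partitions have been handed out.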

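The three-level topic ordering documented on `sortTopicIds` in the diff below can be sketched on its own. This is a simplified stand-in that uses `String` topic ids and plain maps in place of `subscribedTopicDescriber` and `topicSubscribers`. The class and parameter names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Sketch of the topic ordering used by sortTopicIds, with String topic ids and
// plain maps standing in for the partition and subscriber lookups.
final class TopicOrderSketch {
    static List<String> sortTopicIds(
        Collection<String> topicIds,
        Map<String, Integer> partitionCounts,
        Map<String, Integer> subscriberCounts
    ) {
        Comparator<String> comparator = (topic1, topic2) -> {
            int partitions1 = partitionCounts.get(topic1);
            int partitions2 = partitionCounts.get(topic2);
            int subscribers1 = subscriberCounts.get(topic1);
            int subscribers2 = subscriberCounts.get(topic2);

            // 1. Partitions per subscriber, descending (note the swapped operands).
            int order = Double.compare(
                (double) partitions2 / subscribers2,
                (double) partitions1 / subscribers1
            );
            // 2. Subscriber count, ascending.
            if (order == 0) order = Integer.compare(subscribers1, subscribers2);
            // 3. Topic id, ascending, so ties resolve the same way on every run.
            if (order == 0) order = topic1.compareTo(topic2);
            return order;
        };

        List<String> sorted = new ArrayList<>(topicIds);
        sorted.sort(comparator);
        return sorted;
    }
}
```

Consider topics A=6/2, B=6/3, C=4/2, D=3/1 and E=4/2 (partitions/subscribers):

- The ratio puts A and D (3.0) ahead of B, C and E (2.0).
- Fewer subscribers then puts D before A, and C and E before B.
- The id finally breaks the C/E tie, giving D, A, C, E, B.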

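The copy-on-write handling in `assignStickyPartitions` can be illustrated with a hypothetical standalone helper. The idea is to reuse the old assignment map unless something must be removed, and to deep-copy it at most once. The sketch assumes `String` topic ids and a plain map of partition counts. It also omits the owner-array and assignment-size bookkeeping that the PR does in the same loop.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper illustrating the copy-on-write pattern: the old
// assignment is returned as-is unless something must be dropped, in which
// case a single deep copy is made and edited.
final class StickyAssignmentPruneSketch {
    static Map<String, Set<Integer>> prune(
        Map<String, Set<Integer>> oldAssignment,
        Set<String> subscribedTopics,
        Map<String, Integer> partitionCounts
    ) {
        Map<String, Set<Integer>> newAssignment = null; // allocated lazily

        for (Map.Entry<String, Set<Integer>> entry : oldAssignment.entrySet()) {
            String topic = entry.getKey();
            if (!subscribedTopics.contains(topic)) {
                // Unsubscribed topic: drop it entirely.
                if (newAssignment == null) newAssignment = deepCopy(oldAssignment);
                newAssignment.remove(topic);
                continue;
            }
            int numPartitions = partitionCounts.getOrDefault(topic, 0);
            for (int partition : entry.getValue()) {
                if (partition >= numPartitions) {
                    // Partition id is beyond the topic's partition count: drop just it.
                    if (newAssignment == null) newAssignment = deepCopy(oldAssignment);
                    Set<Integer> remaining = newAssignment.get(topic);
                    remaining.remove(partition);
                    if (remaining.isEmpty()) newAssignment.remove(topic);
                }
            }
        }
        // Nothing changed: hand back the original instance without allocating.
        return newAssignment == null ? oldAssignment : newAssignment;
    }

    private static Map<String, Set<Integer>> deepCopy(Map<String, Set<Integer>> assignment) {
        Map<String, Set<Integer>> copy = new HashMap<>();
        assignment.forEach((topic, partitions) -> copy.put(topic, new HashSet<>(partitions)));
        return copy;
    }
}
```

Iteration always runs over the untouched original, so editing the copy inside the loop cannot trigger a `ConcurrentModificationException`.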

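The least-loaded half of `MemberAssignmentBalancer` can be exercised on its own with the simplified sketch below. It rests on three assumptions:

- Members are indices into a plain `int[]` of partition counts.
- The initial sort breaks ties by member index; the PR's `memberComparator` is not shown here.
- The caller increments the returned member's count, which is what `assignPartition` is presumed to do.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified sketch of the "least loaded range" logic in MemberAssignmentBalancer.
// Members are int indices into sizes[]; the tie-break by member index in the
// initial sort is an assumption.
final class LeastLoadedRangeSketch {
    private final int[] sizes;                        // partition count per member
    private final List<Integer> sortedMembers = new ArrayList<>();
    private int leastLoadedRangeEnd = 0;              // [0, end) is the least loaded range
    private int leastLoadedRangePartitionCount;       // level of the range before this round
    private int nextLeastLoadedMember = 0;

    LeastLoadedRangeSketch(int[] sizes, List<Integer> members) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("Cannot balance an empty subscriber list.");
        }
        this.sizes = sizes;
        sortedMembers.addAll(members);
        sortedMembers.sort(Comparator.comparingInt((Integer m) -> sizes[m]).thenComparingInt(m -> m));
        // Start one below the minimum so the first call bumps it to the minimum.
        leastLoadedRangePartitionCount = sizes[sortedMembers.get(0)] - 1;
    }

    // Returns the member to receive the next partition. The caller must then
    // increment sizes[member].
    int nextLeastLoadedMember() {
        if (nextLeastLoadedMember >= leastLoadedRangeEnd) {
            // Every member in the range received one more partition: raise the level...
            leastLoadedRangePartitionCount++;
            // ...and absorb members that were already sitting at the new level.
            while (leastLoadedRangeEnd < sortedMembers.size()
                && sizes[sortedMembers.get(leastLoadedRangeEnd)] == leastLoadedRangePartitionCount) {
                leastLoadedRangeEnd++;
            }
            nextLeastLoadedMember = 0;
        }
        return sortedMembers.get(nextLeastLoadedMember++);
    }
}
```

Start from counts {0, 0, 3} and hand out 9 partitions. Members 0 and 1 alternate until both reach 3. The range then expands to absorb member 2, and all three members end at 4. Only the member just past the range end is ever inspected, so members inside the range never need to be re-sorted.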
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java:
##########
@@ -157,716 +240,779 @@ public GroupAssignment build() {
 
         assignStickyPartitions();
 
-        unassignedPartitionsAssignment();
+        computeUnassignedPartitions(unassignedPartitions);
+        unassignedPartitionsAssignment(unassignedPartitions);
 
         balance();
 
         return new GroupAssignment(targetAssignment);
     }
 
     /**
-     * <li> TopicIdPartitions are sorted in descending order based on the value:
-     *       totalPartitions/number of subscribed members. </li>
-     * <li> If the above value is the same then topicIdPartitions are sorted in
-     *      ascending order of number of subscribers. </li>
-     * <li> If both criteria are the same, sort in ascending order of the partition Id.
-     *      This last criteria is for predictability of the assignments. </li>
+     * Sorts topics based on the following criteria:
+     * <ol>
+     *   <li>Topics are sorted in descending order of totalPartitions / number of subscribers
+     *       (partitions per subscriber).</li>
+     *   <li>Ties are broken by sorting in ascending order of number of subscribers.</li>
+     *   <li>Any remaining ties are broken by sorting in ascending order of topic id.</li>
+     * </ol>
+     *
+     * The last criterion is for predictability of assignments.
      *
-     * @param topicIdPartitions       The topic partitions that need to be sorted.
-     * @return A list of sorted topic partitions.
+     * @param topicIds          The topic ids that need to be sorted.
+     * @return A list of sorted topic ids.
      */
-    private List<TopicIdPartition> sortTopicIdPartitions(Collection<TopicIdPartition> topicIdPartitions) {
-        Comparator<TopicIdPartition> comparator = Comparator
-            .comparingDouble((TopicIdPartition topicIdPartition) -> {
-                int totalPartitions = subscribedTopicDescriber.numPartitions(topicIdPartition.topicId());
-                int totalSubscribers = membersPerTopic.get(topicIdPartition.topicId()).size();
-                return (double) totalPartitions / totalSubscribers;
-            })
-            .reversed()
-            .thenComparingInt(topicIdPartition -> membersPerTopic.get(topicIdPartition.topicId()).size())
-            .thenComparingInt(TopicIdPartition::partitionId);
+    private List<Uuid> sortTopicIds(Collection<Uuid> topicIds) {
+        Comparator<Uuid> comparator = new Comparator<Uuid>() {
+            @Override
+            public int compare(final Uuid topic1Id, final Uuid topic2Id) {
+                int topic1PartitionCount = subscribedTopicDescriber.numPartitions(topic1Id);
+                int topic2PartitionCount = subscribedTopicDescriber.numPartitions(topic2Id);
+                int topic1SubscriberCount = topicSubscribers.get(topic1Id).size();
+                int topic2SubscriberCount = topicSubscribers.get(topic2Id).size();
+
+                // Order by partitions per subscriber, descending.
+                int order = Double.compare(
+                    (double) topic2PartitionCount / topic2SubscriberCount,
+                    (double) topic1PartitionCount / topic1SubscriberCount
+                );
+
+                // Then order by subscriber count, ascending.
+                if (order == 0) {
+                    order = Integer.compare(topic1SubscriberCount, topic2SubscriberCount);
+                }
 
-        return topicIdPartitions.stream()
-            .sorted(comparator)
-            .collect(Collectors.toList());
+                // Then order by topic id, ascending.
+                if (order == 0) {
+                    order = topic1Id.compareTo(topic2Id);
+                }
+
+                return order;
+            }
+        };
+
+        List<Uuid> sortedTopicIds = new ArrayList<>(topicIds);
+        sortedTopicIds.sort(comparator);
+        return sortedTopicIds;
     }
 
     /**
-     * Gets a set of partitions that are to be retained from the existing assignment. This includes:
-     * <li> Partitions from topics that are still present in both the new subscriptions and the topic metadata. </li>
+     * Imports the existing assignment into the target assignment, taking into account topic
+     * unsubscriptions.
      */
     private void assignStickyPartitions() {
-        groupSpec.memberIds().forEach(memberId ->
-            groupSpec.memberAssignment(memberId).partitions().forEach((topicId, currentAssignment) -> {
+        for (String memberId : groupSpec.memberIds()) {
+            int memberIndex = memberIndices.get(memberId);
+
+            Map<Uuid, Set<Integer>> oldAssignment = groupSpec.memberAssignment(memberId).partitions();
+            Map<Uuid, Set<Integer>> newAssignment = null;
+
+            for (Map.Entry<Uuid, Set<Integer>> entry : oldAssignment.entrySet()) {
+                Uuid topicId = entry.getKey();
+                Set<Integer> partitions = entry.getValue();
+
                 if (groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicId)) {
-                    currentAssignment.forEach(partition -> {
-                        TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition);
-                        assignmentManager.addPartitionToTargetAssignment(topicIdPartition, memberId);
-                        assignedStickyPartitions.add(topicIdPartition);
-                    });
+                    memberTargetAssignmentSizes[memberIndex] += partitions.size();
+
+                    int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
+                    for (int partition : partitions) {
+                        if (partition >= numPartitions) {
+                            LOG.warn(
+                                "Previous assignment for {} contains {}:{}, but topic only has {} partitions",
+                                memberId,
+                                topicId,
+                                partition,
+                                numPartitions
+                            );
+
+                            if (newAssignment == null) {
+                                // If the new assignment is null, we create a deep copy of the
+                                // original assignment so that we can alter it.
+                                newAssignment = deepCopy(oldAssignment);
+                            }
+                            Set<Integer> newPartitions = newAssignment.get(topicId);
+                            newPartitions.remove(partition);
+                            if (newPartitions.isEmpty()) {
+                                newAssignment.remove(topicId);
+                            }
+                            memberTargetAssignmentSizes[memberIndex]--;
+
+                            continue;
+                        }
+
+                        previousAssignmentPartitionOwners.get(topicId)[partition] = memberIndex;
+                        targetAssignmentPartitionOwners.get(topicId)[partition] = memberIndex;
+                    }
                 } else {
+                    // The member is no longer subscribed to the topic. Remove it from the
+                    // assignment.
                     LOG.debug("The topic " + topicId + " is no longer present in the subscribed topics list");
+
+                    if (newAssignment == null) {
+                        // If the new assignment is null, we create a deep copy of the
+                        // original assignment so that we can alter it.
+                        newAssignment = deepCopy(oldAssignment);
+                    }
+                    // Remove the entire topic.
+                    newAssignment.remove(topicId);
                 }
-            })
-        );
+            }
+
+            if (newAssignment == null) {
+                newAssignment = oldAssignment;
+            }
+            targetAssignment.put(memberId, new MemberAssignmentImpl(newAssignment));
+        }
     }
 
     /**
-     * Allocates the remaining unassigned partitions to members in a balanced manner.
-     * <li> Partitions are sorted to maximize the probability of a balanced assignment. </li>
-     * <li> Sort members in ascending order of their current target assignment sizes
-     *      to ensure the least filled member gets the partition first. </li>
+     * A class that holds the state for the balancing process.
+     * <p/>
+     * Keeps track of two ranges of members: the members with the least number of assigned
+     * partitions and the members with the most number of assigned partitions. Within each range,
+     * all members start with the same number of assigned partitions.
      */
-    private void unassignedPartitionsAssignment() {
-        List<TopicIdPartition> sortedPartitions = sortTopicIdPartitions(unassignedPartitions);
+    private final class MemberAssignmentBalancer {
+        private final int[] memberTargetAssignmentSizes;
 
-        for (TopicIdPartition partition : sortedPartitions) {
-            TreeSet<String> sortedMembers = assignmentManager.sortMembersByAssignmentSize(
-                membersPerTopic.get(partition.topicId())
-            );
+        /**
+         * The members to balance, sorted by number of assigned partitions.
+         * <p/>
+         * Can be visualized like so:
+         * <pre>
+         *           ^
+         *           |          #
+         * partition |        ###
+         * count     |        ###
+         *           | ##########
+         *           +----------->
+         *             members
+         * </pre>
+         */
+        private final List<Integer> sortedMembers;
 
-            for (String member : sortedMembers) {
-                if (assignmentManager.maybeAssignPartitionToMember(partition, member)) {
-                    break;
+        /**
+         * [0, leastLoadedRangeEnd) represents the range of members with the smallest partition
+         * count.
+         */
+        private int leastLoadedRangeEnd = 0; // exclusive
+
+        /**
+         * The partition count of each member in the least loaded range before we start assigning
+         * partitions to them.
+         */
+        private int leastLoadedRangePartitionCount = -1;
+
+        /**
+         * The index of the next element in sortedMembers to which to assign the next partition.
+         */
+        private int nextLeastLoadedMember = 0;
+
+        /**
+         * [mostLoadedRangeStart, mostLoadedRangeEnd) is the range of members with the largest
+         * partition count.
+         */
+        private int mostLoadedRangeStart = 0; // inclusive
+
+        /**
+         * [mostLoadedRangeStart, mostLoadedRangeEnd) is the range of members with the largest
+         * partition count.
+         * <p/>
+         * mostLoadedRangeEnd is not simply the end of the member list, since we need to be able to
+         * exclude heavily loaded members which do not have any partitions to give up.
+         */
+        private int mostLoadedRangeEnd = 0; // exclusive
+
+        /**
+         * The partition count of each member in the most loaded range before we start unassigning
+         * partitions from them.
+         */
+        private int mostLoadedRangePartitionCount = 1;
+
+        /**
+         * The index of the next element in sortedMembers from which to unassign the next partition.
+         */
+        private int nextMostLoadedMember = -1;
+
+        // The ranges can be visualized like so:
+        //
+        //                         most
+        //                         loaded
+        //           ^ least       range
+        //           | loaded               #
+        //           | range      [##<-   ]##
+        // partition |          ##[#######]##
+        // count     | [...->  ]##[#######]##
+        //           | [#######]##[#######]##
+        //           +----------------------->
+        //             members
+        //                                 ^^
+        //                                 These members have no partitions to give up.
+        //
+        // The least loaded range expands rightwards and the most loaded range expands leftwards.
+        // Note that we are not done once the ranges touch, but rather when the partition counts in
+        // the ranges differ by one or less.
+
+        public MemberAssignmentBalancer() {
+            this.memberTargetAssignmentSizes = UniformHeterogeneousAssignmentBuilder.this.memberTargetAssignmentSizes;
+            this.sortedMembers = new ArrayList<>(memberTargetAssignmentSizes.length);
+        }
+
+        /**
+         * Resets the state of the balancer for a new set of members.
+         *
+         * @param members The members to balance.
+         * @return The difference in partition counts between the most and least loaded members.
+         */
+        public int initialize(List<Integer> members) {
+            if (members.size() < 1) {
+                // nextLeastLoadedMember needs at least one member.
+                throw new IllegalArgumentException("Cannot balance an empty subscriber list.");
+            }
+
+            // Build a sorted list of subscribers, where members with the smallest partition count
+            // appear first.
+            sortedMembers.clear();
+            sortedMembers.addAll(members);
+            sortedMembers.sort(memberComparator);
+
+            leastLoadedRangeEnd = 0; // exclusive
+            leastLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(0)] - 1;
+            nextLeastLoadedMember = 0;
+
+            mostLoadedRangeStart = sortedMembers.size(); // inclusive
+            mostLoadedRangeEnd = sortedMembers.size(); // exclusive
+            mostLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(sortedMembers.size() - 1)] + 1;
+            nextMostLoadedMember = sortedMembers.size() - 1;
+
+            return memberTargetAssignmentSizes[sortedMembers.get(sortedMembers.size() - 1)] -
+                   memberTargetAssignmentSizes[sortedMembers.get(0)];
+        }
+
+        /**
+         * Gets the least loaded member to which to assign a partition.
+         * <p/>
+         * Advances the state of the balancer by assuming that an additional partition has been
+         * assigned to the returned member.
+         * <p/>
+         * Assumes that there is at least one member.
+         *
+         * @return The least loaded member to which to assign a partition
+         */
+        public int nextLeastLoadedMember() {
+            // To generate a balanced assignment, we assign partitions to members in the
+            // [0, leastLoadedRangeEnd) range first. Once each member in the range has received a
+            // partition, the partition count of the range rises by one and we can try to expand it.
+            //                                     Range full,             Range full,
+            //                                     bump level by one       bump level by one,
+            //                                     can't expand range.     expand range.
+            //           ^                       ^                       ^
+            //           |            #          |            #          | [..->     ]#
+            // partition |          ###    ->    | [..->   ]###    ->    | [.......##]#
+            // count     | [..->   ]###          | [.......]###          | [.......##]#
+            //           | [#######]###          | [#######]###          | [#########]#
+            //           +------------->         +------------->         +------------>
+            //             members                members                members
+
+            if (nextLeastLoadedMember >= leastLoadedRangeEnd) {
+                // We've hit the end of the range. Bump its level and try to expand it.
+                leastLoadedRangePartitionCount++;
+
+                // Expand the range.
+                while (leastLoadedRangeEnd < sortedMembers.size() &&
+                    memberTargetAssignmentSizes[sortedMembers.get(leastLoadedRangeEnd)] == leastLoadedRangePartitionCount) {
+                    leastLoadedRangeEnd++;
                 }
+
+                // Reset the pointer to the start of the range of members with identical
+                // partition counts.
+                nextLeastLoadedMember = 0;
             }
+
+            int leastLoadedMemberIndex = sortedMembers.get(nextLeastLoadedMember);
+            nextLeastLoadedMember++;
+
+            return leastLoadedMemberIndex;
         }
-    }
 
-    /**
-     * If a topic has two or more potential members it is subject to reassignment.
-     *
-     * @return true if the topic can participate in reassignment, false otherwise.
-     */
-    private boolean canTopicParticipateInReassignment(Uuid topicId) {
-        return membersPerTopic.get(topicId).size() >= 2;
+        /**
+         * Gets the most loaded member from which to reassign a partition.
+         * <p/>
+         * Advances the state of the balancer by assuming that a partition has been unassigned from
+         * the returned member.
+         *
+         * @return The most loaded member from which to reassign a partition, or -1 if no such
+         *         member exists.
+         */
+        public int nextMostLoadedMember() {
+            if (nextMostLoadedMember < mostLoadedRangeStart) {
+                if (mostLoadedRangeEnd <= mostLoadedRangeStart &&
+                    mostLoadedRangeStart > 0) {
+                    // The range is empty due to calls to excludeMostLoadedMember(). We risk not
+                    // expanding the range below and returning a member outside the range. Ensure
+                    // that we always expand the range below by resetting the partition count.
+                    mostLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(mostLoadedRangeStart - 1)];
+                } else {
+                    mostLoadedRangePartitionCount--;
+                }
+
+                // Expand the range.
+                while (mostLoadedRangeStart > 0 &&
+                    memberTargetAssignmentSizes[sortedMembers.get(mostLoadedRangeStart - 1)] == mostLoadedRangePartitionCount) {
+                    mostLoadedRangeStart--;
+                }
+
+                // Reset the pointer to the end of the range of members with identical partition
+                // counts.
+                nextMostLoadedMember = mostLoadedRangeEnd - 1;
+            }
+
+            if (nextMostLoadedMember < 0) {
+                // The range is empty due to excludeMostLoadedMember() calls and there are no more
+                // members that can give up partitions.
+                return -1;
+            }
+
+            int mostLoadedMemberIndex = sortedMembers.get(nextMostLoadedMember);
+            nextMostLoadedMember--;
+
+            return mostLoadedMemberIndex;
+        }
+
+        /**
+         * Excludes the last member returned from nextMostLoadedMember from the most loaded range.
+         *
+         * Must not be called if nextMostLoadedMember has not been called yet or returned -1.
+         */
+        public void excludeMostLoadedMember() {
+            // Kick the member out of the most loaded range by swapping with the member at the end
+            // of the range and contracting the end of the range.
+            int mostLoadedMemberIndex = sortedMembers.get(nextMostLoadedMember + 1);
+            sortedMembers.set(nextMostLoadedMember + 1, sortedMembers.get(mostLoadedRangeEnd - 1));
+            sortedMembers.set(mostLoadedRangeEnd - 1, mostLoadedMemberIndex);
+            mostLoadedRangeEnd--;
+        }
+
+        /**
+         * Checks whether the members are balanced based on the partition counts of the least and
+         * most loaded ranges.
+         */
+        public boolean isBalanced() {
+            return mostLoadedRangePartitionCount - leastLoadedRangePartitionCount <= 1;
+        }
     }
 
     /**
-     * If a member is not assigned all its potential partitions it is subject to reassignment.
-     * If any of the partitions assigned to a member is subject to reassignment, the member itself
-     * is subject to reassignment.
+     * Allocates the remaining unassigned partitions to members in a balanced manner.
+     * <ol>
+     *   <li>Topics are sorted to maximize the probability of a balanced assignment.</li>
+     *   <li>Unassigned partitions within each topic are distributed to the least loaded
+     *       subscribers, repeatedly.</li>
+     * </ol>
      *
-     * @return true if the member can participate in reassignment, false otherwise.
+     * @param partitions        The partitions to be assigned.
      */
-    private boolean canMemberParticipateInReassignment(String memberId) {
-        Set<Uuid> assignedTopicIds = targetAssignment.get(memberId).partitions().keySet();
+    private void unassignedPartitionsAssignment(Map<Uuid, List<Integer>> partitions) {
+        List<Uuid> sortedTopicIds = sortTopicIds(partitions.keySet());
 
-        int currentAssignmentSize = assignmentManager.targetAssignmentSize(memberId);
-        int maxAssignmentSize = assignmentManager.maxAssignmentSize(memberId);
+        MemberAssignmentBalancer memberAssignmentBalancer = new MemberAssignmentBalancer();
 
-        if (currentAssignmentSize > maxAssignmentSize)
-            LOG.error("The member {} is assigned more partitions than the maximum possible.", memberId);
+        for (Uuid topicId : sortedTopicIds) {
+            memberAssignmentBalancer.initialize(topicSubscribers.get(topicId));
 
-        if (currentAssignmentSize < maxAssignmentSize)
-            return true;
+            for (int partition : partitions.get(topicId)) {
+                int leastLoadedMemberIndex = memberAssignmentBalancer.nextLeastLoadedMember();
 
-        for (Uuid topicId : assignedTopicIds) {
-            if (canTopicParticipateInReassignment(topicId))
-                return true;
+                assignPartition(topicId, partition, leastLoadedMemberIndex);
+            }
         }
-        return false;
     }
 
     /**
-     * Checks if the current assignments of partitions to members is balanced.
-     *
-     * Balance is determined by first checking if the difference in the number of partitions assigned
-     * to any two members is one. If this is not true, it verifies that no member can
-     * receive additional partitions without disrupting the balance.
+     * If a topic has two or more potential members it is subject to reassignment.
      *
-     * @return true if the assignment is balanced, false otherwise.
+     * @return true if the topic can participate in reassignment, false otherwise.
      */
-
-    private boolean isBalanced() {
-        int min = assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.first());
-        int max = assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.last());
-
-        // If minimum and maximum numbers of partitions assigned to members differ by at most one return true.
-        if (min >= max - 1)
-            return true;
-
-        // Ensure that members without a complete set of topic partitions cannot receive any additional partitions.
-        // This maintains balance. Start by checking members with the fewest assigned partitions to see if they can take more.
-        for (String member : sortedMembersByAssignmentSize) {
-            int memberPartitionCount = assignmentManager.targetAssignmentSize(member);
-
-            // Skip if this member already has all the topic partitions it can get.
-            int maxAssignmentSize = assignmentManager.maxAssignmentSize(member);
-            if (memberPartitionCount == maxAssignmentSize)
-                continue;
-
-            // Otherwise make sure it cannot get any more partitions.
-            for (Uuid topicId : groupSpec.memberSubscription(member).subscribedTopicIds()) {
-                Set<Integer> assignedPartitions = targetAssignment.get(member).partitions().get(topicId);
-                for (int i = 0; i < subscribedTopicDescriber.numPartitions(topicId); i++) {
-                    TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, i);
-                    if (assignedPartitions == null || !assignedPartitions.contains(i)) {
-                        String otherMember = partitionOwnerInTargetAssignment.get(topicIdPartition);
-                        int otherMemberPartitionCount = assignmentManager.targetAssignmentSize(otherMember);
-                        if (memberPartitionCount + 1 < otherMemberPartitionCount) {
-                            LOG.debug("{} can be moved from member {} to member {} for a more balanced assignment.",
-                                topicIdPartition, otherMember, member);
-                            return false;
-                        }
-                    }
-                }
-            }
-        }
-        return true;
+    private boolean canTopicParticipateInReassignment(Uuid topicId) {
+        return topicSubscribers.get(topicId).size() >= 2;
     }
 
     /**
      * Balance the current assignment after the initial round of assignments have completed.
      */
     private void balance() {
-        if (!unassignedPartitions.isEmpty())
-            throw new PartitionAssignorException("Some partitions were left unassigned");
-        // Refill unassigned partitions with all the topicId partitions.
-        unassignedPartitions.addAll(topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber));
+        Set<Uuid> topicIds = new HashSet<>(subscribedTopicIds);
 
-        // Narrow down the reassignment scope to only those partitions that can actually be reassigned.
-        Set<TopicIdPartition> fixedPartitions = new HashSet<>();
+        // Narrow down the reassignment scope to only those topics that can actually be reassigned.
         for (Uuid topicId : subscribedTopicIds) {
             if (!canTopicParticipateInReassignment(topicId)) {
-                for (int i = 0; i < subscribedTopicDescriber.numPartitions(topicId); i++) {
-                    fixedPartitions.add(new TopicIdPartition(topicId, i));
-                }
-            }
-        }
-        unassignedPartitions.removeAll(fixedPartitions);
-
-        // Narrow down the reassignment scope to only those members that are subject to reassignment.
-        for (String member : groupSpec.memberIds()) {
-            if (!canMemberParticipateInReassignment(member)) {
-                sortedMembersByAssignmentSize.remove(member);
+                topicIds.remove(topicId);
             }
         }
 
         // If all the partitions are fixed i.e. unassigned partitions is empty there is no point of re-balancing.
-        if (!unassignedPartitions.isEmpty()) performReassignments();
+        if (!topicIds.isEmpty()) balanceTopics(topicIds);
     }
 
     /**
      * Performs reassignments of partitions to balance the load across members.
      * This method iteratively reassigns partitions until no further moves can improve the balance.
+     * <p/>
+     * The method loops over the topics repeatedly until an entire loop around the topics has
+     * completed without any reassignments, or we hit an iteration limit.
+     * <p/>
+     * This method produces perfectly balanced assignments when all subscribers of a topic have the
+     * same subscriptions. However, when subscribers have overlapping, non-identical subscriptions,
+     * the method produces almost-balanced assignments, assuming the iteration limit is not hit.
+     * For example, with three members and two topics, where members 1 and 2 are subscribed to the
+     *     first topic and members 2 and 3 are subscribed to the second topic, we can end up with an
+     *     assignment like:
+     * <ul>
+     *   <li>Member 1: 9 partitions</li>
+     *   <li>Member 2: 10 partitions</li>
+     *   <li>Member 3: 11 partitions</li>
+     * </ul>
      *
-     * The method uses a do-while loop to ensure at least one pass over the partitions and continues
-     * reassigning as long as there are modifications to the current assignments. It checks for balance
-     * after each reassignment and exits if the balance is achieved.
+     * In this assignment, the subscribers of the first topic have a difference in partitions of 1,
+     * so the topic is considered balanced. The same applies to the second topic. However, balance
+     * can be improved by moving a partition from the second topic from member 3 to member 2 and a
+     * partition from the first topic from member 2 to member 1.
      *
-     * @throws PartitionAssignorException if there are inconsistencies in expected members per partition
-     *         or if a partition is expected to already be assigned but isn't.
+     * @param topicIds          The topics to consider for reassignment. These topics must have at
+     *                          least two subscribers.
      */
-    private void performReassignments() {
-        boolean modified;
-        boolean reassignmentOccurred;
-        // Repeat reassignment until no partition can be moved to improve the balance.
-        do {
-            modified = false;
-            reassignmentOccurred = false;
-            // Reassign all reassignable partitions sorted in descending order
-            // by totalPartitions/number of subscribed members,
-            // until the full list is processed or a balance is achieved.
-            List<TopicIdPartition> reassignablePartitions = sortTopicIdPartitions(unassignedPartitions);
-
-            for (TopicIdPartition reassignablePartition : reassignablePartitions) {
-                // Only check if there is any change in balance if any moves were made.
-                if (reassignmentOccurred && isBalanced()) {
+    private void balanceTopics(Collection<Uuid> topicIds) {
+        List<Uuid> sortedTopicIds = sortTopicIds(topicIds);
+        // The index of the last topic in sortedTopicIds that was rebalanced.
+        // Used to decide when to exit early.
+        int lastRebalanceTopicIndex = -1;
+
+        MemberAssignmentBalancer memberAssignmentBalancer = new MemberAssignmentBalancer();
+
+        // An array of partitions, with partitions owned by the same member grouped together.
+        List<Integer> partitions = new ArrayList<>();
+
+        // The ranges in the partitions list assigned to members.
+        // Maintaining these ranges allows for constant time, deterministic picking of partitions
+        // owned by a given member.
+        Map<Integer, Integer> startPartitionIndices = new HashMap<>();
+        Map<Integer, Integer> endPartitionIndices = new HashMap<>(); // exclusive
+
+        // Repeat reassignment until no partition can be moved to improve the balance or we hit an
+        // iteration limit.
+        for (int i = 0; i < 16; i++) {
+            for (int topicIndex = 0; topicIndex < sortedTopicIds.size(); topicIndex++) {
+                if (topicIndex == lastRebalanceTopicIndex) {
+                    // The last rebalanced topic was this one, which means we've gone through all
+                    // the topics and didn't perform any additional rebalancing. Don't bother trying
+                    // to rebalance this topic again and exit.
                     return;
                 }
-                reassignmentOccurred = false;
 
-                // The topicIdPartition must have at least two members.
-                if (membersPerTopic.get(reassignablePartition.topicId()).size() <= 1)
-                    throw new PartitionAssignorException(String.format("Expected more than one potential member for " +
-                        "topicIdPartition '%s'", reassignablePartition)
-                    );
+                Uuid topicId = sortedTopicIds.get(topicIndex);
 
-                // The topicIdPartition must have a current target owner.
-                String currentTargetOwner = partitionOwnerInTargetAssignment.get(reassignablePartition);
-                if (currentTargetOwner == null)
-                    throw new PartitionAssignorException(String.format("Expected topicIdPartition '%s' to be assigned " +
-                        "to a member", reassignablePartition)
-                    );
+                int reassignedPartitionCount = balanceTopic(
+                    topicId,
+                    memberAssignmentBalancer,
+                    partitions,
+                    startPartitionIndices,
+                    endPartitionIndices
+                );
 
-                for (String otherMember : membersPerTopic.get(reassignablePartition.topicId())) {
-                    if (assignmentManager.targetAssignmentSize(currentTargetOwner) > assignmentManager.targetAssignmentSize(otherMember) + 1) {
-                        reassignPartition(reassignablePartition);
-                        modified = true;
-                        reassignmentOccurred = true;
-                        break;
-                    }
+                if (reassignedPartitionCount > 0 ||
+                    lastRebalanceTopicIndex == -1) {
+                    lastRebalanceTopicIndex = topicIndex;
                 }
             }
-        } while (modified);
-    }
-
-    /**
-     * Reassigns a partition to an eligible member with the fewest current target assignments.
-     * <ul>
-     *   <li> Iterates over members sorted by ascending assignment size. </li>
-     *   <li> Selects the first member subscribed to the partition's topic. </li>
-     * </ul>
-     *
-     * @param partition         The partition to reassign.
-     * @throws AssertionError   If no subscribed member is found.
-     */
-    private void reassignPartition(TopicIdPartition partition) {
-        // Find the new member with the least assignment size.
-        String newOwner = null;
-        for (String anotherMember : sortedMembersByAssignmentSize) {
-            if (groupSpec.memberSubscription(anotherMember).subscribedTopicIds().contains(partition.topicId())) {
-                newOwner = anotherMember;
-                break;
-            }
-        }
-
-        if (newOwner == null) {
-            throw new PartitionAssignorException("No suitable new owner was found for the partition" + partition);
         }
-
-        reassignPartition(partition, newOwner);
     }
 
     /**
-     * Reassigns the given partition to a new member while considering partition movements and stickiness.
-     * <p>
-     * This method performs the following actions:
-     * <ol>
-     *   <li> Determines the current owner of the partition. </li>
-     *   <li> Identifies the correct partition to move, adhering to stickiness constraints. </li>
-     *   <li> Processes the partition movement to the new member. </li>
-     * </ol>
+     * Performs reassignments of a topic's partitions to balance the load across its subscribers.
      *
-     * @param partition     The {@link TopicIdPartition} to be reassigned.
-     * @param newMember     The Id of the member to which the partition should be reassigned.
+     * @param topicId                  The topic to consider for reassignment. The topic must have
+     *                                 at least two subscribers.
+     * @param memberAssignmentBalancer A MemberAssignmentBalancer to hold the state of the balancing
+     *                                 algorithm.
+     * @param partitions               A list for the balancing algorithm to store the topic's
+     *                                 partitions, with partitions owned by the same member grouped
+     *                                 together.
+     * @param startPartitionIndices    A map for the balancing algorithm to store the ranges in the
+     *                                 partitions list assigned to members. These ranges allow for
+     *                                 constant time, deterministic picking of partitions owned by a
+     *                                 given member.
+     * @param endPartitionIndices      A map for the balancing algorithm to store the ranges in the
+     *                                 partitions list assigned to members. These ranges allow for
+     *                                 constant time, deterministic picking of partitions owned by a
+     *                                 given member.
+     * @return the number of partitions reassigned.
      */
-    private void reassignPartition(TopicIdPartition partition, String newMember) {
-        String member = partitionOwnerInTargetAssignment.get(partition);
-        // Find the correct partition movement considering the stickiness requirement.
-        TopicIdPartition partitionToBeMoved = partitionMovements.computeActualPartitionToBeMoved(
-            partition,
-            member,
-            newMember
-        );
-        processPartitionMovement(partitionToBeMoved, newMember);
-    }
-
-    private void processPartitionMovement(TopicIdPartition topicIdPartition, String newMember) {
-        String oldMember = partitionOwnerInTargetAssignment.get(topicIdPartition);
-
-        partitionMovements.movePartition(topicIdPartition, oldMember, newMember);
-
-        assignmentManager.removePartitionFromTargetAssignment(topicIdPartition, oldMember);
-        assignmentManager.addPartitionToTargetAssignment(topicIdPartition, newMember);
-    }
-
-    /**
-     * Adds the topic's partition to the member's target assignment.
-     */
-    private static void addPartitionToAssignment(
-        Map<String, MemberAssignment> memberAssignments,
-        String memberId,
+    private int balanceTopic(
         Uuid topicId,
-        int partition
+        MemberAssignmentBalancer memberAssignmentBalancer,
+        List<Integer> partitions,
+        Map<Integer, Integer> startPartitionIndices,
+        Map<Integer, Integer> endPartitionIndices // exclusive
     ) {
-        memberAssignments.get(memberId)
-            .partitions()
-            .computeIfAbsent(topicId, __ -> new HashSet<>())
-            .add(partition);
-    }
+        int reassignedPartitionCount = 0;
 
-    /**
-     * Constructs a set of {@code TopicIdPartition} including all the given topic Ids based on their partition counts.
-     *
-     * @param topicIds                      Collection of topic Ids.
-     * @param subscribedTopicDescriber      Describer to fetch partition counts for topics.
-     *
-     * @return Set of {@code TopicIdPartition} including all the provided topic Ids.
-     */
-    private static Set<TopicIdPartition> topicIdPartitions(
-        Collection<Uuid> topicIds,
-        SubscribedTopicDescriber subscribedTopicDescriber
-    ) {
-        Set<TopicIdPartition> topicIdPartitions = new HashSet<>();
-        for (Uuid topicId : topicIds) {
-            int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
-            for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-                topicIdPartitions.add(new TopicIdPartition(topicId, partitionId));
-            }
-        }
-        return topicIdPartitions;
-    }
+        int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
 
-    /**
-     * This class represents a pair of member Ids involved in a partition reassignment.
-     * Each pair contains a source and a destination member Id.
-     * It normally corresponds to a particular partition or topic, and indicates that the particular partition or some
-     * partition of the particular topic was moved from the source member to the destination member during the rebalance.
-     */
-    private static class MemberPair {
-        private final String srcMemberId;
-        private final String dstMemberId;
+        int[] previousPartitionOwners = previousAssignmentPartitionOwners.get(topicId);
+        int[] partitionOwners = targetAssignmentPartitionOwners.get(topicId);
 
-        MemberPair(String srcMemberId, String dstMemberId) {
-            this.srcMemberId = srcMemberId;
-            this.dstMemberId = dstMemberId;
-        }
+        // Try to return partitions to their previous owners if it improves balance.
+        reassignedPartitionCount += balanceTopicRestoringStickiness(
+            topicId,
+            numPartitions,
+            previousPartitionOwners,
+            partitionOwners
+        );
 
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + ((this.srcMemberId == null) ? 0 : this.srcMemberId.hashCode());
-            result = prime * result + ((this.dstMemberId == null) ? 0 : this.dstMemberId.hashCode());
-            return result;
+        // Next, try to reassign partitions from the most loaded subscribers to the least loaded
+        // subscribers. We use a similar approach to unassignedPartitionsAssignment, except instead
+        // we are reassigning partitions from the most loaded subscribers to the least loaded
+        // subscribers. Once the difference in partition counts between the ranges is one or less,
+        // we are done, since reassigning partitions would not improve balance.
+        int imbalance = memberAssignmentBalancer.initialize(topicSubscribers.get(topicId));
+        if (imbalance <= 1) {
+            // The topic is already balanced.
+            return reassignedPartitionCount;
         }
 
-        @Override
-        public boolean equals(Object obj) {
-            if (obj == null)
-                return false;
-
-            if (!getClass().isInstance(obj))
-                return false;
-
-            MemberPair otherPair = (MemberPair) obj;
-            return this.srcMemberId.equals(otherPair.srcMemberId) && this.dstMemberId.equals(otherPair.dstMemberId);
+        // Initialize the array of partitions, with partitions owned by the same member grouped
+        // together.
+        partitions.clear();
+        for (int partition = 0; partition < numPartitions; partition++) {
+            partitions.add(partition);
         }
+        partitions.sort(
+            Comparator
+                .comparingInt((Integer partition) -> partitionOwners[partition])
+                .thenComparingInt(partition -> partition)
+        );
 
-        @Override
-        public String toString() {
-            return "MemberPair(" +
-                "srcMemberId='" + srcMemberId + '\'' +
-                ", dstMemberId='" + dstMemberId + '\'' +
-                ')';
-        }
-    }
+        // Initialize the ranges in the partitions list owned by members.
+        startPartitionIndices.clear();
+        endPartitionIndices.clear();
 
-    /**
-     * This class maintains some data structures to simplify lookup of partition movements among members.
-     * During a partition rebalance, it keeps track of partition movements corresponding to each topic,
-     * and also possible movement (in form a <code>MemberPair</code> object) for each partition.
-     */
-    private static class PartitionMovements {
-        private final Map<Uuid, Map<MemberPair, Set<TopicIdPartition>>> partitionMovementsByTopic = new HashMap<>();
-        private final Map<TopicIdPartition, MemberPair> partitionMovementsByPartition = new HashMap<>();
-
-        private MemberPair removeMovementRecordOfPartition(TopicIdPartition partition) {
-            MemberPair pair = partitionMovementsByPartition.remove(partition);
-
-            Uuid topic = partition.topicId();
-            Map<MemberPair, Set<TopicIdPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic);
-            partitionMovementsForThisTopic.get(pair).remove(partition);
-            if (partitionMovementsForThisTopic.get(pair).isEmpty())
-                partitionMovementsForThisTopic.remove(pair);
-            if (partitionMovementsByTopic.get(topic).isEmpty())
-                partitionMovementsByTopic.remove(topic);
-
-            return pair;
+        for (int i = 0; i < numPartitions; i++) {
+            int partition = partitions.get(i);
+            int ownerIndex = partitionOwners[partition];
+            if (startPartitionIndices.get(ownerIndex) == null) {
+                startPartitionIndices.put(ownerIndex, i);
+            }
+            endPartitionIndices.put(ownerIndex, i + 1); // endPartitionIndices is exclusive
         }
 
-        private void addPartitionMovementRecord(TopicIdPartition partition, MemberPair pair) {
-            partitionMovementsByPartition.put(partition, pair);
-
-            Uuid topic = partition.topicId();
-            if (!partitionMovementsByTopic.containsKey(topic))
-                partitionMovementsByTopic.put(topic, new HashMap<>());
+        // Redistribute partitions from the most loaded subscriber to the least loaded subscriber
+        // until the topic's subscribers are balanced.
+        while (!memberAssignmentBalancer.isBalanced()) {
+            // NB: The condition above does not do much. This loop will terminate due to the checks
+            //     inside instead.
 
-            Map<MemberPair, Set<TopicIdPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic);
-            if (!partitionMovementsForThisTopic.containsKey(pair))
-                partitionMovementsForThisTopic.put(pair, new HashSet<>());
+            // First, choose a member from the most loaded range to reassign a partition from.
 
-            partitionMovementsForThisTopic.get(pair).add(partition);
-        }
+            // Loop until we find a member that has partitions to give up.
+            int mostLoadedMemberIndex = -1;
+            while (true) {
+                mostLoadedMemberIndex = memberAssignmentBalancer.nextMostLoadedMember();
 
-        private void movePartition(TopicIdPartition partition, String oldOwner, String newOwner) {
-            MemberPair pair = new MemberPair(oldOwner, newOwner);
-
-            if (partitionMovementsByPartition.containsKey(partition)) {
-                // This partition was previously moved.
-                MemberPair existingPair = removeMovementRecordOfPartition(partition);
-                if (existingPair.dstMemberId.equals(oldOwner)) {
-                    throw new PartitionAssignorException("Mismatch in partition movement record with respect to " +
-                        "partition ownership during a rebalance"
-                    );
-                }
-                if (!existingPair.srcMemberId.equals(newOwner)) {
-                    // The partition is not moving back to its previous member.
-                    addPartitionMovementRecord(partition, new MemberPair(existingPair.srcMemberId, newOwner));
+                if (mostLoadedMemberIndex == -1) {
+                    // There are no more members with partitions to give up.
+                    // We need to break out of two loops here.
+                    break;
                 }
-            } else
-                addPartitionMovementRecord(partition, pair);
-        }
 
-        /**
-         * Computes the actual partition to be moved based on the current and proposed partition owners.
-         * This method determines the appropriate partition movement, considering existing partition movements
-         * and constraints within a topic.
-         *
-         * @param partition         The {@link TopicIdPartition} object representing the partition to be moved.
-         * @param oldOwner          The memberId of the current owner of the partition.
-         * @param newOwner          The memberId of the proposed new owner of the partition.
-         * @return The {@link TopicIdPartition} that should be moved, based on existing movement patterns
-         *         and ownership. Returns the original partition if no specific movement pattern applies.
-         * @throws PartitionAssignorException if the old owner does not match the expected value for the partition.
-         */
-        private TopicIdPartition computeActualPartitionToBeMoved(
-            TopicIdPartition partition,
-            String oldOwner,
-            String newOwner
-        ) {
-            Uuid topic = partition.topicId();
-
-            if (!partitionMovementsByTopic.containsKey(topic))
-                return partition;
-
-            if (partitionMovementsByPartition.containsKey(partition)) {
-                String expectedOldOwner = partitionMovementsByPartition.get(partition).dstMemberId;
-                if (!oldOwner.equals(expectedOldOwner)) {
-                    throw new PartitionAssignorException("Old owner does not match expected value for partition: " + partition);
+                if (!endPartitionIndices.containsKey(mostLoadedMemberIndex) ||
+                    endPartitionIndices.get(mostLoadedMemberIndex) - startPartitionIndices.get(mostLoadedMemberIndex) <= 0) {
+                    memberAssignmentBalancer.excludeMostLoadedMember();
+                    continue;
                 }
-                oldOwner = partitionMovementsByPartition.get(partition).srcMemberId;
-            }
-
-            Map<MemberPair, Set<TopicIdPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic);
-            MemberPair reversePair = new MemberPair(newOwner, oldOwner);
-            if (!partitionMovementsForThisTopic.containsKey(reversePair))
-                return partition;
-
-            return partitionMovementsForThisTopic.get(reversePair).iterator().next();
-        }
-    }
-
-    /**
-     * Manages assignments to members based on their current assignment size and maximum allowed assignment size.
-     */
-    private class AssignmentManager {
-        private final Map<String, MemberAssignmentData> membersWithAssignmentSizes = new HashMap<>();
 
-        /**
-         * Represents the assignment metadata for a member.
-         */
-        private class MemberAssignmentData {
-            final String memberId;
-            int currentAssignmentSize = 0;
-            int maxAssignmentSize;
-
-            /**
-             * Constructs a MemberAssignmentData with the given member Id.
-             *
-             * @param memberId The Id of the member.
-             */
-            MemberAssignmentData(String memberId) {
-                this.memberId = memberId;
+                break;
             }
 
-            @Override
-            public boolean equals(Object o) {
-                if (this == o) return true;
-                if (o == null || getClass() != o.getClass()) return false;
-                MemberAssignmentData that = (MemberAssignmentData) o;
-                return memberId.equals(that.memberId);
+            if (mostLoadedMemberIndex == -1) {
+                // There are no more members with partitions to give up.
+                break;
             }
 
-            @Override
-            public int hashCode() {
-                return Objects.hash(memberId);
-            }
+            // We've picked a member. Now pick its last partition to reassign.
+            int partition = partitions.get(endPartitionIndices.get(mostLoadedMemberIndex) - 1);
+            endPartitionIndices.put(mostLoadedMemberIndex, endPartitionIndices.get(mostLoadedMemberIndex) - 1);
 
-            @Override
-            public String toString() {
-                return "MemberAssignmentData(" +
-                    "memberId='" + memberId + '\'' +
-                    ", currentAssignmentSize=" + currentAssignmentSize +
-                    ", maxAssignmentSize=" + maxAssignmentSize +
-                    ')';
+            // Find the least loaded subscriber.
+            int leastLoadedMemberIndex = memberAssignmentBalancer.nextLeastLoadedMember();
+
+            // If the most and least loaded subscribers came from member ranges with partition
+            // counts that differ by one or less, then the members are balanced and we are done.
+            if (memberAssignmentBalancer.isBalanced()) {
+                // The topic is balanced.
+                break;
             }
-        }
 
-        /**
-         * Initializes an AssignmentManager, setting up the necessary data structures.
-         */
-        public AssignmentManager(
-            SubscribedTopicDescriber subscribedTopicDescriber
-        ) {
-            groupSpec.memberIds().forEach(memberId -> {
-                int maxSize = groupSpec.memberSubscription(memberId).subscribedTopicIds().stream()
-                    .mapToInt(subscribedTopicDescriber::numPartitions)
-                    .sum();
-
-                MemberAssignmentData memberAssignmentData = membersWithAssignmentSizes
-                    .computeIfAbsent(memberId, MemberAssignmentData::new);
-                memberAssignmentData.maxAssignmentSize = maxSize;
-                memberAssignmentData.currentAssignmentSize = 0;
-            });
+            // Reassign the partition.
+            assignPartition(topicId, partition, leastLoadedMemberIndex);
+            reassignedPartitionCount++;
         }
 
-        /**
-         * @param memberId       The member Id.
-         * @return The current assignment size for the given member.
-         */
-        private int targetAssignmentSize(String memberId) {
-            MemberAssignmentData memberData = this.membersWithAssignmentSizes.get(memberId);
-            if (memberData == null) {
-                LOG.warn("Member Id {} not found", memberId);
-                return 0;
-            }
-            return memberData.currentAssignmentSize;
-        }
+        return reassignedPartitionCount;
+    }
 
-        /**
-         * @param memberId      The member Id.
-         * @return The maximum assignment size for the given member.
-         */
-        private int maxAssignmentSize(String memberId) {
-            MemberAssignmentData memberData = this.membersWithAssignmentSizes.get(memberId);
-            if (memberData == null) {
-                LOG.warn("Member Id {} not found", memberId);
-                return 0;
+    /**
+     * Restores topic partitions to their previous owners if it improves balance.
+     *
+     * @param topicId                  The topic to rebalance.
+     * @param numPartitions            The number of partitions in the topic.
+     * @param previousPartitionOwners  The previous owners of the partitions.
+     * @param partitionOwners          The current owners of the partitions.
+     * @return the number of partitions reassigned.
+     */
+    private int balanceTopicRestoringStickiness(
+        Uuid topicId,
+        int numPartitions,
+        int[] previousPartitionOwners,
+        int[] partitionOwners
+    ) {
+        int reassignedPartitionCount = 0;
+
+        // Try to return partitions to their previous owners if it improves balance.
+        // Note that this is a best effort approach towards restoring stickiness and will not
+        // produce optimally sticky assignments.
+        for (int partition = 0; partition < numPartitions; partition++) {
+            int ownerIndex = partitionOwners[partition];
+            int previousOwnerIndex = previousPartitionOwners[partition];
+            if (previousOwnerIndex != -1 &&
+                previousOwnerIndex != ownerIndex &&
+                targetAssignmentSize(ownerIndex) > targetAssignmentSize(previousOwnerIndex)) {
+                assignPartition(topicId, partition, previousOwnerIndex);
+                reassignedPartitionCount++;
             }
-            return memberData.maxAssignmentSize;
         }
 
-        /**
-         * @param memberId      The member Id.
-         * @return If the given member is at maximum capacity.
-         */
-        private boolean isMemberAtMaxCapacity(String memberId) {
-            return targetAssignmentSize(memberId) >= maxAssignmentSize(memberId);
-        }
+        return reassignedPartitionCount;
+    }
 
-        /**
-         * @param memberId      The member Id.
-         * Increment the current target assignment size for the member.
-         */
-        private void incrementTargetAssignmentSize(String memberId) {
-            MemberAssignmentData memberData = this.membersWithAssignmentSizes.get(memberId);
-            if (memberData == null) {
-                LOG.warn("Member Id {} not found", memberId);
-                return;
-            }
-            memberData.currentAssignmentSize++;
-        }
+    /**
+     * Computes the set of unassigned partitions, based on targetAssignmentPartitionOwners.
+     *
+     * @param unassignedPartitions          The map in which to store the unassigned partitions.
+     */
+    private void computeUnassignedPartitions(Map<Uuid, List<Integer>> unassignedPartitions) {
+        unassignedPartitions.clear();

Review Comment:
   It's not strictly necessary. After turning `unassignedPartitions` into a local variable, the `clear()` call is no longer present.
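For illustration only, a minimal sketch of the local-variable shape: `computeUnassignedPartitions` builds and returns a fresh map, so there is no shared field left to `clear()`. Assumptions not taken from the PR: `java.util.UUID` stands in for Kafka's `Uuid`, a plain `Map<UUID, int[]>` stands in for `targetAssignmentPartitionOwners`, and `-1` marks a partition with no owner.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: UUID stands in for Kafka's Uuid, and -1 is assumed to mark
// a partition with no owner in the per-topic owner arrays.
final class UnassignedPartitionsSketch {
    private UnassignedPartitionsSketch() {}

    /**
     * Returns a new map from topic id to its unowned partition ids, in ascending order.
     * Topics with no unowned partitions are left out. Nothing is cleared, because
     * nothing is shared between calls.
     */
    static Map<UUID, List<Integer>> computeUnassignedPartitions(
        Map<UUID, int[]> targetAssignmentPartitionOwners
    ) {
        Map<UUID, List<Integer>> unassignedPartitions = new HashMap<>();
        for (Map.Entry<UUID, int[]> entry : targetAssignmentPartitionOwners.entrySet()) {
            int[] owners = entry.getValue();
            for (int partition = 0; partition < owners.length; partition++) {
                if (owners[partition] == -1) {
                    unassignedPartitions
                        .computeIfAbsent(entry.getKey(), topicId -> new ArrayList<>())
                        .add(partition);
                }
            }
        }
        return unassignedPartitions;
    }
}
```

With a helper of this (hypothetical) shape, `build()` would hold the result in a local, e.g. `Map<Uuid, List<Integer>> unassignedPartitions = computeUnassignedPartitions();`, and pass it straight to `unassignedPartitionsAssignment(unassignedPartitions)`.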

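The grouped-range technique in `balanceTopic` (sort a topic's partitions by owner, keep an exclusive `[start, end)` range per owner, and always give away the last partition in the most loaded owner's range) can be illustrated with a simplified, standalone sketch. Assumptions: every member subscribes to the topic, member loads are plain `int` counts, the stickiness pass is omitted, and linear scans replace `MemberAssignmentBalancer`, whose implementation is not part of this hunk.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified, hypothetical sketch of the grouped-range idea; not the PR's implementation.
final class TopicBalanceSketch {
    private TopicBalanceSketch() {}

    /**
     * Moves partitions of one topic from the most loaded member that still owns one of
     * them to the least loaded member, until their loads differ by at most one.
     *
     * @param partitionOwners owner index per partition of the topic; updated in place
     * @param memberLoads     total assigned partitions per member; updated in place
     * @return the number of partitions reassigned
     */
    static int balanceTopic(int[] partitionOwners, int[] memberLoads) {
        int numPartitions = partitionOwners.length;
        int numMembers = memberLoads.length;

        // Group partitions by owner; ties broken by partition id for determinism.
        List<Integer> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(p);
        }
        partitions.sort(Comparator
            .comparingInt((Integer p) -> partitionOwners[p])
            .thenComparingInt(p -> p));

        // [start, end) range of each member's partitions in the grouped list.
        // Members owning nothing keep start == end == 0, i.e. an empty range.
        int[] start = new int[numMembers];
        int[] end = new int[numMembers];
        for (int i = 0; i < numPartitions; i++) {
            int owner = partitionOwners[partitions.get(i)];
            if (i == 0 || partitionOwners[partitions.get(i - 1)] != owner) {
                start[owner] = i;
            }
            end[owner] = i + 1;
        }

        int reassigned = 0;
        while (true) {
            // Most loaded member with a non-empty range, i.e. something left to give up.
            int most = -1;
            for (int m = 0; m < numMembers; m++) {
                if (end[m] > start[m] && (most == -1 || memberLoads[m] > memberLoads[most])) {
                    most = m;
                }
            }
            // Least loaded member overall.
            int least = 0;
            for (int m = 1; m < numMembers; m++) {
                if (memberLoads[m] < memberLoads[least]) {
                    least = m;
                }
            }
            if (most == -1 || memberLoads[most] - memberLoads[least] <= 1) {
                return reassigned;
            }
            // Take the last partition in the donor's range: constant time and deterministic.
            end[most]--;
            int partition = partitions.get(end[most]);
            partitionOwners[partition] = least;
            memberLoads[most]--;
            memberLoads[least]++;
            reassigned++;
        }
    }
}
```

For six partitions all owned by member 0 of three otherwise idle members, the sketch moves partitions 5, 4, 3 and 2 (to members 1, 2, 1, 2) and stops at loads of 2/2/2. Shrinking only the donor's range end is what keeps each pick constant time without searching.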

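The early-exit rule in `balanceTopics` is easy to misread: a sweep over the sorted topics stops when it returns to the last topic that reassigned anything (or to the first topic, which is always recorded, if nothing moved), and otherwise the loop gives up after 16 sweeps. A minimal sketch of just that control flow, where a hypothetical `IntUnaryOperator` stands in for `balanceTopic` and returns the number of partitions reassigned for a topic index:

```java
import java.util.function.IntUnaryOperator;

// Hypothetical sketch of the early-exit control flow only; balancing itself is stubbed.
final class TopicSweepSketch {
    private TopicSweepSketch() {}

    /**
     * @param numTopics    number of (already sorted) topics
     * @param balanceTopic stand-in for balanceTopic: partitions reassigned for a topic index
     * @param maxRounds    iteration cap (16 in the diff)
     * @return the number of balanceTopic calls made
     */
    static int balanceTopics(int numTopics, IntUnaryOperator balanceTopic, int maxRounds) {
        // Index of the last topic that reassigned partitions. The first topic visited is
        // recorded even if it reassigned nothing, so an idle sweep still terminates.
        int lastRebalanceTopicIndex = -1;
        int calls = 0;
        for (int round = 0; round < maxRounds; round++) {
            for (int topicIndex = 0; topicIndex < numTopics; topicIndex++) {
                if (topicIndex == lastRebalanceTopicIndex) {
                    // A full sweep since the last change made no further progress.
                    return calls;
                }
                int reassigned = balanceTopic.applyAsInt(topicIndex);
                calls++;
                if (reassigned > 0 || lastRebalanceTopicIndex == -1) {
                    lastRebalanceTopicIndex = topicIndex;
                }
            }
        }
        return calls;
    }
}
```

With three topics where only topic 1 moves something, and only once, the sketch makes four calls (0, 1, 2, then 0 again) before stopping at topic 1; if nothing ever moves, it visits each topic exactly once.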

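The explicit `compare` in `sortTopicIds` could also be written with `Comparator` combinators, as the removed `sortTopicIdPartitions` did. A sketch, assuming `String` topic ids and plain maps for partition and subscriber counts in place of `Uuid`, `subscribedTopicDescriber` and `topicSubscribers`:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: String ids and plain maps replace Uuid, subscribedTopicDescriber
// and topicSubscribers.
final class TopicSortSketch {
    private TopicSortSketch() {}

    /**
     * Sorts topic ids by partitions per subscriber (descending), then subscriber count
     * (ascending), then topic id (ascending) for predictability.
     */
    static List<String> sortTopicIds(
        Collection<String> topicIds,
        Map<String, Integer> partitionCounts,
        Map<String, Integer> subscriberCounts
    ) {
        Comparator<String> comparator = Comparator
            .comparingDouble((String topicId) ->
                (double) partitionCounts.get(topicId) / subscriberCounts.get(topicId))
            .reversed()                                   // only the ratio is descending
            .thenComparingInt(subscriberCounts::get)      // fewer subscribers first
            .thenComparing(Comparator.<String>naturalOrder());

        List<String> sorted = new ArrayList<>(topicIds);
        sorted.sort(comparator);
        return sorted;
    }
}
```

`reversed()` is applied before the tie-breakers, so it flips only the ratio ordering. `Comparator.comparingDouble` compares keys with `Double.compare`, so the reversed ratio ordering matches the hand-written `Double.compare(topic2Ratio, topic1Ratio)`.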
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java:
##########
@@ -157,716 +240,779 @@ public GroupAssignment build() {
 
         assignStickyPartitions();
 
-        unassignedPartitionsAssignment();
+        computeUnassignedPartitions(unassignedPartitions);
+        unassignedPartitionsAssignment(unassignedPartitions);
 
         balance();
 
         return new GroupAssignment(targetAssignment);
     }
 
     /**
-     * <li> TopicIdPartitions are sorted in descending order based on the value:
-     *       totalPartitions/number of subscribed members. </li>
-     * <li> If the above value is the same then topicIdPartitions are sorted in
-     *      ascending order of number of subscribers. </li>
-     * <li> If both criteria are the same, sort in ascending order of the partition Id.
-     *      This last criteria is for predictability of the assignments. </li>
+     * Sorts topics based on the following criteria:
+     * <ol>
+     *   <li>Topics are sorted in descending order of totalPartitions / number of subscribers
+     *       (partitions per subscriber).</li>
+     *   <li>Ties are broken by sorting in ascending order of number of subscribers.</li>
+     *   <li>Any remaining ties are broken by sorting in ascending order of topic id.</li>
+     * </ol>
+     *
+     * The last criterion is for predictability of assignments.
      *
-     * @param topicIdPartitions       The topic partitions that need to be sorted.
-     * @return A list of sorted topic partitions.
+     * @param topicIds          The topic ids that need to be sorted.
+     * @return A list of sorted topic ids.
      */
-    private List<TopicIdPartition> sortTopicIdPartitions(Collection<TopicIdPartition> topicIdPartitions) {
-        Comparator<TopicIdPartition> comparator = Comparator
-            .comparingDouble((TopicIdPartition topicIdPartition) -> {
-                int totalPartitions = subscribedTopicDescriber.numPartitions(topicIdPartition.topicId());
-                int totalSubscribers = membersPerTopic.get(topicIdPartition.topicId()).size();
-                return (double) totalPartitions / totalSubscribers;
-            })
-            .reversed()
-            .thenComparingInt(topicIdPartition -> membersPerTopic.get(topicIdPartition.topicId()).size())
-            .thenComparingInt(TopicIdPartition::partitionId);
+    private List<Uuid> sortTopicIds(Collection<Uuid> topicIds) {
+        Comparator<Uuid> comparator = new Comparator<Uuid>() {
+            @Override
+            public int compare(final Uuid topic1Id, final Uuid topic2Id) {
+                int topic1PartitionCount = subscribedTopicDescriber.numPartitions(topic1Id);
+                int topic2PartitionCount = subscribedTopicDescriber.numPartitions(topic2Id);
+                int topic1SubscriberCount = topicSubscribers.get(topic1Id).size();
+                int topic2SubscriberCount = topicSubscribers.get(topic2Id).size();
+
+                // Order by partitions per subscriber, descending.
+                int order = Double.compare(
+                    (double) topic2PartitionCount / topic2SubscriberCount,
+                    (double) topic1PartitionCount / topic1SubscriberCount
+                );
+
+                // Then order by subscriber count, ascending.
+                if (order == 0) {
+                    order = Integer.compare(topic1SubscriberCount, topic2SubscriberCount);
+                }
 
-        return topicIdPartitions.stream()
-            .sorted(comparator)
-            .collect(Collectors.toList());
+                // Then order by topic id, ascending.
+                if (order == 0) {
+                    order = topic1Id.compareTo(topic2Id);
+                }
+
+                return order;
+            }
+        };
+
+        List<Uuid> sortedTopicIds = new ArrayList<>(topicIds);
+        sortedTopicIds.sort(comparator);
+        return sortedTopicIds;
     }
 
     /**
-     * Gets a set of partitions that are to be retained from the existing assignment. This includes:
-     * <li> Partitions from topics that are still present in both the new subscriptions and the topic metadata. </li>
+     * Imports the existing assignment into the target assignment, taking into account topic
+     * unsubscriptions.
      */
     private void assignStickyPartitions() {
-        groupSpec.memberIds().forEach(memberId ->
-            groupSpec.memberAssignment(memberId).partitions().forEach((topicId, currentAssignment) -> {
+        for (String memberId : groupSpec.memberIds()) {
+            int memberIndex = memberIndices.get(memberId);
+
+            Map<Uuid, Set<Integer>> oldAssignment = groupSpec.memberAssignment(memberId).partitions();
+            Map<Uuid, Set<Integer>> newAssignment = null;
+
+            for (Map.Entry<Uuid, Set<Integer>> entry : oldAssignment.entrySet()) {
+                Uuid topicId = entry.getKey();
+                Set<Integer> partitions = entry.getValue();
+
                 if (groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicId)) {
-                    currentAssignment.forEach(partition -> {
-                        TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition);
-                        assignmentManager.addPartitionToTargetAssignment(topicIdPartition, memberId);
-                        assignedStickyPartitions.add(topicIdPartition);
-                    });
+                    memberTargetAssignmentSizes[memberIndex] += partitions.size();
+
+                    int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
+                    for (int partition : partitions) {
+                        if (partition >= numPartitions) {
+                            LOG.warn(
+                                "Previous assignment for {} contains {}:{}, 
but topic only has {} partitions",
+                                memberId,
+                                topicId,
+                                partition,
+                                numPartitions
+                            );
+
+                            if (newAssignment == null) {
+                                // If the new assignment is null, we create a 
deep copy of the
+                                // original assignment so that we can alter it.
+                                newAssignment = deepCopy(oldAssignment);
+                            }
+                            Set<Integer> newPartitions = 
newAssignment.get(topicId);
+                            newPartitions.remove(partition);
+                            if (newPartitions.isEmpty()) {
+                                newAssignment.remove(topicId);
+                            }
+                            memberTargetAssignmentSizes[memberIndex]--;
+
+                            continue;
+                        }
+
+                        
previousAssignmentPartitionOwners.get(topicId)[partition] = memberIndex;
+                        
targetAssignmentPartitionOwners.get(topicId)[partition] = memberIndex;
+                    }
                 } else {
+                    // The member is no longer subscribed to the topic. Remove 
it from the
+                    // assignment.
                     LOG.debug("The topic " + topicId + " is no longer present 
in the subscribed topics list");
+
+                    if (newAssignment == null) {
+                        // If the new assignment is null, we create a deep 
copy of the
+                        // original assignment so that we can alter it.
+                        newAssignment = deepCopy(oldAssignment);
+                    }
+                    // Remove the entire topic.
+                    newAssignment.remove(topicId);
                 }
-            })
-        );
+            }
+
+            if (newAssignment == null) {
+                newAssignment = oldAssignment;
+            }
+            targetAssignment.put(memberId, new 
MemberAssignmentImpl(newAssignment));
+        }
     }
 
     /**
-     * Allocates the remaining unassigned partitions to members in a balanced manner.
-     * <li> Partitions are sorted to maximize the probability of a balanced assignment. </li>
-     * <li> Sort members in ascending order of their current target assignment sizes
-     *      to ensure the least filled member gets the partition first. </li>
+     * A class that holds the state for the balancing process.
+     * <p/>
+     * Keeps track of two ranges of members: the members with the fewest assigned
+     * partitions and the members with the most assigned partitions. Within each range,
+     * all members start with the same number of assigned partitions.
      */
-    private void unassignedPartitionsAssignment() {
-        List<TopicIdPartition> sortedPartitions = sortTopicIdPartitions(unassignedPartitions);
+    private final class MemberAssignmentBalancer {
+        private final int[] memberTargetAssignmentSizes;
 
-        for (TopicIdPartition partition : sortedPartitions) {
-            TreeSet<String> sortedMembers = assignmentManager.sortMembersByAssignmentSize(
-                membersPerTopic.get(partition.topicId())
-            );
+        /**
+         * The members to balance, sorted by number of assigned partitions.
+         * <p/>
+         * Can be visualized like so:
+         * <pre>
+         *           ^
+         *           |          #
+         * partition |        ###
+         * count     |        ###
+         *           | ##########
+         *           +----------->
+         *             members
+         * </pre>
+         */
+        private final List<Integer> sortedMembers;
 
-            for (String member : sortedMembers) {
-                if (assignmentManager.maybeAssignPartitionToMember(partition, member)) {
-                    break;
+        /**
+         * [0, leastLoadedRangeEnd) represents the range of members with the smallest partition
+         * count.
+         */
+        private int leastLoadedRangeEnd = 0; // exclusive
+
+        /**
+         * The partition count of each member in the least loaded range before we start assigning
+         * partitions to them.
+         */
+        private int leastLoadedRangePartitionCount = -1;
+
+        /**
+         * The index of the next element in sortedMembers to which to assign the next partition.
+         */
+        private int nextLeastLoadedMember = 0;
+
+        /**
+         * [mostLoadedRangeStart, mostLoadedRangeEnd) is the range of members with the largest
+         * partition count.
+         */
+        private int mostLoadedRangeStart = 0; // inclusive
+
+        /**
+         * [mostLoadedRangeStart, mostLoadedRangeEnd) is the range of members with the largest
+         * partition count.
+         * <p/>
+         * mostLoadedRangeEnd is not simply the end of the member list, since we need to be able to
+         * exclude heavily loaded members which do not have any partitions to give up.
+         */
+        private int mostLoadedRangeEnd = 0; // exclusive
+
+        /**
+         * The partition count of each member in the most loaded range before we start unassigning
+         * partitions from them.
+         */
+        private int mostLoadedRangePartitionCount = 1;
+
+        /**
+         * The index of the next element in sortedMembers from which to unassign the next partition.
+         */
+        private int nextMostLoadedMember = -1;
+
+        // The ranges can be visualized like so:
+        //
+        //                         most
+        //                         loaded
+        //           ^ least       range
+        //           | loaded               #
+        //           | range      [##<-   ]##
+        // partition |          ##[#######]##
+        // count     | [...->  ]##[#######]##
+        //           | [#######]##[#######]##
+        //           +----------------------->
+        //             members
+        //                                 ^^
+        //                                 These members have no partitions to give up.
+        //
+        // The least loaded range expands rightwards and the most loaded range expands leftwards.
+        // Note that we are not done once the ranges touch, but rather when the partition counts in
+        // the ranges differ by one or less.
+
+        public MemberAssignmentBalancer() {
+            this.memberTargetAssignmentSizes = UniformHeterogeneousAssignmentBuilder.this.memberTargetAssignmentSizes;
+            this.sortedMembers = new ArrayList<>(memberTargetAssignmentSizes.length);
+        }
+
+        /**
+         * Resets the state of the balancer for a new set of members.
+         *
+         * @param members The members to balance.
+         * @return The difference in partition counts between the most and least loaded members.
+         */
+        public int initialize(List<Integer> members) {
+            if (members.size() < 1) {
+                // nextLeastLoadedMember needs at least one member.
+                throw new IllegalArgumentException("Cannot balance an empty subscriber list.");
+            }
+
+            // Build a sorted list of subscribers, where members with the smallest partition count
+            // appear first.
+            sortedMembers.clear();
+            sortedMembers.addAll(members);
+            sortedMembers.sort(memberComparator);
+
+            leastLoadedRangeEnd = 0; // exclusive
+            leastLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(0)] - 1;
+            nextLeastLoadedMember = 0;
+
+            mostLoadedRangeStart = sortedMembers.size(); // inclusive
+            mostLoadedRangeEnd = sortedMembers.size(); // exclusive
+            mostLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(sortedMembers.size() - 1)] + 1;
+            nextMostLoadedMember = sortedMembers.size() - 1;
+
+            return memberTargetAssignmentSizes[sortedMembers.get(sortedMembers.size() - 1)] -
+                   memberTargetAssignmentSizes[sortedMembers.get(0)];
+        }
+
+        /**
+         * Gets the least loaded member to which to assign a partition.
+         * <p/>
+         * Advances the state of the balancer by assuming that an additional partition has been
+         * assigned to the returned member.
+         * <p/>
+         * Assumes that there is at least one member.
+         *
+         * @return The least loaded member to which to assign a partition
+         */
+        public int nextLeastLoadedMember() {
+            // To generate a balanced assignment, we assign partitions to members in the
+            // [0, leastLoadedRangeEnd) range first. Once each member in the range has received a
+            // partition, the partition count of the range rises by one and we can try to expand it.
+            //                                     Range full,             Range full,
+            //                                     bump level by one       bump level by one,
+            //                                     can't expand range.     expand range.
+            //           ^                       ^                       ^
+            //           |            #          |            #          | [..->     ]#
+            // partition |          ###    ->    | [..->   ]###    ->    | [.......##]#
+            // count     | [..->   ]###          | [.......]###          | [.......##]#
+            //           | [#######]###          | [#######]###          | [#########]#
+            //           +------------->         +------------->         +------------>
+            //             members                members                members
+
+            if (nextLeastLoadedMember >= leastLoadedRangeEnd) {
+                // We've hit the end of the range. Bump its level and try to expand it.
+                leastLoadedRangePartitionCount++;
+
+                // Expand the range.
+                while (leastLoadedRangeEnd < sortedMembers.size() &&
+                    memberTargetAssignmentSizes[sortedMembers.get(leastLoadedRangeEnd)] == leastLoadedRangePartitionCount) {
+                    leastLoadedRangeEnd++;
                 }
+
+                // Reset the pointer to the start of the range of members with identical
+                // partition counts.
+                nextLeastLoadedMember = 0;
             }
+
+            int leastLoadedMemberIndex = sortedMembers.get(nextLeastLoadedMember);
+            nextLeastLoadedMember++;
+
+            return leastLoadedMemberIndex;
         }
-    }
 
-    /**
-     * If a topic has two or more potential members it is subject to reassignment.
-     *
-     * @return true if the topic can participate in reassignment, false otherwise.
-     */
-    private boolean canTopicParticipateInReassignment(Uuid topicId) {
-        return membersPerTopic.get(topicId).size() >= 2;
+        /**
+         * Gets the most loaded member from which to reassign a partition.
+         * <p/>
+         * Advances the state of the balancer by assuming that a partition has been unassigned from
+         * the returned member.
+         *
+         * @return The most loaded member from which to reassign a partition, or -1 if no such
+         *         member exists.
+         */
+        public int nextMostLoadedMember() {
+            if (nextMostLoadedMember < mostLoadedRangeStart) {
+                if (mostLoadedRangeEnd <= mostLoadedRangeStart &&
+                    mostLoadedRangeStart > 0) {
+                    // The range is empty due to calls to excludeMostLoadedMember(). We risk not
+                    // expanding the range below and returning a member outside the range. Ensure
+                    // that we always expand the range below by resetting the partition count.
+                    mostLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(mostLoadedRangeStart - 1)];
+                } else {
+                    mostLoadedRangePartitionCount--;
+                }
+
+                // Expand the range.
+                while (mostLoadedRangeStart > 0 &&
+                    memberTargetAssignmentSizes[sortedMembers.get(mostLoadedRangeStart - 1)] == mostLoadedRangePartitionCount) {
+                    mostLoadedRangeStart--;
+                }
+
+                // Reset the pointer to the end of the range of members with identical partition
+                // counts.
+                nextMostLoadedMember = mostLoadedRangeEnd - 1;
+            }
+
+            if (nextMostLoadedMember < 0) {
+                // The range is empty due to excludeMostLoadedMember() calls and there are no more
+                // members that can give up partitions.
+                return -1;
+            }
+
+            int mostLoadedMemberIndex = sortedMembers.get(nextMostLoadedMember);
+            nextMostLoadedMember--;
+
+            return mostLoadedMemberIndex;
+        }
+
+        /**
+         * Excludes the last member returned from nextMostLoadedMember from the most loaded range.
+         * <p/>
+         * Must not be called if nextMostLoadedMember has not been called yet or returned -1.
+         */
+        public void excludeMostLoadedMember() {
+            // Kick the member out of the most loaded range by swapping with the member at the end
+            // of the range and contracting the end of the range.
+            int mostLoadedMemberIndex = sortedMembers.get(nextMostLoadedMember + 1);
+            sortedMembers.set(nextMostLoadedMember + 1, sortedMembers.get(mostLoadedRangeEnd - 1));
+            sortedMembers.set(mostLoadedRangeEnd - 1, mostLoadedMemberIndex);
+            mostLoadedRangeEnd--;
+        }
+
+        /**
+         * Checks whether the members are balanced based on the partition counts of the least and
+         * most loaded ranges.
+         */
+        public boolean isBalanced() {
+            return mostLoadedRangePartitionCount - leastLoadedRangePartitionCount <= 1;
+        }
     }
 
     /**
-     * If a member is not assigned all its potential partitions it is subject to reassignment.
-     * If any of the partitions assigned to a member is subject to reassignment, the member itself
-     * is subject to reassignment.
+     * Allocates the remaining unassigned partitions to members in a balanced manner.
+     * <ol>
+     *   <li>Topics are sorted to maximize the probability of a balanced assignment.</li>
+     *   <li>Unassigned partitions within each topic are distributed to the least loaded
+     *       subscribers, repeatedly.</li>
+     * </ol>
      *
-     * @return true if the member can participate in reassignment, false otherwise.
+     * @param partitions        The partitions to be assigned.
      */
-    private boolean canMemberParticipateInReassignment(String memberId) {
-        Set<Uuid> assignedTopicIds = targetAssignment.get(memberId).partitions().keySet();
+    private void unassignedPartitionsAssignment(Map<Uuid, List<Integer>> partitions) {

Review Comment:
   Renamed it to assignRemainingPartitions for consistency with the homogeneous assignor.
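For anyone following along, here is a minimal, standalone sketch of what the renamed method does per its javadoc: topics are sorted by partitions per subscriber (descending), then subscriber count (ascending), then name, and each topic's remaining partitions are handed out in rounds to its least loaded subscribers, in the same way `MemberAssignmentBalancer.nextLeastLoadedMember()` widens its least loaded range. It is not the PR's code. Assumptions: topics are `String` names rather than `Uuid`s, members are `int` indices into a shared sizes array, ties between equally loaded members are broken by index (the PR's `memberComparator` isn't shown in this hunk), and `RemainingPartitionsSketch` / `assignRemaining` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical standalone sketch; names and types are illustrative, not the PR's API.
final class RemainingPartitionsSketch {
    private final int[] sizes;                 // partitions currently assigned, per member index
    private final List<Integer> sorted = new ArrayList<>();
    private int rangeEnd;                      // least loaded range is [0, rangeEnd)
    private int rangeLevel;                    // count shared by range members at the start of a round
    private int next;                          // next position in sorted to receive a partition

    RemainingPartitionsSketch(int[] sizes) {
        this.sizes = sizes;
    }

    // Resets the state for one topic's subscribers (must be non-empty).
    void initialize(List<Integer> members) {
        sorted.clear();
        sorted.addAll(members);
        // Ascending by assigned partition count; ties by member index (an assumption).
        sorted.sort(Comparator.<Integer>comparingInt(m -> sizes[m]).thenComparingInt(m -> m));
        rangeEnd = 0;
        rangeLevel = sizes[sorted.get(0)] - 1;
        next = 0;
    }

    // Picks the member for the next partition and records the assignment in sizes.
    int nextLeastLoadedMember() {
        if (next >= rangeEnd) {
            // Every member in the range has received a partition this round: raise the
            // level and pull in the members whose count now matches it.
            rangeLevel++;
            while (rangeEnd < sorted.size() && sizes[sorted.get(rangeEnd)] == rangeLevel) {
                rangeEnd++;
            }
            next = 0;
        }
        int member = sorted.get(next++);
        sizes[member]++;
        return member;
    }

    // Sorts topics, then hands each topic's unassigned partitions to its least loaded subscribers.
    static Map<Integer, List<String>> assignRemaining(
            Map<String, List<Integer>> unassigned,     // topic -> partitions still unassigned
            Map<String, Integer> partitionCounts,      // topic -> total partitions
            Map<String, List<Integer>> subscribers,    // topic -> subscribed member indices
            int[] sizes) {
        List<String> topics = new ArrayList<>(unassigned.keySet());
        topics.sort((t1, t2) -> {
            int s1 = subscribers.get(t1).size();
            int s2 = subscribers.get(t2).size();
            // Partitions per subscriber, descending.
            int order = Double.compare(partitionCounts.get(t2) / (double) s2,
                                       partitionCounts.get(t1) / (double) s1);
            if (order == 0) {
                order = Integer.compare(s1, s2);   // subscriber count, ascending
            }
            if (order == 0) {
                order = t1.compareTo(t2);          // topic name, ascending
            }
            return order;
        });

        RemainingPartitionsSketch balancer = new RemainingPartitionsSketch(sizes);
        Map<Integer, List<String>> result = new TreeMap<>();
        for (String topic : topics) {
            balancer.initialize(subscribers.get(topic));
            for (int partition : unassigned.get(topic)) {
                int member = balancer.nextLeastLoadedMember();
                result.computeIfAbsent(member, k -> new ArrayList<>()).add(topic + "-" + partition);
            }
        }
        return result;
    }
}
```

After the one-time sort per topic in `initialize`, each pick is amortized constant time, whereas the old path called `sortMembersByAssignmentSize` into a fresh `TreeSet` for every partition. The sketch leaves out the most loaded range and the subsequent `balance()` pass.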



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java:
##########
@@ -157,716 +240,779 @@ public GroupAssignment build() {
 
         assignStickyPartitions();
 
-        unassignedPartitionsAssignment();
+        computeUnassignedPartitions(unassignedPartitions);
+        unassignedPartitionsAssignment(unassignedPartitions);
 
         balance();
 
         return new GroupAssignment(targetAssignment);
     }
 
     /**
-     * <li> TopicIdPartitions are sorted in descending order based on the value:
-     *       totalPartitions/number of subscribed members. </li>
-     * <li> If the above value is the same then topicIdPartitions are sorted in
-     *      ascending order of number of subscribers. </li>
-     * <li> If both criteria are the same, sort in ascending order of the partition Id.
-     *      This last criteria is for predictability of the assignments. </li>
+     * Sorts topics based on the following criteria:
+     * <ol>
+     *   <li>Topics are sorted in descending order of totalPartitions / number of subscribers
+     *       (partitions per subscriber).</li>
+     *   <li>Ties are broken by sorting in ascending order of number of subscribers.</li>
+     *   <li>Any remaining ties are broken by sorting in ascending order of topic id.</li>
+     * </ol>
+     *
+     * The last criterion is for predictability of assignments.
      *
-     * @param topicIdPartitions       The topic partitions that need to be sorted.
-     * @return A list of sorted topic partitions.
+     * @param topicIds          The topic ids that need to be sorted.
+     * @return A list of sorted topic ids.
      */
-    private List<TopicIdPartition> sortTopicIdPartitions(Collection<TopicIdPartition> topicIdPartitions) {
-        Comparator<TopicIdPartition> comparator = Comparator
-            .comparingDouble((TopicIdPartition topicIdPartition) -> {
-                int totalPartitions = subscribedTopicDescriber.numPartitions(topicIdPartition.topicId());
-                int totalSubscribers = membersPerTopic.get(topicIdPartition.topicId()).size();
-                return (double) totalPartitions / totalSubscribers;
-            })
-            .reversed()
-            .thenComparingInt(topicIdPartition -> membersPerTopic.get(topicIdPartition.topicId()).size())
-            .thenComparingInt(TopicIdPartition::partitionId);
+    private List<Uuid> sortTopicIds(Collection<Uuid> topicIds) {
+        Comparator<Uuid> comparator = new Comparator<Uuid>() {
+            @Override
+            public int compare(final Uuid topic1Id, final Uuid topic2Id) {
+                int topic1PartitionCount = subscribedTopicDescriber.numPartitions(topic1Id);
+                int topic2PartitionCount = subscribedTopicDescriber.numPartitions(topic2Id);
+                int topic1SubscriberCount = topicSubscribers.get(topic1Id).size();
+                int topic2SubscriberCount = topicSubscribers.get(topic2Id).size();
+
+                // Order by partitions per subscriber, descending.
+                int order = Double.compare(
+                    (double) topic2PartitionCount / topic2SubscriberCount,
+                    (double) topic1PartitionCount / topic1SubscriberCount
+                );
+
+                // Then order by subscriber count, ascending.
+                if (order == 0) {
+                    order = Integer.compare(topic1SubscriberCount, topic2SubscriberCount);
+                }
 
-        return topicIdPartitions.stream()
-            .sorted(comparator)
-            .collect(Collectors.toList());
+                // Then order by topic id, ascending.
+                if (order == 0) {
+                    order = topic1Id.compareTo(topic2Id);
+                }
+
+                return order;
+            }
+        };
+
+        List<Uuid> sortedTopicIds = new ArrayList<>(topicIds);
+        sortedTopicIds.sort(comparator);
+        return sortedTopicIds;
     }
 
     /**
-     * Gets a set of partitions that are to be retained from the existing assignment. This includes:
-     * <li> Partitions from topics that are still present in both the new subscriptions and the topic metadata. </li>
+     * Imports the existing assignment into the target assignment, taking into account topic
+     * unsubscriptions.
      */
     private void assignStickyPartitions() {
-        groupSpec.memberIds().forEach(memberId ->
-            groupSpec.memberAssignment(memberId).partitions().forEach((topicId, currentAssignment) -> {
+        for (String memberId : groupSpec.memberIds()) {
+            int memberIndex = memberIndices.get(memberId);
+
+            Map<Uuid, Set<Integer>> oldAssignment = groupSpec.memberAssignment(memberId).partitions();
+            Map<Uuid, Set<Integer>> newAssignment = null;
+
+            for (Map.Entry<Uuid, Set<Integer>> entry : oldAssignment.entrySet()) {
+                Uuid topicId = entry.getKey();
+                Set<Integer> partitions = entry.getValue();
+
                 if (groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicId)) {
-                    currentAssignment.forEach(partition -> {
-                        TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition);
-                        assignmentManager.addPartitionToTargetAssignment(topicIdPartition, memberId);
-                        assignedStickyPartitions.add(topicIdPartition);
-                    });
+                    memberTargetAssignmentSizes[memberIndex] += partitions.size();
+
+                    int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
+                    for (int partition : partitions) {
+                        if (partition >= numPartitions) {
+                            LOG.warn(
+                                "Previous assignment for {} contains {}:{}, but topic only has {} partitions",
+                                memberId,
+                                topicId,
+                                partition,
+                                numPartitions
+                            );
+
+                            if (newAssignment == null) {
+                                // If the new assignment is null, we create a deep copy of the
+                                // original assignment so that we can alter it.
+                                newAssignment = deepCopy(oldAssignment);
+                            }
+                            Set<Integer> newPartitions = newAssignment.get(topicId);
+                            newPartitions.remove(partition);
+                            if (newPartitions.isEmpty()) {
+                                newAssignment.remove(topicId);
+                            }
+                            memberTargetAssignmentSizes[memberIndex]--;
+
+                            continue;
+                        }
+
+                        previousAssignmentPartitionOwners.get(topicId)[partition] = memberIndex;
+                        targetAssignmentPartitionOwners.get(topicId)[partition] = memberIndex;
+                    }
                 } else {
+                    // The member is no longer subscribed to the topic. Remove it from the
+                    // assignment.
                     LOG.debug("The topic " + topicId + " is no longer present in the subscribed topics list");
+
+                    if (newAssignment == null) {
+                        // If the new assignment is null, we create a deep copy of the
+                        // original assignment so that we can alter it.
+                        newAssignment = deepCopy(oldAssignment);
+                    }
+                    // Remove the entire topic.
+                    newAssignment.remove(topicId);
                 }
-            })
-        );
+            }
+
+            if (newAssignment == null) {
+                newAssignment = oldAssignment;
+            }
+            targetAssignment.put(memberId, new MemberAssignmentImpl(newAssignment));
+        }
     }
 
     /**
-     * Allocates the remaining unassigned partitions to members in a balanced manner.
-     * <li> Partitions are sorted to maximize the probability of a balanced assignment. </li>
-     * <li> Sort members in ascending order of their current target assignment sizes
-     *      to ensure the least filled member gets the partition first. </li>
+     * A class that holds the state for the balancing process.
+     * <p/>
+     * Keeps track of two ranges of members: the members with the fewest assigned
+     * partitions and the members with the most assigned partitions. Within each range,
+     * all members start with the same number of assigned partitions.
      */
-    private void unassignedPartitionsAssignment() {
-        List<TopicIdPartition> sortedPartitions = sortTopicIdPartitions(unassignedPartitions);
+    private final class MemberAssignmentBalancer {
+        private final int[] memberTargetAssignmentSizes;
 
-        for (TopicIdPartition partition : sortedPartitions) {
-            TreeSet<String> sortedMembers = assignmentManager.sortMembersByAssignmentSize(
-                membersPerTopic.get(partition.topicId())
-            );
+        /**
+         * The members to balance, sorted by number of assigned partitions.
+         * <p/>
+         * Can be visualized like so:
+         * <pre>
+         *           ^
+         *           |          #
+         * partition |        ###
+         * count     |        ###
+         *           | ##########
+         *           +----------->
+         *             members
+         * </pre>
+         */
+        private final List<Integer> sortedMembers;
 
-            for (String member : sortedMembers) {
-                if (assignmentManager.maybeAssignPartitionToMember(partition, member)) {
-                    break;
+        /**
+         * [0, leastLoadedRangeEnd) represents the range of members with the smallest partition
+         * count.
+         */
+        private int leastLoadedRangeEnd = 0; // exclusive
+
+        /**
+         * The partition count of each member in the least loaded range before we start assigning
+         * partitions to them.
+         */
+        private int leastLoadedRangePartitionCount = -1;
+
+        /**
+         * The index of the next element in sortedMembers to which to assign the next partition.
+         */
+        private int nextLeastLoadedMember = 0;
+
+        /**
+         * [mostLoadedRangeStart, mostLoadedRangeEnd) is the range of members with the largest
+         * partition count.
+         */
+        private int mostLoadedRangeStart = 0; // inclusive
+
+        /**
+         * [mostLoadedRangeStart, mostLoadedRangeEnd) is the range of members with the largest
+         * partition count.
+         * <p/>
+         * mostLoadedRangeEnd is not simply the end of the member list, since we need to be able to
+         * exclude heavily loaded members which do not have any partitions to give up.
+         */
+        private int mostLoadedRangeEnd = 0; // exclusive
+
+        /**
+         * The partition count of each member in the most loaded range before we start unassigning
+         * partitions from them.
+         */
+        private int mostLoadedRangePartitionCount = 1;
+
+        /**
+         * The index of the next element in sortedMembers from which to unassign the next partition.
+         */
+        private int nextMostLoadedMember = -1;
+
+        // The ranges can be visualized like so:
+        //
+        //                         most
+        //                         loaded
+        //           ^ least       range
+        //           | loaded               #
+        //           | range      [##<-   ]##
+        // partition |          ##[#######]##
+        // count     | [...->  ]##[#######]##
+        //           | [#######]##[#######]##
+        //           +----------------------->
+        //             members
+        //                                 ^^
+        //                                 These members have no partitions to give up.
+        //
+        // The least loaded range expands rightwards and the most loaded range expands leftwards.
+        // Note that we are not done once the ranges touch, but rather when the partition counts in
+        // the ranges differ by one or less.
+
+        public MemberAssignmentBalancer() {
+            this.memberTargetAssignmentSizes = UniformHeterogeneousAssignmentBuilder.this.memberTargetAssignmentSizes;
+            this.sortedMembers = new ArrayList<>(memberTargetAssignmentSizes.length);
+        }
+
+        /**
+         * Resets the state of the balancer for a new set of members.
+         *
+         * @param members The members to balance.
+         * @return The difference in partition counts between the most and least loaded members.
+         */
+        public int initialize(List<Integer> members) {
+            if (members.size() < 1) {
+                // nextLeastLoadedMember needs at least one member.
+                throw new IllegalArgumentException("Cannot balance an empty subscriber list.");
+            }
+
+            // Build a sorted list of subscribers, where members with the smallest partition count
+            // appear first.
+            sortedMembers.clear();
+            sortedMembers.addAll(members);
+            sortedMembers.sort(memberComparator);
+
+            leastLoadedRangeEnd = 0; // exclusive
+            leastLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(0)] - 1;
+            nextLeastLoadedMember = 0;
+
+            mostLoadedRangeStart = sortedMembers.size(); // inclusive
+            mostLoadedRangeEnd = sortedMembers.size(); // exclusive
+            mostLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(sortedMembers.size() - 1)] + 1;
+            nextMostLoadedMember = sortedMembers.size() - 1;
+
+            return memberTargetAssignmentSizes[sortedMembers.get(sortedMembers.size() - 1)] -
+                   memberTargetAssignmentSizes[sortedMembers.get(0)];
+        }
+
+        /**
+         * Gets the least loaded member to which to assign a partition.
+         * <p/>
+         * Advances the state of the balancer by assuming that an additional partition has been
+         * assigned to the returned member.
+         * <p/>
+         * Assumes that there is at least one member.
+         *
+         * @return The least loaded member to which to assign a partition
+         */
+        public int nextLeastLoadedMember() {
+            // To generate a balanced assignment, we assign partitions to members in the
+            // [0, leastLoadedRangeEnd) range first. Once each member in the range has received a
+            // partition, the partition count of the range rises by one and we can try to expand it.
+            //                                     Range full,             Range full,
+            //                                     bump level by one       bump level by one,
+            //                                     can't expand range.     expand range.
+            //           ^                       ^                       ^
+            //           |            #          |            #          | [..->     ]#
+            // partition |          ###    ->    | [..->   ]###    ->    | [.......##]#
+            // count     | [..->   ]###          | [.......]###          | [.......##]#
+            //           | [#######]###          | [#######]###          | [#########]#
+            //           +------------->         +------------->         +------------>
+            //             members                members                members
+
+            if (nextLeastLoadedMember >= leastLoadedRangeEnd) {
+                // We've hit the end of the range. Bump its level and try to 
expand it.
+                leastLoadedRangePartitionCount++;
+
+                // Expand the range.
+                while (leastLoadedRangeEnd < sortedMembers.size() &&
+                    
memberTargetAssignmentSizes[sortedMembers.get(leastLoadedRangeEnd)] == 
leastLoadedRangePartitionCount) {
+                    leastLoadedRangeEnd++;
                 }
+
+                // Reset the pointer to the start of the range of members with identical
+                // partition counts.
+                nextLeastLoadedMember = 0;
             }
+
+            int leastLoadedMemberIndex = sortedMembers.get(nextLeastLoadedMember);
+            nextLeastLoadedMember++;
+
+            return leastLoadedMemberIndex;
         }
-    }
 
-    /**
-     * If a topic has two or more potential members it is subject to reassignment.
-     *
-     * @return true if the topic can participate in reassignment, false otherwise.
-     */
-    private boolean canTopicParticipateInReassignment(Uuid topicId) {
-        return membersPerTopic.get(topicId).size() >= 2;
+        /**
+         * Gets the most loaded member from which to reassign a partition.
+         * <p/>
+         * Advances the state of the balancer by assuming that a partition has been unassigned from
+         * the returned member.
+         *
+         * @return The most loaded member from which to reassign a partition, or -1 if no such
+         *         member exists.
+         */
+        public int nextMostLoadedMember() {
+            if (nextMostLoadedMember < mostLoadedRangeStart) {
+                if (mostLoadedRangeEnd <= mostLoadedRangeStart &&
+                    mostLoadedRangeStart > 0) {
+                    // The range is empty due to calls to excludeMostLoadedMember(). We risk not
+                    // expanding the range below and returning a member outside the range. Ensure
+                    // that we always expand the range below by resetting the partition count.
+                    mostLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(mostLoadedRangeStart - 1)];
+                } else {
+                    mostLoadedRangePartitionCount--;
+                }
+
+                // Expand the range.
+                while (mostLoadedRangeStart > 0 &&
+                    memberTargetAssignmentSizes[sortedMembers.get(mostLoadedRangeStart - 1)] == mostLoadedRangePartitionCount) {
+                    mostLoadedRangeStart--;
+                }
+
+                // Reset the pointer to the end of the range of members with identical partition
+                // counts.
+                nextMostLoadedMember = mostLoadedRangeEnd - 1;
+            }
+
+            if (nextMostLoadedMember < 0) {
+                // The range is empty due to excludeMostLoadedMember() calls and there are no more
+                // members that can give up partitions.
+                return -1;
+            }
+
+            int mostLoadedMemberIndex = sortedMembers.get(nextMostLoadedMember);
+            nextMostLoadedMember--;
+
+            return mostLoadedMemberIndex;
+        }
+
+        /**
+         * Excludes the last member returned from nextMostLoadedMember from the most loaded range.
+         *
+         * Must not be called if nextMostLoadedMember has not been called yet or returned -1.
+         */
+        public void excludeMostLoadedMember() {
+            // Kick the member out of the most loaded range by swapping with the member at the end
+            // of the range and contracting the end of the range.
+            int mostLoadedMemberIndex = sortedMembers.get(nextMostLoadedMember + 1);
+            sortedMembers.set(nextMostLoadedMember + 1, sortedMembers.get(mostLoadedRangeEnd - 1));
+            sortedMembers.set(mostLoadedRangeEnd - 1, mostLoadedMemberIndex);
+            mostLoadedRangeEnd--;
+        }
+
+        /**
+         * Checks whether the members are balanced based on the partition counts of the least and
+         * most loaded ranges.
+         */
+        public boolean isBalanced() {
+            return mostLoadedRangePartitionCount - leastLoadedRangePartitionCount <= 1;
+        }
     }
 
     /**
-     * If a member is not assigned all its potential partitions it is subject to reassignment.
-     * If any of the partitions assigned to a member is subject to reassignment, the member itself
-     * is subject to reassignment.
+     * Allocates the remaining unassigned partitions to members in a balanced manner.
+     * <ol>
+     *   <li>Topics are sorted to maximize the probability of a balanced assignment.</li>
+     *   <li>Unassigned partitions within each topic are distributed to the least loaded
+     *       subscribers, repeatedly.</li>
+     * </ol>
      *
-     * @return true if the member can participate in reassignment, false otherwise.
+     * @param partitions        The partitions to be assigned.
      */
-    private boolean canMemberParticipateInReassignment(String memberId) {
-        Set<Uuid> assignedTopicIds = targetAssignment.get(memberId).partitions().keySet();
+    private void unassignedPartitionsAssignment(Map<Uuid, List<Integer>> partitions) {
+        List<Uuid> sortedTopicIds = sortTopicIds(partitions.keySet());
 
-        int currentAssignmentSize = assignmentManager.targetAssignmentSize(memberId);
-        int maxAssignmentSize = assignmentManager.maxAssignmentSize(memberId);
+        MemberAssignmentBalancer memberAssignmentBalancer = new MemberAssignmentBalancer();
 
-        if (currentAssignmentSize > maxAssignmentSize)
-            LOG.error("The member {} is assigned more partitions than the maximum possible.", memberId);
+        for (Uuid topicId : sortedTopicIds) {
+            memberAssignmentBalancer.initialize(topicSubscribers.get(topicId));
 
-        if (currentAssignmentSize < maxAssignmentSize)
-            return true;
+            for (int partition : partitions.get(topicId)) {
+                int leastLoadedMemberIndex = memberAssignmentBalancer.nextLeastLoadedMember();
 
-        for (Uuid topicId : assignedTopicIds) {
-            if (canTopicParticipateInReassignment(topicId))
-                return true;
+                assignPartition(topicId, partition, leastLoadedMemberIndex);
+            }
         }
-        return false;
     }
 
     /**
-     * Checks if the current assignments of partitions to members is balanced.
-     *
-     * Balance is determined by first checking if the difference in the number of partitions assigned
-     * to any two members is one. If this is not true, it verifies that no member can
-     * receive additional partitions without disrupting the balance.
+     * If a topic has two or more potential members it is subject to reassignment.
      *
-     * @return true if the assignment is balanced, false otherwise.
+     * @return true if the topic can participate in reassignment, false otherwise.
      */
-
-    private boolean isBalanced() {
-        int min = assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.first());
-        int max = assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.last());
-
-        // If minimum and maximum numbers of partitions assigned to members differ by at most one return true.
-        if (min >= max - 1)
-            return true;
-
-        // Ensure that members without a complete set of topic partitions cannot receive any additional partitions.
-        // This maintains balance. Start by checking members with the fewest assigned partitions to see if they can take more.
-        for (String member : sortedMembersByAssignmentSize) {
-            int memberPartitionCount = assignmentManager.targetAssignmentSize(member);
-
-            // Skip if this member already has all the topic partitions it can get.
-            int maxAssignmentSize = assignmentManager.maxAssignmentSize(member);
-            if (memberPartitionCount == maxAssignmentSize)
-                continue;
-
-            // Otherwise make sure it cannot get any more partitions.
-            for (Uuid topicId : groupSpec.memberSubscription(member).subscribedTopicIds()) {
-                Set<Integer> assignedPartitions = targetAssignment.get(member).partitions().get(topicId);
-                for (int i = 0; i < subscribedTopicDescriber.numPartitions(topicId); i++) {
-                    TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, i);
-                    if (assignedPartitions == null || !assignedPartitions.contains(i)) {
-                        String otherMember = partitionOwnerInTargetAssignment.get(topicIdPartition);
-                        int otherMemberPartitionCount = assignmentManager.targetAssignmentSize(otherMember);
-                        if (memberPartitionCount + 1 < otherMemberPartitionCount) {
-                            LOG.debug("{} can be moved from member {} to member {} for a more balanced assignment.",
-                                topicIdPartition, otherMember, member);
-                            return false;
-                        }
-                    }
-                }
-            }
-        }
-        return true;
+    private boolean canTopicParticipateInReassignment(Uuid topicId) {
+        return topicSubscribers.get(topicId).size() >= 2;
     }
 
     /**
      * Balance the current assignment after the initial round of assignments have completed.
      */
     private void balance() {
-        if (!unassignedPartitions.isEmpty())
-            throw new PartitionAssignorException("Some partitions were left unassigned");
-        // Refill unassigned partitions with all the topicId partitions.
-        unassignedPartitions.addAll(topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber));
+        Set<Uuid> topicIds = new HashSet<>(subscribedTopicIds);
 
-        // Narrow down the reassignment scope to only those partitions that can actually be reassigned.
-        Set<TopicIdPartition> fixedPartitions = new HashSet<>();
+        // Narrow down the reassignment scope to only those topics that can actually be reassigned.
         for (Uuid topicId : subscribedTopicIds) {
             if (!canTopicParticipateInReassignment(topicId)) {
-                for (int i = 0; i < subscribedTopicDescriber.numPartitions(topicId); i++) {
-                    fixedPartitions.add(new TopicIdPartition(topicId, i));
-                }
-            }
-        }
-        unassignedPartitions.removeAll(fixedPartitions);
-
-        // Narrow down the reassignment scope to only those members that are subject to reassignment.
-        for (String member : groupSpec.memberIds()) {
-            if (!canMemberParticipateInReassignment(member)) {
-                sortedMembersByAssignmentSize.remove(member);
+                topicIds.remove(topicId);
             }
         }
 
         // If all the partitions are fixed i.e. unassigned partitions is empty there is no point of re-balancing.
-        if (!unassignedPartitions.isEmpty()) performReassignments();
+        if (!topicIds.isEmpty()) balanceTopics(topicIds);
     }
 
     /**
      * Performs reassignments of partitions to balance the load across members.
      * This method iteratively reassigns partitions until no further moves can improve the balance.
+     * <p/>
+     * The method loops over the topics repeatedly until an entire loop around the topics has
+     * completed without any reassignments, or we hit an iteration limit.
+     * <p/>
+     * This method produces perfectly balanced assignments when all subscribers of a topic have the
+     * same subscriptions. However, when subscribers have overlapping, non-identical subscriptions,
+     * the method produces almost-balanced assignments, assuming the iteration limit is not hit.
+     * eg. if there are three members and two topics and members 1 and 2 are subscribed to the first
+     *     topic and members 2 and 3 are subscribed to the second topic, we can end up with an
+     *     assignment like:
+     * <ul>
+     *   <li>Member 1: 9 partitions</li>
+     *   <li>Member 2: 10 partitions</li>
+     *   <li>Member 3: 11 partitions</li>
+     * </ul>
      *
-     * The method uses a do-while loop to ensure at least one pass over the partitions and continues
-     * reassigning as long as there are modifications to the current assignments. It checks for balance
-     * after each reassignment and exits if the balance is achieved.
+     * In this assignment, the subscribers of the first topic have a difference in partitions of 1,
+     * so the topic is considered balanced. The same applies to the second topic. However, balance
+     * can be improved by moving a partition from the second topic from member 3 to member 2 and a
+     * partition from the first topic from member 2 to member 1.
      *
-     * @throws PartitionAssignorException if there are inconsistencies in expected members per partition
-     *         or if a partition is expected to already be assigned but isn't.
+     * @param topicIds          The topics to consider for reassignment. These topics must have at
+     *                          least two subscribers.
      */
-    private void performReassignments() {
-        boolean modified;
-        boolean reassignmentOccurred;
-        // Repeat reassignment until no partition can be moved to improve the balance.
-        do {
-            modified = false;
-            reassignmentOccurred = false;
-            // Reassign all reassignable partitions sorted in descending order
-            // by totalPartitions/number of subscribed members,
-            // until the full list is processed or a balance is achieved.
-            List<TopicIdPartition> reassignablePartitions = sortTopicIdPartitions(unassignedPartitions);
-
-            for (TopicIdPartition reassignablePartition : reassignablePartitions) {
-                // Only check if there is any change in balance if any moves were made.
-                if (reassignmentOccurred && isBalanced()) {
+    private void balanceTopics(Collection<Uuid> topicIds) {
+        List<Uuid> sortedTopicIds = sortTopicIds(topicIds);
+        // The index of the last topic in sortedTopicIds that was rebalanced.
+        // Used to decide when to exit early.
+        int lastRebalanceTopicIndex = -1;
+
+        MemberAssignmentBalancer memberAssignmentBalancer = new MemberAssignmentBalancer();
+
+        // An array of partitions, with partitions owned by the same member grouped together.
+        List<Integer> partitions = new ArrayList<>();
+
+        // The ranges in the partitions list assigned to members.
+        // Maintaining these ranges allows for constant time, deterministic picking of partitions
+        // owned by a given member.
+        Map<Integer, Integer> startPartitionIndices = new HashMap<>();
+        Map<Integer, Integer> endPartitionIndices = new HashMap<>(); // exclusive
+
+        // Repeat reassignment until no partition can be moved to improve the balance or we hit an
+        // iteration limit.
+        for (int i = 0; i < 16; i++) {

Review Comment:
   Yes. The choice of 16 was arbitrary. I didn't want something too low (not enough time for convergence) or too high (too much CPU cost).
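As a reading aid for the range logic in `nextLeastLoadedMember()` and its ASCII diagram in the diff, here is a minimal standalone sketch of the same "least loaded range" idea. It is not the Kafka class: `LeastLoadedPicker` and `next()` are hypothetical names, members are plain `int` indices, ties in the initial sort are broken by member index (an assumption), and the caller is assumed to bump `sizes[member]` after each pick. In the diff the counts live in `memberTargetAssignmentSizes`, but how they are updated is outside this excerpt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical standalone sketch of the "least loaded range" technique; not Kafka's class.
 * sizes[m] holds member m's current partition count and is updated by the caller after each pick.
 */
final class LeastLoadedPicker {
    private final int[] sizes;
    private final List<Integer> sortedMembers;
    private int leastLoadedRangeEnd = 0;          // exclusive end of [0, end)
    private int leastLoadedRangePartitionCount;   // partition count shared by the range
    private int nextLeastLoadedMember = 0;        // cursor into [0, end)

    LeastLoadedPicker(int[] sizes, List<Integer> subscribers) {
        if (subscribers.isEmpty()) throw new IllegalArgumentException("need at least one member");
        this.sizes = sizes;
        this.sortedMembers = new ArrayList<>(subscribers);
        // Ascending by partition count; ties broken by member index (assumption, for determinism).
        this.sortedMembers.sort(Comparator.<Integer>comparingInt(m -> sizes[m]).thenComparingInt(m -> m));
        // Start one level below the minimum so that the first call opens the range.
        this.leastLoadedRangePartitionCount = sizes[sortedMembers.get(0)] - 1;
    }

    /** Returns the member that should receive the next partition. */
    int next() {
        if (nextLeastLoadedMember >= leastLoadedRangeEnd) {
            // Every member in the range has received one more partition: bump the level...
            leastLoadedRangePartitionCount++;
            // ...and absorb the members that have now caught up with the range.
            while (leastLoadedRangeEnd < sortedMembers.size()
                && sizes[sortedMembers.get(leastLoadedRangeEnd)] == leastLoadedRangePartitionCount) {
                leastLoadedRangeEnd++;
            }
            nextLeastLoadedMember = 0;
        }
        return sortedMembers.get(nextLeastLoadedMember++);
    }

    public static void main(String[] args) {
        int[] sizes = {0, 0, 3};
        LeastLoadedPicker picker = new LeastLoadedPicker(sizes, List.of(0, 1, 2));
        List<Integer> picks = new ArrayList<>();
        for (int p = 0; p < 9; p++) {
            int member = picker.next();
            sizes[member]++; // the caller records the assignment
            picks.add(member);
        }
        // prints [0, 1, 0, 1, 0, 1, 0, 1, 2] [4, 4, 4]
        System.out.println(picks + " " + Arrays.toString(sizes));
    }
}
```

With starting counts `{0, 0, 3}`, members 0 and 1 are served round-robin until they reach member 2's count, and only then is the range widened to include member 2. Because the range end only moves forward, the expansion work over all picks is linear in the number of members, so each pick is amortized constant time after the initial sort.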
