squah-confluent commented on code in PR #17385:
URL: https://github.com/apache/kafka/pull/17385#discussion_r1799117413


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java:
##########
@@ -157,716 +240,779 @@ public GroupAssignment build() {
 
         assignStickyPartitions();
 
-        unassignedPartitionsAssignment();
+        computeUnassignedPartitions(unassignedPartitions);
+        unassignedPartitionsAssignment(unassignedPartitions);
 
         balance();
 
         return new GroupAssignment(targetAssignment);
     }
 
     /**
-     * <li> TopicIdPartitions are sorted in descending order based on the 
value:
-     *       totalPartitions/number of subscribed members. </li>
-     * <li> If the above value is the same then topicIdPartitions are sorted in
-     *      ascending order of number of subscribers. </li>
-     * <li> If both criteria are the same, sort in ascending order of the 
partition Id.
-     *      This last criteria is for predictability of the assignments. </li>
+     * Sorts topics based on the following criteria:
+     * <ol>
+     *   <li>Topics are sorted in descending order of totalPartitions / number of subscribers
+     *       (partitions per subscriber).</li>
+     *   <li>Ties are broken by sorting in ascending order of number of subscribers.</li>
+     *   <li>Any remaining ties are broken by sorting in ascending order of topic id.</li>
+     * </ol>
+     *
+     * The last criterion is for predictability of assignments.
      *
-     * @param topicIdPartitions       The topic partitions that need to be 
sorted.
-     * @return A list of sorted topic partitions.
+     * @param topicIds          The topic ids that need to be sorted.
+     * @return A list of sorted topic ids.
      */
-    private List<TopicIdPartition> 
sortTopicIdPartitions(Collection<TopicIdPartition> topicIdPartitions) {
-        Comparator<TopicIdPartition> comparator = Comparator
-            .comparingDouble((TopicIdPartition topicIdPartition) -> {
-                int totalPartitions = 
subscribedTopicDescriber.numPartitions(topicIdPartition.topicId());
-                int totalSubscribers = 
membersPerTopic.get(topicIdPartition.topicId()).size();
-                return (double) totalPartitions / totalSubscribers;
-            })
-            .reversed()
-            .thenComparingInt(topicIdPartition -> 
membersPerTopic.get(topicIdPartition.topicId()).size())
-            .thenComparingInt(TopicIdPartition::partitionId);
+    private List<Uuid> sortTopicIds(Collection<Uuid> topicIds) {
+        Comparator<Uuid> comparator = new Comparator<Uuid>() {
+            @Override
+            public int compare(final Uuid topic1Id, final Uuid topic2Id) {
+                int topic1PartitionCount = 
subscribedTopicDescriber.numPartitions(topic1Id);
+                int topic2PartitionCount = 
subscribedTopicDescriber.numPartitions(topic2Id);
+                int topic1SubscriberCount = 
topicSubscribers.get(topic1Id).size();
+                int topic2SubscriberCount = 
topicSubscribers.get(topic2Id).size();
+
+                // Order by partitions per subscriber, descending.
+                int order = Double.compare(
+                    (double) topic2PartitionCount / topic2SubscriberCount,
+                    (double) topic1PartitionCount / topic1SubscriberCount
+                );
+
+                // Then order by subscriber count, ascending.
+                if (order == 0) {
+                    order = Integer.compare(topic1SubscriberCount, 
topic2SubscriberCount);
+                }
 
-        return topicIdPartitions.stream()
-            .sorted(comparator)
-            .collect(Collectors.toList());
+                // Then order by topic id, ascending.
+                if (order == 0) {
+                    order = topic1Id.compareTo(topic2Id);
+                }
+
+                return order;
+            }
+        };
+
+        List<Uuid> sortedTopicIds = new ArrayList<>(topicIds);
+        sortedTopicIds.sort(comparator);
+        return sortedTopicIds;
     }
 
     /**
-     * Gets a set of partitions that are to be retained from the existing 
assignment. This includes:
-     * <li> Partitions from topics that are still present in both the new 
subscriptions and the topic metadata. </li>
+     * Imports the existing assignment into the target assignment, taking into 
account topic
+     * unsubscriptions.
      */
     private void assignStickyPartitions() {
-        groupSpec.memberIds().forEach(memberId ->
-            
groupSpec.memberAssignment(memberId).partitions().forEach((topicId, 
currentAssignment) -> {
+        for (String memberId : groupSpec.memberIds()) {
+            int memberIndex = memberIndices.get(memberId);
+
+            Map<Uuid, Set<Integer>> oldAssignment = 
groupSpec.memberAssignment(memberId).partitions();
+            Map<Uuid, Set<Integer>> newAssignment = null;
+
+            for (Map.Entry<Uuid, Set<Integer>> entry : 
oldAssignment.entrySet()) {
+                Uuid topicId = entry.getKey();
+                Set<Integer> partitions = entry.getValue();
+
                 if 
(groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicId)) 
{
-                    currentAssignment.forEach(partition -> {
-                        TopicIdPartition topicIdPartition = new 
TopicIdPartition(topicId, partition);
-                        
assignmentManager.addPartitionToTargetAssignment(topicIdPartition, memberId);
-                        assignedStickyPartitions.add(topicIdPartition);
-                    });
+                    memberTargetAssignmentSizes[memberIndex] += 
partitions.size();
+
+                    int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
+                    for (int partition : partitions) {
+                        if (partition >= numPartitions) {
+                            LOG.warn(
+                                "Previous assignment for {} contains {}:{}, 
but topic only has {} partitions",
+                                memberId,
+                                topicId,
+                                partition,
+                                numPartitions
+                            );
+
+                            if (newAssignment == null) {
+                                // If the new assignment is null, we create a 
deep copy of the
+                                // original assignment so that we can alter it.
+                                newAssignment = deepCopy(oldAssignment);
+                            }
+                            Set<Integer> newPartitions = 
newAssignment.get(topicId);
+                            newPartitions.remove(partition);
+                            if (newPartitions.isEmpty()) {
+                                newAssignment.remove(topicId);
+                            }
+                            memberTargetAssignmentSizes[memberIndex]--;
+
+                            continue;
+                        }
+
+                        
previousAssignmentPartitionOwners.get(topicId)[partition] = memberIndex;
+                        
targetAssignmentPartitionOwners.get(topicId)[partition] = memberIndex;
+                    }
                 } else {
+                    // The member is no longer subscribed to the topic. Remove 
it from the
+                    // assignment.
                     LOG.debug("The topic " + topicId + " is no longer present 
in the subscribed topics list");
+
+                    if (newAssignment == null) {
+                        // If the new assignment is null, we create a deep 
copy of the
+                        // original assignment so that we can alter it.
+                        newAssignment = deepCopy(oldAssignment);
+                    }
+                    // Remove the entire topic.
+                    newAssignment.remove(topicId);
                 }
-            })
-        );
+            }
+
+            if (newAssignment == null) {
+                newAssignment = oldAssignment;
+            }
+            targetAssignment.put(memberId, new 
MemberAssignmentImpl(newAssignment));
+        }
     }
 
     /**
-     * Allocates the remaining unassigned partitions to members in a balanced 
manner.
-     * <li> Partitions are sorted to maximize the probability of a balanced 
assignment. </li>
-     * <li> Sort members in ascending order of their current target assignment 
sizes
-     *      to ensure the least filled member gets the partition first. </li>
+     * A class that holds the state for the balancing process.
+     * <p/>
+     * Keeps track of two ranges of members: the members with the least number 
of assigned
+     * partitions and the members with the most number of assigned partitions. 
Within each range,
+     * all members start with the same number of assigned partitions.
      */
-    private void unassignedPartitionsAssignment() {
-        List<TopicIdPartition> sortedPartitions = 
sortTopicIdPartitions(unassignedPartitions);
+    private final class MemberAssignmentBalancer {
+        private final int[] memberTargetAssignmentSizes;
 
-        for (TopicIdPartition partition : sortedPartitions) {
-            TreeSet<String> sortedMembers = 
assignmentManager.sortMembersByAssignmentSize(
-                membersPerTopic.get(partition.topicId())
-            );
+        /**
+         * The members to balance, sorted by number of assigned partitions.
+         * <p/>
+         * Can be visualized like so:
+         * <pre>
+         *           ^
+         *           |          #
+         * partition |        ###
+         * count     |        ###
+         *           | ##########
+         *           +----------->
+         *             members
+         * </pre>
+         */
+        private final List<Integer> sortedMembers;
 
-            for (String member : sortedMembers) {
-                if (assignmentManager.maybeAssignPartitionToMember(partition, 
member)) {
-                    break;
+        /**
+         * [0, leastLoadedRangeEnd) represents the range of members with the 
smallest partition
+         * count.
+         */
+        private int leastLoadedRangeEnd = 0; // exclusive
+
+        /**
+         * The partition count of each member in the least loaded range before 
we start assigning
+         * partitions to them.
+         */
+        private int leastLoadedRangePartitionCount = -1;
+
+        /**
+         * The index of the next element in sortedMembers to which to assign 
the next partition.
+         */
+        private int nextLeastLoadedMember = 0;
+
+        /**
+         * [mostLoadedRangeStart, mostLoadedRangeEnd) is the range of members 
with the largest
+         * partition count.
+         */
+        private int mostLoadedRangeStart = 0; // inclusive
+
+        /**
+         * [mostLoadedRangeStart, mostLoadedRangeEnd) is the range of members with the largest
+         * partition count.
+         * <p/>
+         * mostLoadedRangeEnd is not simply the end of the member list, since we need to be able to
+         * exclude heavily loaded members which do not have any partitions to give up.
+         */
+        private int mostLoadedRangeEnd = 0; // exclusive
+
+        /**
+         * The partition count of each member in the most loaded range before 
we start unassigning
+         * partitions from them.
+         */
+        private int mostLoadedRangePartitionCount = 1;
+
+        /**
+         * The index of the next element in sortedMembers from which to 
unassign the next partition.
+         */
+        private int nextMostLoadedMember = -1;
+
+        // The ranges can be visualized like so:
+        //
+        //                         most
+        //                         loaded
+        //           ^ least       range
+        //           | loaded               #
+        //           | range      [##<-   ]##
+        // partition |          ##[#######]##
+        // count     | [...->  ]##[#######]##
+        //           | [#######]##[#######]##
+        //           +----------------------->
+        //             members
+        //                                 ^^
+        //                                 These members have no partitions to 
give up.
+        //
+        // The least loaded range expands rightwards and the most loaded range 
expands leftwards.
+        // Note that we are not done once the ranges touch, but rather when 
the partition counts in
+        // the ranges differ by one or less.
+
+        public MemberAssignmentBalancer() {
+            this.memberTargetAssignmentSizes = 
UniformHeterogeneousAssignmentBuilder.this.memberTargetAssignmentSizes;
+            this.sortedMembers = new 
ArrayList<>(memberTargetAssignmentSizes.length);
+        }
+
+        /**
+         * Resets the state of the balancer for a new set of members.
+         *
+         * @param members The members to balance.
+         * @return The difference in partition counts between the most and 
least loaded members.
+         */
+        public int initialize(List<Integer> members) {
+            if (members.size() < 1) {
+                // nextLeastLoadedMember needs at least one member.
+                throw new IllegalArgumentException("Cannot balance an empty 
subscriber list.");
+            }
+
+            // Build a sorted list of subscribers, where members with the 
smallest partition count
+            // appear first.
+            sortedMembers.clear();
+            sortedMembers.addAll(members);
+            sortedMembers.sort(memberComparator);
+
+            leastLoadedRangeEnd = 0; // exclusive
+            leastLoadedRangePartitionCount = 
memberTargetAssignmentSizes[sortedMembers.get(0)] - 1;
+            nextLeastLoadedMember = 0;
+
+            mostLoadedRangeStart = sortedMembers.size(); // inclusive
+            mostLoadedRangeEnd = sortedMembers.size(); // exclusive
+            mostLoadedRangePartitionCount = 
memberTargetAssignmentSizes[sortedMembers.get(sortedMembers.size() - 1)] + 1;
+            nextMostLoadedMember = sortedMembers.size() - 1;
+
+            return 
memberTargetAssignmentSizes[sortedMembers.get(sortedMembers.size() - 1)] -
+                   memberTargetAssignmentSizes[sortedMembers.get(0)];
+        }
+
+        /**
+         * Gets the least loaded member to which to assign a partition.
+         * <p/>
+         * Advances the state of the balancer by assuming that an additional 
partition has been
+         * assigned to the returned member.
+         * <p/>
+         * Assumes that there is at least one member.
+         *
+         * @return The least loaded member to which to assign a partition
+         */
+        public int nextLeastLoadedMember() {
+            // To generate a balanced assignment, we assign partitions to 
members in the
+            // [0, leastLoadedRangeEnd) range first. Once each member in the 
range has received a
+            // partition, the partition count of the range rises by one and we 
can try to expand it.
+            //                                     Range full,             
Range full,
+            //                                     bump level by one       
bump level by one,
+            //                                     can't expand range.     
expand range.
+            //           ^                       ^                       ^
+            //           |            #          |            #          | 
[..->     ]#
+            // partition |          ###    ->    | [..->   ]###    ->    | 
[.......##]#
+            // count     | [..->   ]###          | [.......]###          | 
[.......##]#
+            //           | [#######]###          | [#######]###          | 
[#########]#
+            //           +------------->         +------------->         
+------------>
+            //             members                members                
members
+
+            if (nextLeastLoadedMember >= leastLoadedRangeEnd) {
+                // We've hit the end of the range. Bump its level and try to 
expand it.
+                leastLoadedRangePartitionCount++;
+
+                // Expand the range.
+                while (leastLoadedRangeEnd < sortedMembers.size() &&
+                    
memberTargetAssignmentSizes[sortedMembers.get(leastLoadedRangeEnd)] == 
leastLoadedRangePartitionCount) {
+                    leastLoadedRangeEnd++;
                 }
+
+                // Reset the pointer to the start of the range of members with 
identical
+                // partition counts.
+                nextLeastLoadedMember = 0;
             }
+
+            int leastLoadedMemberIndex = 
sortedMembers.get(nextLeastLoadedMember);
+            nextLeastLoadedMember++;
+
+            return leastLoadedMemberIndex;
         }
-    }
 
-    /**
-     * If a topic has two or more potential members it is subject to 
reassignment.
-     *
-     * @return true if the topic can participate in reassignment, false 
otherwise.
-     */
-    private boolean canTopicParticipateInReassignment(Uuid topicId) {
-        return membersPerTopic.get(topicId).size() >= 2;
+        /**
+         * Gets the most loaded member from which to reassign a partition.
+         * <p/>
+         * Advances the state of the balancer by assuming that a partition has 
been unassigned from
+         * the returned member.
+         *
+         * @return The most loaded member from which to reassign a partition, 
or -1 if no such
+         *         member exists.
+         */
+        public int nextMostLoadedMember() {
+            if (nextMostLoadedMember < mostLoadedRangeStart) {
+                if (mostLoadedRangeEnd <= mostLoadedRangeStart &&
+                    mostLoadedRangeStart > 0) {
+                    // The range is empty due to calls to 
excludeMostLoadedMember(). We risk not
+                    // expanding the range below and returning a member 
outside the range. Ensure
+                    // that we always expand the range below by resetting the 
partition count.
+                    mostLoadedRangePartitionCount = 
memberTargetAssignmentSizes[sortedMembers.get(mostLoadedRangeStart - 1)];
+                } else {
+                    mostLoadedRangePartitionCount--;
+                }
+
+                // Expand the range.
+                while (mostLoadedRangeStart > 0 &&
+                    
memberTargetAssignmentSizes[sortedMembers.get(mostLoadedRangeStart - 1)] == 
mostLoadedRangePartitionCount) {
+                    mostLoadedRangeStart--;
+                }
+
+                // Reset the pointer to the end of the range of members with 
identical partition
+                // counts.
+                nextMostLoadedMember = mostLoadedRangeEnd - 1;
+            }
+
+            if (nextMostLoadedMember < 0) {
+                // The range is empty due to excludeMostLoadedMember() calls 
and there are no more
+                // members that can give up partitions.
+                return -1;
+            }
+
+            int mostLoadedMemberIndex = 
sortedMembers.get(nextMostLoadedMember);
+            nextMostLoadedMember--;
+
+            return mostLoadedMemberIndex;
+        }
+
+        /**
+         * Excludes the last member returned from nextMostLoadedMember from 
the most loaded range.
+         *
+         * Must not be called if nextMostLoadedMember has not been called yet 
or returned -1.
+         */
+        public void excludeMostLoadedMember() {
+            // Kick the member out of the most loaded range by swapping with 
the member at the end
+            // of the range and contracting the end of the range.
+            int mostLoadedMemberIndex = sortedMembers.get(nextMostLoadedMember 
+ 1);
+            sortedMembers.set(nextMostLoadedMember + 1, 
sortedMembers.get(mostLoadedRangeEnd - 1));
+            sortedMembers.set(mostLoadedRangeEnd - 1, mostLoadedMemberIndex);
+            mostLoadedRangeEnd--;
+        }
+
+        /**
+         * Checks whether the members are balanced based on the partition 
counts of the least and
+         * most loaded ranges.
+         */
+        public boolean isBalanced() {
+            return mostLoadedRangePartitionCount - 
leastLoadedRangePartitionCount <= 1;
+        }
     }
 
     /**
-     * If a member is not assigned all its potential partitions it is subject 
to reassignment.
-     * If any of the partitions assigned to a member is subject to 
reassignment, the member itself
-     * is subject to reassignment.
+     * Allocates the remaining unassigned partitions to members in a balanced 
manner.
+     * <ol>
+     *   <li>Topics are sorted to maximize the probability of a balanced 
assignment.</li>
+     *   <li>Unassigned partitions within each topic are distributed to the 
least loaded
+     *       subscribers, repeatedly.</li>
+     * </ol>
      *
-     * @return true if the member can participate in reassignment, false 
otherwise.
+     * @param partitions        The partitions to be assigned.
      */
-    private boolean canMemberParticipateInReassignment(String memberId) {
-        Set<Uuid> assignedTopicIds = 
targetAssignment.get(memberId).partitions().keySet();
+    private void unassignedPartitionsAssignment(Map<Uuid, List<Integer>> 
partitions) {
+        List<Uuid> sortedTopicIds = sortTopicIds(partitions.keySet());
 
-        int currentAssignmentSize = 
assignmentManager.targetAssignmentSize(memberId);
-        int maxAssignmentSize = assignmentManager.maxAssignmentSize(memberId);
+        MemberAssignmentBalancer memberAssignmentBalancer = new 
MemberAssignmentBalancer();
 
-        if (currentAssignmentSize > maxAssignmentSize)
-            LOG.error("The member {} is assigned more partitions than the 
maximum possible.", memberId);
+        for (Uuid topicId : sortedTopicIds) {
+            memberAssignmentBalancer.initialize(topicSubscribers.get(topicId));
 
-        if (currentAssignmentSize < maxAssignmentSize)
-            return true;
+            for (int partition : partitions.get(topicId)) {
+                int leastLoadedMemberIndex = 
memberAssignmentBalancer.nextLeastLoadedMember();
 
-        for (Uuid topicId : assignedTopicIds) {
-            if (canTopicParticipateInReassignment(topicId))
-                return true;
+                assignPartition(topicId, partition, leastLoadedMemberIndex);
+            }
         }
-        return false;
     }
 
     /**
-     * Checks if the current assignments of partitions to members is balanced.
-     *
-     * Balance is determined by first checking if the difference in the number 
of partitions assigned
-     * to any two members is one. If this is not true, it verifies that no 
member can
-     * receive additional partitions without disrupting the balance.
+     * If a topic has two or more potential members it is subject to 
reassignment.
      *
-     * @return true if the assignment is balanced, false otherwise.
+     * @return true if the topic can participate in reassignment, false 
otherwise.
      */
-
-    private boolean isBalanced() {
-        int min = 
assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.first());
-        int max = 
assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.last());
-
-        // If minimum and maximum numbers of partitions assigned to members 
differ by at most one return true.
-        if (min >= max - 1)
-            return true;
-
-        // Ensure that members without a complete set of topic partitions 
cannot receive any additional partitions.
-        // This maintains balance. Start by checking members with the fewest 
assigned partitions to see if they can take more.
-        for (String member : sortedMembersByAssignmentSize) {
-            int memberPartitionCount = 
assignmentManager.targetAssignmentSize(member);
-
-            // Skip if this member already has all the topic partitions it can 
get.
-            int maxAssignmentSize = 
assignmentManager.maxAssignmentSize(member);
-            if (memberPartitionCount == maxAssignmentSize)
-                continue;
-
-            // Otherwise make sure it cannot get any more partitions.
-            for (Uuid topicId : 
groupSpec.memberSubscription(member).subscribedTopicIds()) {
-                Set<Integer> assignedPartitions = 
targetAssignment.get(member).partitions().get(topicId);
-                for (int i = 0; i < 
subscribedTopicDescriber.numPartitions(topicId); i++) {
-                    TopicIdPartition topicIdPartition = new 
TopicIdPartition(topicId, i);
-                    if (assignedPartitions == null || 
!assignedPartitions.contains(i)) {
-                        String otherMember = 
partitionOwnerInTargetAssignment.get(topicIdPartition);
-                        int otherMemberPartitionCount = 
assignmentManager.targetAssignmentSize(otherMember);
-                        if (memberPartitionCount + 1 < 
otherMemberPartitionCount) {
-                            LOG.debug("{} can be moved from member {} to 
member {} for a more balanced assignment.",
-                                topicIdPartition, otherMember, member);
-                            return false;
-                        }
-                    }
-                }
-            }
-        }
-        return true;
+    private boolean canTopicParticipateInReassignment(Uuid topicId) {
+        return topicSubscribers.get(topicId).size() >= 2;
     }
 
     /**
      * Balance the current assignment after the initial round of assignments 
have completed.
      */
     private void balance() {
-        if (!unassignedPartitions.isEmpty())
-            throw new PartitionAssignorException("Some partitions were left 
unassigned");
-        // Refill unassigned partitions with all the topicId partitions.
-        unassignedPartitions.addAll(topicIdPartitions(subscribedTopicIds, 
subscribedTopicDescriber));
+        Set<Uuid> topicIds = new HashSet<>(subscribedTopicIds);
 
-        // Narrow down the reassignment scope to only those partitions that 
can actually be reassigned.
-        Set<TopicIdPartition> fixedPartitions = new HashSet<>();
+        // Narrow down the reassignment scope to only those topics that can 
actually be reassigned.
         for (Uuid topicId : subscribedTopicIds) {
             if (!canTopicParticipateInReassignment(topicId)) {
-                for (int i = 0; i < 
subscribedTopicDescriber.numPartitions(topicId); i++) {
-                    fixedPartitions.add(new TopicIdPartition(topicId, i));
-                }
-            }
-        }
-        unassignedPartitions.removeAll(fixedPartitions);
-
-        // Narrow down the reassignment scope to only those members that are 
subject to reassignment.
-        for (String member : groupSpec.memberIds()) {
-            if (!canMemberParticipateInReassignment(member)) {
-                sortedMembersByAssignmentSize.remove(member);
+                topicIds.remove(topicId);
             }
         }
 
         // If all the partitions are fixed i.e. unassigned partitions is empty 
there is no point of re-balancing.
-        if (!unassignedPartitions.isEmpty()) performReassignments();
+        if (!topicIds.isEmpty()) balanceTopics(topicIds);
     }
 
     /**
      * Performs reassignments of partitions to balance the load across members.
      * This method iteratively reassigns partitions until no further moves can 
improve the balance.
+     * <p/>
+     * The method loops over the topics repeatedly until an entire loop around 
the topics has
+     * completed without any reassignments, or we hit an iteration limit.
+     * <p/>
+     * This method produces perfectly balanced assignments when all 
subscribers of a topic have the
+     * same subscriptions. However, when subscribers have overlapping, 
non-identical subscriptions,
+     * the method produces almost-balanced assignments, assuming the iteration 
limit is not hit.
+     * e.g. if there are three members and two topics and members 1 and 2 are subscribed to the first
+     *     topic and members 2 and 3 are subscribed to the second topic, we 
can end up with an
+     *     assignment like:
+     * <ul>
+     *   <li>Member 1: 9 partitions</li>
+     *   <li>Member 2: 10 partitions</li>
+     *   <li>Member 3: 11 partitions</li>
+     * </ul>
      *
-     * The method uses a do-while loop to ensure at least one pass over the 
partitions and continues
-     * reassigning as long as there are modifications to the current 
assignments. It checks for balance
-     * after each reassignment and exits if the balance is achieved.
+     * In this assignment, the subscribers of the first topic have a 
difference in partitions of 1,
+     * so the topic is considered balanced. The same applies to the second 
topic. However, balance
+     * can be improved by moving a partition from the second topic from member 
3 to member 2 and a
+     * partition from the first topic from member 2 to member 1.
      *
-     * @throws PartitionAssignorException if there are inconsistencies in 
expected members per partition
-     *         or if a partition is expected to already be assigned but isn't.
+     * @param topicIds          The topics to consider for reassignment. These 
topics must have at
+     *                          least two subscribers.
      */
-    private void performReassignments() {
-        boolean modified;
-        boolean reassignmentOccurred;
-        // Repeat reassignment until no partition can be moved to improve the 
balance.
-        do {
-            modified = false;
-            reassignmentOccurred = false;
-            // Reassign all reassignable partitions sorted in descending order
-            // by totalPartitions/number of subscribed members,
-            // until the full list is processed or a balance is achieved.
-            List<TopicIdPartition> reassignablePartitions = 
sortTopicIdPartitions(unassignedPartitions);
-
-            for (TopicIdPartition reassignablePartition : 
reassignablePartitions) {
-                // Only check if there is any change in balance if any moves 
were made.
-                if (reassignmentOccurred && isBalanced()) {
+    private void balanceTopics(Collection<Uuid> topicIds) {
+        List<Uuid> sortedTopicIds = sortTopicIds(topicIds);
+        // The index of the last topic in sortedTopicIds that was rebalanced.
+        // Used to decide when to exit early.
+        int lastRebalanceTopicIndex = -1;
+
+        MemberAssignmentBalancer memberAssignmentBalancer = new 
MemberAssignmentBalancer();
+
+        // An array of partitions, with partitions owned by the same member 
grouped together.
+        List<Integer> partitions = new ArrayList<>();
+
+        // The ranges in the partitions list assigned to members.
+        // Maintaining these ranges allows for constant time, deterministic 
picking of partitions
+        // owned by a given member.
+        Map<Integer, Integer> startPartitionIndices = new HashMap<>();
+        Map<Integer, Integer> endPartitionIndices = new HashMap<>(); // 
exclusive
+
+        // Repeat reassignment until no partition can be moved to improve the 
balance or we hit an
+        // iteration limit.
+        for (int i = 0; i < 16; i++) {

Review Comment:
   Yes. The choice of 16 was arbitrary. I didn't want something too low (not enough time for convergence) or too high (too much CPU cost).
   
   Also added a javadoc for the new constant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to