squah-confluent commented on code in PR #17385: URL: https://github.com/apache/kafka/pull/17385#discussion_r1799117546
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java: ########## @@ -157,716 +240,779 @@ public GroupAssignment build() { assignStickyPartitions(); - unassignedPartitionsAssignment(); + computeUnassignedPartitions(unassignedPartitions); + unassignedPartitionsAssignment(unassignedPartitions); balance(); return new GroupAssignment(targetAssignment); } /** - * <li> TopicIdPartitions are sorted in descending order based on the value: - * totalPartitions/number of subscribed members. </li> - * <li> If the above value is the same then topicIdPartitions are sorted in - * ascending order of number of subscribers. </li> - * <li> If both criteria are the same, sort in ascending order of the partition Id. - * This last criteria is for predictability of the assignments. </li> + * Sorts topics based on the following criteria: + * <ol> + * <li>Topics are sorted in descending order of totalPartitions / number of subscribers + * (partitions per subscriber).</li> + * <li>Ties are broken by sorting in ascending order of number of subscribers.</li> + * <li>Any remaining ties are broken by sorting in ascending order of topic id.</li> + * </ol> + * + * The last criteria is for predictability of assignments. * - * @param topicIdPartitions The topic partitions that need to be sorted. - * @return A list of sorted topic partitions. + * @param topicIds The topic ids that need to be sorted. + * @return A list of sorted topic ids. 
*/ - private List<TopicIdPartition> sortTopicIdPartitions(Collection<TopicIdPartition> topicIdPartitions) { - Comparator<TopicIdPartition> comparator = Comparator - .comparingDouble((TopicIdPartition topicIdPartition) -> { - int totalPartitions = subscribedTopicDescriber.numPartitions(topicIdPartition.topicId()); - int totalSubscribers = membersPerTopic.get(topicIdPartition.topicId()).size(); - return (double) totalPartitions / totalSubscribers; - }) - .reversed() - .thenComparingInt(topicIdPartition -> membersPerTopic.get(topicIdPartition.topicId()).size()) - .thenComparingInt(TopicIdPartition::partitionId); + private List<Uuid> sortTopicIds(Collection<Uuid> topicIds) { + Comparator<Uuid> comparator = new Comparator<Uuid>() { + @Override + public int compare(final Uuid topic1Id, final Uuid topic2Id) { + int topic1PartitionCount = subscribedTopicDescriber.numPartitions(topic1Id); + int topic2PartitionCount = subscribedTopicDescriber.numPartitions(topic2Id); + int topic1SubscriberCount = topicSubscribers.get(topic1Id).size(); + int topic2SubscriberCount = topicSubscribers.get(topic2Id).size(); + + // Order by partitions per subscriber, descending. + int order = Double.compare( + (double) topic2PartitionCount / topic2SubscriberCount, + (double) topic1PartitionCount / topic1SubscriberCount + ); + + // Then order by subscriber count, ascending. + if (order == 0) { + order = Integer.compare(topic1SubscriberCount, topic2SubscriberCount); + } - return topicIdPartitions.stream() - .sorted(comparator) - .collect(Collectors.toList()); + // Then order by topic id, ascending. + if (order == 0) { + order = topic1Id.compareTo(topic2Id); + } + + return order; + } + }; + + List<Uuid> sortedTopicIds = new ArrayList<>(topicIds); + sortedTopicIds.sort(comparator); + return sortedTopicIds; } /** - * Gets a set of partitions that are to be retained from the existing assignment. 
This includes: - * <li> Partitions from topics that are still present in both the new subscriptions and the topic metadata. </li> + * Imports the existing assignment into the target assignment, taking into account topic + * unsubscriptions. */ private void assignStickyPartitions() { - groupSpec.memberIds().forEach(memberId -> - groupSpec.memberAssignment(memberId).partitions().forEach((topicId, currentAssignment) -> { + for (String memberId : groupSpec.memberIds()) { + int memberIndex = memberIndices.get(memberId); + + Map<Uuid, Set<Integer>> oldAssignment = groupSpec.memberAssignment(memberId).partitions(); + Map<Uuid, Set<Integer>> newAssignment = null; + + for (Map.Entry<Uuid, Set<Integer>> entry : oldAssignment.entrySet()) { + Uuid topicId = entry.getKey(); + Set<Integer> partitions = entry.getValue(); + if (groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicId)) { - currentAssignment.forEach(partition -> { - TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition); - assignmentManager.addPartitionToTargetAssignment(topicIdPartition, memberId); - assignedStickyPartitions.add(topicIdPartition); - }); + memberTargetAssignmentSizes[memberIndex] += partitions.size(); + + int numPartitions = subscribedTopicDescriber.numPartitions(topicId); + for (int partition : partitions) { + if (partition >= numPartitions) { + LOG.warn( + "Previous assignment for {} contains {}:{}, but topic only has {} partitions", + memberId, + topicId, + partition, + numPartitions + ); + + if (newAssignment == null) { + // If the new assignment is null, we create a deep copy of the + // original assignment so that we can alter it. 
+ newAssignment = deepCopy(oldAssignment); + } + Set<Integer> newPartitions = newAssignment.get(topicId); + newPartitions.remove(partition); + if (newPartitions.isEmpty()) { + newAssignment.remove(topicId); + } + memberTargetAssignmentSizes[memberIndex]--; + + continue; + } + + previousAssignmentPartitionOwners.get(topicId)[partition] = memberIndex; + targetAssignmentPartitionOwners.get(topicId)[partition] = memberIndex; + } } else { + // The member is no longer subscribed to the topic. Remove it from the + // assignment. LOG.debug("The topic " + topicId + " is no longer present in the subscribed topics list"); + + if (newAssignment == null) { + // If the new assignment is null, we create a deep copy of the + // original assignment so that we can alter it. + newAssignment = deepCopy(oldAssignment); + } + // Remove the entire topic. + newAssignment.remove(topicId); } - }) - ); + } + + if (newAssignment == null) { + newAssignment = oldAssignment; + } + targetAssignment.put(memberId, new MemberAssignmentImpl(newAssignment)); + } } /** - * Allocates the remaining unassigned partitions to members in a balanced manner. - * <li> Partitions are sorted to maximize the probability of a balanced assignment. </li> - * <li> Sort members in ascending order of their current target assignment sizes - * to ensure the least filled member gets the partition first. </li> + * A class that holds the state for the balancing process. + * <p/> + * Keeps track of two ranges of members: the members with the least number of assigned + * partitions and the members with the most number of assigned partitions. Within each range, + * all members start with the same number of assigned partitions. 
*/ - private void unassignedPartitionsAssignment() { - List<TopicIdPartition> sortedPartitions = sortTopicIdPartitions(unassignedPartitions); + private final class MemberAssignmentBalancer { + private final int[] memberTargetAssignmentSizes; - for (TopicIdPartition partition : sortedPartitions) { - TreeSet<String> sortedMembers = assignmentManager.sortMembersByAssignmentSize( - membersPerTopic.get(partition.topicId()) - ); + /** + * The members to balance, sorted by number of assigned partitions. + * <p/> + * Can be visualized like so: + * <pre> + * ^ + * | # + * partition | ### + * count | ### + * | ########## + * +-----------> + * members + * </pre> + */ + private final List<Integer> sortedMembers; - for (String member : sortedMembers) { - if (assignmentManager.maybeAssignPartitionToMember(partition, member)) { - break; + /** + * [0, leastLoadedRangeEnd) represents the range of members with the smallest partition + * count. + */ + private int leastLoadedRangeEnd = 0; // exclusive + + /** + * The partition count of each member in the least loaded range before we start assigning + * partitions to them. + */ + private int leastLoadedRangePartitionCount = -1; + + /** + * The index of the next element in sortedMembers to which to assign the next partition. + */ + private int nextLeastLoadedMember = 0; + + /** + * [mostLoadedRangeStart, mostLoadedRangeEnd) is the range of members with the largest + * partition count. + */ + private int mostLoadedRangeStart = 0; // inclusive + + /** + * [mostLoadedRangeStart, mostLoadedRangeEnd) is the range of members with the largest + * partition count. + * <p/> + * mostLoadedRangeEnd is not simply the end of the member list, since we need to be able to + * exclude heavily loaded members which do not have any partitions to give up. + */ + private int mostLoadedRangeEnd = 0; // exclusive + + /** + * The partition count of each member in the most loaded range before we start unassigning + * partitions from them. 
+ */ + private int mostLoadedRangePartitionCount = 1; + + /** + * The index of the next element in sortedMembers from which to unassign the next partition. + */ + private int nextMostLoadedMember = -1; + + // The ranges can be visualized like so: + // + // most + // loaded + // ^ least range + // | loaded # + // | range [##<- ]## + // partition | ##[#######]## + // count | [...-> ]##[#######]## + // | [#######]##[#######]## + // +-----------------------> + // members + // ^^ + // These members have no partitions to give up. + // + // The least loaded range expands rightwards and the most loaded range expands leftwards. + // Note that we are not done once the ranges touch, but rather when the partition counts in + // the ranges differ by one or less. + + public MemberAssignmentBalancer() { + this.memberTargetAssignmentSizes = UniformHeterogeneousAssignmentBuilder.this.memberTargetAssignmentSizes; + this.sortedMembers = new ArrayList<>(memberTargetAssignmentSizes.length); + } + + /** + * Resets the state of the balancer for a new set of members. + * + * @param members The members to balance. + * @return The difference in partition counts between the most and least loaded members. + */ + public int initialize(List<Integer> members) { + if (members.size() < 1) { + // nextLeastLoadedMember needs at least one member. + throw new IllegalArgumentException("Cannot balance an empty subscriber list."); + } + + // Build a sorted list of subscribers, where members with the smallest partition count + // appear first. 
+ sortedMembers.clear(); + sortedMembers.addAll(members); + sortedMembers.sort(memberComparator); + + leastLoadedRangeEnd = 0; // exclusive + leastLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(0)] - 1; + nextLeastLoadedMember = 0; + + mostLoadedRangeStart = sortedMembers.size(); // inclusive + mostLoadedRangeEnd = sortedMembers.size(); // exclusive + mostLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(sortedMembers.size() - 1)] + 1; + nextMostLoadedMember = sortedMembers.size() - 1; + + return memberTargetAssignmentSizes[sortedMembers.get(sortedMembers.size() - 1)] - + memberTargetAssignmentSizes[sortedMembers.get(0)]; + } + + /** + * Gets the least loaded member to which to assign a partition. + * <p/> + * Advances the state of the balancer by assuming that an additional partition has been + * assigned to the returned member. + * <p/> + * Assumes that there is at least one member. + * + * @return The least loaded member to which to assign a partition + */ + public int nextLeastLoadedMember() { + // To generate a balanced assignment, we assign partitions to members in the + // [0, leastLoadedRangeEnd) range first. Once each member in the range has received a + // partition, the partition count of the range rises by one and we can try to expand it. + // Range full, Range full, + // bump level by one bump level by one, + // can't expand range. expand range. + // ^ ^ ^ + // | # | # | [..-> ]# + // partition | ### -> | [..-> ]### -> | [.......##]# + // count | [..-> ]### | [.......]### | [.......##]# + // | [#######]### | [#######]### | [#########]# + // +-------------> +-------------> +------------> + // members members members + + if (nextLeastLoadedMember >= leastLoadedRangeEnd) { + // We've hit the end of the range. Bump its level and try to expand it. + leastLoadedRangePartitionCount++; + + // Expand the range. 
+ while (leastLoadedRangeEnd < sortedMembers.size() && + memberTargetAssignmentSizes[sortedMembers.get(leastLoadedRangeEnd)] == leastLoadedRangePartitionCount) { + leastLoadedRangeEnd++; } + + // Reset the pointer to the start of the range of members with identical + // partition counts. + nextLeastLoadedMember = 0; } + + int leastLoadedMemberIndex = sortedMembers.get(nextLeastLoadedMember); + nextLeastLoadedMember++; + + return leastLoadedMemberIndex; } - } - /** - * If a topic has two or more potential members it is subject to reassignment. - * - * @return true if the topic can participate in reassignment, false otherwise. - */ - private boolean canTopicParticipateInReassignment(Uuid topicId) { - return membersPerTopic.get(topicId).size() >= 2; + /** + * Gets the most loaded member from which to reassign a partition. + * <p/> + * Advances the state of the balancer by assuming that a partition has been unassigned from + * the returned member. + * + * @return The most loaded member from which to reassign a partition, or -1 if no such + * member exists. + */ + public int nextMostLoadedMember() { + if (nextMostLoadedMember < mostLoadedRangeStart) { + if (mostLoadedRangeEnd <= mostLoadedRangeStart && + mostLoadedRangeStart > 0) { + // The range is empty due to calls to excludeMostLoadedMember(). We risk not + // expanding the range below and returning a member outside the range. Ensure + // that we always expand the range below by resetting the partition count. + mostLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(mostLoadedRangeStart - 1)]; + } else { + mostLoadedRangePartitionCount--; + } + + // Expand the range. + while (mostLoadedRangeStart > 0 && + memberTargetAssignmentSizes[sortedMembers.get(mostLoadedRangeStart - 1)] == mostLoadedRangePartitionCount) { + mostLoadedRangeStart--; + } + + // Reset the pointer to the end of the range of members with identical partition + // counts. 
+ nextMostLoadedMember = mostLoadedRangeEnd - 1; + } + + if (nextMostLoadedMember < 0) { + // The range is empty due to excludeMostLoadedMember() calls and there are no more + // members that can give up partitions. + return -1; + } + + int mostLoadedMemberIndex = sortedMembers.get(nextMostLoadedMember); + nextMostLoadedMember--; + + return mostLoadedMemberIndex; + } + + /** + * Excludes the last member returned from nextMostLoadedMember from the most loaded range. + * + * Must not be called if nextMostLoadedMember has not been called yet or returned -1. + */ + public void excludeMostLoadedMember() { + // Kick the member out of the most loaded range by swapping with the member at the end + // of the range and contracting the end of the range. + int mostLoadedMemberIndex = sortedMembers.get(nextMostLoadedMember + 1); + sortedMembers.set(nextMostLoadedMember + 1, sortedMembers.get(mostLoadedRangeEnd - 1)); + sortedMembers.set(mostLoadedRangeEnd - 1, mostLoadedMemberIndex); + mostLoadedRangeEnd--; + } + + /** + * Checks whether the members are balanced based on the partition counts of the least and + * most loaded ranges. + */ + public boolean isBalanced() { + return mostLoadedRangePartitionCount - leastLoadedRangePartitionCount <= 1; + } } /** - * If a member is not assigned all its potential partitions it is subject to reassignment. - * If any of the partitions assigned to a member is subject to reassignment, the member itself - * is subject to reassignment. + * Allocates the remaining unassigned partitions to members in a balanced manner. + * <ol> + * <li>Topics are sorted to maximize the probability of a balanced assignment.</li> + * <li>Unassigned partitions within each topic are distributed to the least loaded + * subscribers, repeatedly.</li> + * </ol> * - * @return true if the member can participate in reassignment, false otherwise. + * @param partitions The partitions to be assigned. 
*/ - private boolean canMemberParticipateInReassignment(String memberId) { - Set<Uuid> assignedTopicIds = targetAssignment.get(memberId).partitions().keySet(); + private void unassignedPartitionsAssignment(Map<Uuid, List<Integer>> partitions) { + List<Uuid> sortedTopicIds = sortTopicIds(partitions.keySet()); - int currentAssignmentSize = assignmentManager.targetAssignmentSize(memberId); - int maxAssignmentSize = assignmentManager.maxAssignmentSize(memberId); + MemberAssignmentBalancer memberAssignmentBalancer = new MemberAssignmentBalancer(); - if (currentAssignmentSize > maxAssignmentSize) - LOG.error("The member {} is assigned more partitions than the maximum possible.", memberId); + for (Uuid topicId : sortedTopicIds) { + memberAssignmentBalancer.initialize(topicSubscribers.get(topicId)); - if (currentAssignmentSize < maxAssignmentSize) - return true; + for (int partition : partitions.get(topicId)) { + int leastLoadedMemberIndex = memberAssignmentBalancer.nextLeastLoadedMember(); - for (Uuid topicId : assignedTopicIds) { - if (canTopicParticipateInReassignment(topicId)) - return true; + assignPartition(topicId, partition, leastLoadedMemberIndex); + } } - return false; } /** - * Checks if the current assignments of partitions to members is balanced. - * - * Balance is determined by first checking if the difference in the number of partitions assigned - * to any two members is one. If this is not true, it verifies that no member can - * receive additional partitions without disrupting the balance. + * If a topic has two or more potential members it is subject to reassignment. * - * @return true if the assignment is balanced, false otherwise. + * @return true if the topic can participate in reassignment, false otherwise. 
*/ - - private boolean isBalanced() { - int min = assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.first()); - int max = assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.last()); - - // If minimum and maximum numbers of partitions assigned to members differ by at most one return true. - if (min >= max - 1) - return true; - - // Ensure that members without a complete set of topic partitions cannot receive any additional partitions. - // This maintains balance. Start by checking members with the fewest assigned partitions to see if they can take more. - for (String member : sortedMembersByAssignmentSize) { - int memberPartitionCount = assignmentManager.targetAssignmentSize(member); - - // Skip if this member already has all the topic partitions it can get. - int maxAssignmentSize = assignmentManager.maxAssignmentSize(member); - if (memberPartitionCount == maxAssignmentSize) - continue; - - // Otherwise make sure it cannot get any more partitions. - for (Uuid topicId : groupSpec.memberSubscription(member).subscribedTopicIds()) { - Set<Integer> assignedPartitions = targetAssignment.get(member).partitions().get(topicId); - for (int i = 0; i < subscribedTopicDescriber.numPartitions(topicId); i++) { - TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, i); - if (assignedPartitions == null || !assignedPartitions.contains(i)) { - String otherMember = partitionOwnerInTargetAssignment.get(topicIdPartition); - int otherMemberPartitionCount = assignmentManager.targetAssignmentSize(otherMember); - if (memberPartitionCount + 1 < otherMemberPartitionCount) { - LOG.debug("{} can be moved from member {} to member {} for a more balanced assignment.", - topicIdPartition, otherMember, member); - return false; - } - } - } - } - } - return true; + private boolean canTopicParticipateInReassignment(Uuid topicId) { + return topicSubscribers.get(topicId).size() >= 2; } /** * Balance the current assignment after the initial round of 
assignments have completed. */ private void balance() { - if (!unassignedPartitions.isEmpty()) - throw new PartitionAssignorException("Some partitions were left unassigned"); - // Refill unassigned partitions with all the topicId partitions. - unassignedPartitions.addAll(topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber)); + Set<Uuid> topicIds = new HashSet<>(subscribedTopicIds); - // Narrow down the reassignment scope to only those partitions that can actually be reassigned. - Set<TopicIdPartition> fixedPartitions = new HashSet<>(); + // Narrow down the reassignment scope to only those topics that can actually be reassigned. for (Uuid topicId : subscribedTopicIds) { if (!canTopicParticipateInReassignment(topicId)) { - for (int i = 0; i < subscribedTopicDescriber.numPartitions(topicId); i++) { - fixedPartitions.add(new TopicIdPartition(topicId, i)); - } - } - } - unassignedPartitions.removeAll(fixedPartitions); - - // Narrow down the reassignment scope to only those members that are subject to reassignment. - for (String member : groupSpec.memberIds()) { - if (!canMemberParticipateInReassignment(member)) { - sortedMembersByAssignmentSize.remove(member); + topicIds.remove(topicId); } } // If all the partitions are fixed i.e. unassigned partitions is empty there is no point of re-balancing. - if (!unassignedPartitions.isEmpty()) performReassignments(); + if (!topicIds.isEmpty()) balanceTopics(topicIds); } /** * Performs reassignments of partitions to balance the load across members. * This method iteratively reassigns partitions until no further moves can improve the balance. + * <p/> + * The method loops over the topics repeatedly until an entire loop around the topics has + * completed without any reassignments, or we hit an iteration limit. + * <p/> + * This method produces perfectly balanced assignments when all subscribers of a topic have the + * same subscriptions. 
However, when subscribers have overlapping, non-identical subscriptions, + * the method produces almost-balanced assignments, assuming the iteration limit is not hit. + * eg. if there are three members and two topics and members 1 and 2 are subscribed to the first + * topic and members 2 and 3 are subscribed to the second topic, we can end up with an + * assignment like: + * <ul> + * <li>Member 1: 9 partitions</li> + * <li>Member 2: 10 partitions</li> + * <li>Member 3: 11 partitions</li> + * </ul> * - * The method uses a do-while loop to ensure at least one pass over the partitions and continues - * reassigning as long as there are modifications to the current assignments. It checks for balance - * after each reassignment and exits if the balance is achieved. + * In this assignment, the subscribers of the first topic have a difference in partitions of 1, + * so the topic is considered balanced. The same applies to the second topic. However, balance + * can be improved by moving a partition from the second topic from member 3 to member 2 and a + * partition from the first topic from member 2 to member 1. * - * @throws PartitionAssignorException if there are inconsistencies in expected members per partition - * or if a partition is expected to already be assigned but isn't. + * @param topicIds The topics to consider for reassignment. These topics must have at + * least two subscribers. */ - private void performReassignments() { - boolean modified; - boolean reassignmentOccurred; - // Repeat reassignment until no partition can be moved to improve the balance. - do { - modified = false; - reassignmentOccurred = false; - // Reassign all reassignable partitions sorted in descending order - // by totalPartitions/number of subscribed members, - // until the full list is processed or a balance is achieved. 
- List<TopicIdPartition> reassignablePartitions = sortTopicIdPartitions(unassignedPartitions); - - for (TopicIdPartition reassignablePartition : reassignablePartitions) { - // Only check if there is any change in balance if any moves were made. - if (reassignmentOccurred && isBalanced()) { + private void balanceTopics(Collection<Uuid> topicIds) { + List<Uuid> sortedTopicIds = sortTopicIds(topicIds); + // The index of the last topic in sortedTopicIds that was rebalanced. + // Used to decide when to exit early. + int lastRebalanceTopicIndex = -1; + + MemberAssignmentBalancer memberAssignmentBalancer = new MemberAssignmentBalancer(); + + // An array of partitions, with partitions owned by the same member grouped together. + List<Integer> partitions = new ArrayList<>(); + + // The ranges in the partitions list assigned to members. + // Maintaining these ranges allows for constant time, deterministic picking of partitions + // owned by a given member. + Map<Integer, Integer> startPartitionIndices = new HashMap<>(); + Map<Integer, Integer> endPartitionIndices = new HashMap<>(); // exclusive + + // Repeat reassignment until no partition can be moved to improve the balance or we hit an + // iteration limit. + for (int i = 0; i < 16; i++) { + for (int topicIndex = 0; topicIndex < sortedTopicIds.size(); topicIndex++) { + if (topicIndex == lastRebalanceTopicIndex) { + // The last rebalanced topic was this one, which means we've gone through all + // the topics and didn't perform any additional rebalancing. Don't bother trying + // to rebalance this topic again and exit. return; } - reassignmentOccurred = false; - // The topicIdPartition must have at least two members. 
- if (membersPerTopic.get(reassignablePartition.topicId()).size() <= 1) - throw new PartitionAssignorException(String.format("Expected more than one potential member for " + - "topicIdPartition '%s'", reassignablePartition) - ); + Uuid topicId = sortedTopicIds.get(topicIndex); - // The topicIdPartition must have a current target owner. - String currentTargetOwner = partitionOwnerInTargetAssignment.get(reassignablePartition); - if (currentTargetOwner == null) - throw new PartitionAssignorException(String.format("Expected topicIdPartition '%s' to be assigned " + - "to a member", reassignablePartition) - ); + int reassignedPartitionCount = balanceTopic( + topicId, + memberAssignmentBalancer, + partitions, + startPartitionIndices, + endPartitionIndices + ); - for (String otherMember : membersPerTopic.get(reassignablePartition.topicId())) { - if (assignmentManager.targetAssignmentSize(currentTargetOwner) > assignmentManager.targetAssignmentSize(otherMember) + 1) { - reassignPartition(reassignablePartition); - modified = true; - reassignmentOccurred = true; - break; - } + if (reassignedPartitionCount > 0 || + lastRebalanceTopicIndex == -1) { + lastRebalanceTopicIndex = topicIndex; } } - } while (modified); - } - - /** - * Reassigns a partition to an eligible member with the fewest current target assignments. - * <ul> - * <li> Iterates over members sorted by ascending assignment size. </li> - * <li> Selects the first member subscribed to the partition's topic. </li> - * </ul> - * - * @param partition The partition to reassign. - * @throws AssertionError If no subscribed member is found. - */ - private void reassignPartition(TopicIdPartition partition) { - // Find the new member with the least assignment size. 
- String newOwner = null; - for (String anotherMember : sortedMembersByAssignmentSize) { - if (groupSpec.memberSubscription(anotherMember).subscribedTopicIds().contains(partition.topicId())) { - newOwner = anotherMember; - break; - } - } - - if (newOwner == null) { - throw new PartitionAssignorException("No suitable new owner was found for the partition" + partition); } - - reassignPartition(partition, newOwner); } /** - * Reassigns the given partition to a new member while considering partition movements and stickiness. - * <p> - * This method performs the following actions: - * <ol> - * <li> Determines the current owner of the partition. </li> - * <li> Identifies the correct partition to move, adhering to stickiness constraints. </li> - * <li> Processes the partition movement to the new member. </li> - * </ol> + * Performs reassignments of a topic's partitions to balance the load across its subscribers. * - * @param partition The {@link TopicIdPartition} to be reassigned. - * @param newMember The Id of the member to which the partition should be reassigned. + * @param topicId The topic to consider for reassignment. The topic must have + * at least two subscribers. + * @param memberAssignmentBalancer A MemberAssignmentBalancer to hold the state of the balancing + * algorithm. + * subscribers in ascending order of total assigned partitions. + * @param partitions A list for the balancing algorithm to store the topic's + * partitions, with partitions owned by the same member grouped + * together. + * @param startPartitionIndices A map for the balancing algorithm to store the ranges in the + * partitions list assigned to members. These ranges allows for + * constant time, deterministic picking of partitions owned by a + * given member. + * @param endPartitionIndices A map for the balancing algorithm to store the ranges in the + * partitions list assigned to members. 
These ranges allows for + * constant time, deterministic picking of partitions owned by a + * given member. + * @return the number of partitions reassigned. */ - private void reassignPartition(TopicIdPartition partition, String newMember) { - String member = partitionOwnerInTargetAssignment.get(partition); - // Find the correct partition movement considering the stickiness requirement. - TopicIdPartition partitionToBeMoved = partitionMovements.computeActualPartitionToBeMoved( - partition, - member, - newMember - ); - processPartitionMovement(partitionToBeMoved, newMember); - } - - private void processPartitionMovement(TopicIdPartition topicIdPartition, String newMember) { - String oldMember = partitionOwnerInTargetAssignment.get(topicIdPartition); - - partitionMovements.movePartition(topicIdPartition, oldMember, newMember); - - assignmentManager.removePartitionFromTargetAssignment(topicIdPartition, oldMember); - assignmentManager.addPartitionToTargetAssignment(topicIdPartition, newMember); - } - - /** - * Adds the topic's partition to the member's target assignment. - */ - private static void addPartitionToAssignment( - Map<String, MemberAssignment> memberAssignments, - String memberId, + private int balanceTopic( Uuid topicId, - int partition + MemberAssignmentBalancer memberAssignmentBalancer, + List<Integer> partitions, + Map<Integer, Integer> startPartitionIndices, + Map<Integer, Integer> endPartitionIndices // exclusive ) { - memberAssignments.get(memberId) - .partitions() - .computeIfAbsent(topicId, __ -> new HashSet<>()) - .add(partition); - } + int reassignedPartitionCount = 0; - /** - * Constructs a set of {@code TopicIdPartition} including all the given topic Ids based on their partition counts. - * - * @param topicIds Collection of topic Ids. - * @param subscribedTopicDescriber Describer to fetch partition counts for topics. - * - * @return Set of {@code TopicIdPartition} including all the provided topic Ids. 
- */ - private static Set<TopicIdPartition> topicIdPartitions( - Collection<Uuid> topicIds, - SubscribedTopicDescriber subscribedTopicDescriber - ) { - Set<TopicIdPartition> topicIdPartitions = new HashSet<>(); - for (Uuid topicId : topicIds) { - int numPartitions = subscribedTopicDescriber.numPartitions(topicId); - for (int partitionId = 0; partitionId < numPartitions; partitionId++) { - topicIdPartitions.add(new TopicIdPartition(topicId, partitionId)); - } - } - return topicIdPartitions; - } + int numPartitions = subscribedTopicDescriber.numPartitions(topicId); - /** - * This class represents a pair of member Ids involved in a partition reassignment. - * Each pair contains a source and a destination member Id. - * It normally corresponds to a particular partition or topic, and indicates that the particular partition or some - * partition of the particular topic was moved from the source member to the destination member during the rebalance. - */ - private static class MemberPair { - private final String srcMemberId; - private final String dstMemberId; + int[] previousPartitionOwners = previousAssignmentPartitionOwners.get(topicId); + int[] partitionOwners = targetAssignmentPartitionOwners.get(topicId); - MemberPair(String srcMemberId, String dstMemberId) { - this.srcMemberId = srcMemberId; - this.dstMemberId = dstMemberId; - } + // Try to return partitions to their previous owners if it improves balance. + reassignedPartitionCount += balanceTopicRestoringStickiness( + topicId, + numPartitions, + previousPartitionOwners, + partitionOwners + ); - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((this.srcMemberId == null) ? 0 : this.srcMemberId.hashCode()); - result = prime * result + ((this.dstMemberId == null) ? 0 : this.dstMemberId.hashCode()); - return result; + // Next, try to reassign partitions from the most loaded subscribers to the least loaded + // subscribers. 
We use a similar approach to unassignedPartitionsAssignment, except instead + // we are reassigning partitions from the most loaded subscribers to the least loaded + // subscribers. Once the difference in partition counts between the ranges is one or less, + // we are done, since reassigning partitions would not improve balance. + int imbalance = memberAssignmentBalancer.initialize(topicSubscribers.get(topicId)); + if (imbalance <= 1) { + // The topic is already balanced. + return reassignedPartitionCount; } - @Override - public boolean equals(Object obj) { - if (obj == null) - return false; - - if (!getClass().isInstance(obj)) - return false; - - MemberPair otherPair = (MemberPair) obj; - return this.srcMemberId.equals(otherPair.srcMemberId) && this.dstMemberId.equals(otherPair.dstMemberId); + // Initialize the array of partitions, with partitions owned by the same member grouped + // together. + partitions.clear(); + for (int partition = 0; partition < numPartitions; partition++) { + partitions.add(partition); } + partitions.sort( + Comparator + .comparingInt((Integer partition) -> partitionOwners[partition]) + .thenComparingInt(partition -> partition) + ); - @Override - public String toString() { - return "MemberPair(" + - "srcMemberId='" + srcMemberId + '\'' + - ", dstMemberId='" + dstMemberId + '\'' + - ')'; - } - } + // Initialize the ranges in the partitions list owned by members. + startPartitionIndices.clear(); + endPartitionIndices.clear(); - /** - * This class maintains some data structures to simplify lookup of partition movements among members. - * During a partition rebalance, it keeps track of partition movements corresponding to each topic, - * and also possible movement (in form a <code>MemberPair</code> object) for each partition. 
- */ - private static class PartitionMovements { - private final Map<Uuid, Map<MemberPair, Set<TopicIdPartition>>> partitionMovementsByTopic = new HashMap<>(); - private final Map<TopicIdPartition, MemberPair> partitionMovementsByPartition = new HashMap<>(); - - private MemberPair removeMovementRecordOfPartition(TopicIdPartition partition) { - MemberPair pair = partitionMovementsByPartition.remove(partition); - - Uuid topic = partition.topicId(); - Map<MemberPair, Set<TopicIdPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic); - partitionMovementsForThisTopic.get(pair).remove(partition); - if (partitionMovementsForThisTopic.get(pair).isEmpty()) - partitionMovementsForThisTopic.remove(pair); - if (partitionMovementsByTopic.get(topic).isEmpty()) - partitionMovementsByTopic.remove(topic); - - return pair; + for (int i = 0; i < numPartitions; i++) { + int partition = partitions.get(i); + int ownerIndex = partitionOwners[partition]; + if (startPartitionIndices.get(ownerIndex) == null) { + startPartitionIndices.put(ownerIndex, i); + } + endPartitionIndices.put(ownerIndex, i + 1); // endPartitionIndices is exclusive } - private void addPartitionMovementRecord(TopicIdPartition partition, MemberPair pair) { - partitionMovementsByPartition.put(partition, pair); - - Uuid topic = partition.topicId(); - if (!partitionMovementsByTopic.containsKey(topic)) - partitionMovementsByTopic.put(topic, new HashMap<>()); + // Redistribute partitions from the most loaded subscriber to the least loaded subscriber + // until the topic's subscribers are balanced. + while (!memberAssignmentBalancer.isBalanced()) { + // NB: The condition above does not do much. This loop will terminate due to the checks + // inside instead. 
- Map<MemberPair, Set<TopicIdPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic); - if (!partitionMovementsForThisTopic.containsKey(pair)) - partitionMovementsForThisTopic.put(pair, new HashSet<>()); + // First, choose a member from the most loaded range to reassign a partition from. - partitionMovementsForThisTopic.get(pair).add(partition); - } + // Loop until we find a member that has partitions to give up. + int mostLoadedMemberIndex = -1; + while (true) { + mostLoadedMemberIndex = memberAssignmentBalancer.nextMostLoadedMember(); - private void movePartition(TopicIdPartition partition, String oldOwner, String newOwner) { - MemberPair pair = new MemberPair(oldOwner, newOwner); - - if (partitionMovementsByPartition.containsKey(partition)) { - // This partition was previously moved. - MemberPair existingPair = removeMovementRecordOfPartition(partition); - if (existingPair.dstMemberId.equals(oldOwner)) { - throw new PartitionAssignorException("Mismatch in partition movement record with respect to " + - "partition ownership during a rebalance" - ); - } - if (!existingPair.srcMemberId.equals(newOwner)) { - // The partition is not moving back to its previous member. - addPartitionMovementRecord(partition, new MemberPair(existingPair.srcMemberId, newOwner)); + if (mostLoadedMemberIndex == -1) { + // There are no more members with partitions to give up. + // We need to break out of two loops here. + break; } - } else - addPartitionMovementRecord(partition, pair); - } - /** - * Computes the actual partition to be moved based on the current and proposed partition owners. - * This method determines the appropriate partition movement, considering existing partition movements - * and constraints within a topic. - * - * @param partition The {@link TopicIdPartition} object representing the partition to be moved. - * @param oldOwner The memberId of the current owner of the partition. 
- * @param newOwner The memberId of the proposed new owner of the partition. - * @return The {@link TopicIdPartition} that should be moved, based on existing movement patterns - * and ownership. Returns the original partition if no specific movement pattern applies. - * @throws PartitionAssignorException if the old owner does not match the expected value for the partition. - */ - private TopicIdPartition computeActualPartitionToBeMoved( - TopicIdPartition partition, - String oldOwner, - String newOwner - ) { - Uuid topic = partition.topicId(); - - if (!partitionMovementsByTopic.containsKey(topic)) - return partition; - - if (partitionMovementsByPartition.containsKey(partition)) { - String expectedOldOwner = partitionMovementsByPartition.get(partition).dstMemberId; - if (!oldOwner.equals(expectedOldOwner)) { - throw new PartitionAssignorException("Old owner does not match expected value for partition: " + partition); + if (!endPartitionIndices.containsKey(mostLoadedMemberIndex) || + endPartitionIndices.get(mostLoadedMemberIndex) - startPartitionIndices.get(mostLoadedMemberIndex) <= 0) { + memberAssignmentBalancer.excludeMostLoadedMember(); + continue; } - oldOwner = partitionMovementsByPartition.get(partition).srcMemberId; - } - - Map<MemberPair, Set<TopicIdPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic); - MemberPair reversePair = new MemberPair(newOwner, oldOwner); - if (!partitionMovementsForThisTopic.containsKey(reversePair)) - return partition; - - return partitionMovementsForThisTopic.get(reversePair).iterator().next(); - } - } - - /** - * Manages assignments to members based on their current assignment size and maximum allowed assignment size. - */ - private class AssignmentManager { - private final Map<String, MemberAssignmentData> membersWithAssignmentSizes = new HashMap<>(); - /** - * Represents the assignment metadata for a member. 
- */ - private class MemberAssignmentData { - final String memberId; - int currentAssignmentSize = 0; - int maxAssignmentSize; - - /** - * Constructs a MemberAssignmentData with the given member Id. - * - * @param memberId The Id of the member. - */ - MemberAssignmentData(String memberId) { - this.memberId = memberId; + break; } - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - MemberAssignmentData that = (MemberAssignmentData) o; - return memberId.equals(that.memberId); + if (mostLoadedMemberIndex == -1) { + // There are no more members with partitions to give up. + break; } - @Override - public int hashCode() { - return Objects.hash(memberId); - } + // We've picked a member. Now pick its last partition to reassign. + int partition = partitions.get(endPartitionIndices.get(mostLoadedMemberIndex) - 1); + endPartitionIndices.put(mostLoadedMemberIndex, endPartitionIndices.get(mostLoadedMemberIndex) - 1); - @Override - public String toString() { - return "MemberAssignmentData(" + - "memberId='" + memberId + '\'' + - ", currentAssignmentSize=" + currentAssignmentSize + - ", maxAssignmentSize=" + maxAssignmentSize + - ')'; + // Find the least loaded subscriber. + int leastLoadedMemberIndex = memberAssignmentBalancer.nextLeastLoadedMember(); + + // If the most and least loaded subscribers came from member ranges with partition + // counts that differ by one or less, then the members are balanced and we are done. + if (memberAssignmentBalancer.isBalanced()) { + // The topic is balanced. + break; } - } - /** - * Initializes an AssignmentManager, setting up the necessary data structures. 
- */ - public AssignmentManager( - SubscribedTopicDescriber subscribedTopicDescriber - ) { - groupSpec.memberIds().forEach(memberId -> { - int maxSize = groupSpec.memberSubscription(memberId).subscribedTopicIds().stream() - .mapToInt(subscribedTopicDescriber::numPartitions) - .sum(); - - MemberAssignmentData memberAssignmentData = membersWithAssignmentSizes - .computeIfAbsent(memberId, MemberAssignmentData::new); - memberAssignmentData.maxAssignmentSize = maxSize; - memberAssignmentData.currentAssignmentSize = 0; - }); + // Reassign the partition. + assignPartition(topicId, partition, leastLoadedMemberIndex); + reassignedPartitionCount++; } - /** - * @param memberId The member Id. - * @return The current assignment size for the given member. - */ - private int targetAssignmentSize(String memberId) { - MemberAssignmentData memberData = this.membersWithAssignmentSizes.get(memberId); - if (memberData == null) { - LOG.warn("Member Id {} not found", memberId); - return 0; - } - return memberData.currentAssignmentSize; - } + return reassignedPartitionCount; + } - /** - * @param memberId The member Id. - * @return The maximum assignment size for the given member. - */ - private int maxAssignmentSize(String memberId) { - MemberAssignmentData memberData = this.membersWithAssignmentSizes.get(memberId); - if (memberData == null) { - LOG.warn("Member Id {} not found", memberId); - return 0; + /** + * Restores topic partitions to their previous owners if it improves balance. + * + * @param topicId The topic to rebalance. + * @param numPartitions The number of partitions in the topic. + * @param previousPartitionOwners The previous owners of the partitions. + * @param partitionOwners The current owners of the partitions. + * @return the number of partitions reassigned. + */ + private int balanceTopicRestoringStickiness( Review Comment: This step is a rough translation of something the previous implementation did. 
The only time it does anything is if a previous balancing iteration moved partitions away from a member and now wants to move them back. For this to happen, subscriptions must be overlapping and non-equal. For example, consider a case with 2 topics and 3 consumers, where topic 1 is subscribed to by consumers 1 and 2 and topic 2 is subscribed to by consumers 2 and 3. Let's say that consumer 1 has 10 partitions, consumer 2 has 15 and consumer 3 has 5. We might balance topic 1 first and move partitions from consumer 2 -> 1, so that they both have 12-13 partitions. Then we might balance topic 2 and move partitions from consumer 2 -> 3, so that they both have 9 partitions. The next iteration on topic 1 would move partitions back from consumer 1 -> 2. Since subscription patterns like these are uncommon, I'm actually tempted to remove this step. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java: ########## @@ -157,716 +240,779 @@ public GroupAssignment build() { assignStickyPartitions(); - unassignedPartitionsAssignment(); + computeUnassignedPartitions(unassignedPartitions); + unassignedPartitionsAssignment(unassignedPartitions); balance(); return new GroupAssignment(targetAssignment); } /** - * <li> TopicIdPartitions are sorted in descending order based on the value: - * totalPartitions/number of subscribed members. </li> - * <li> If the above value is the same then topicIdPartitions are sorted in - * ascending order of number of subscribers. </li> - * <li> If both criteria are the same, sort in ascending order of the partition Id. - * This last criteria is for predictability of the assignments.
</li> + * Sorts topics based on the following criteria: + * <ol> + * <li>Topics are sorted in descending order of totalPartitions / number of subscribers + * (partitions per subscriber).</li> + * <li>Ties are broken by sorting in ascending order of number of subscribers.</li> + * <li>Any remaining ties are broken by sorting in ascending order of topic id.</li> + * </ol> + * + * The last criteria is for predictability of assignments. * - * @param topicIdPartitions The topic partitions that need to be sorted. - * @return A list of sorted topic partitions. + * @param topicIds The topic ids that need to be sorted. + * @return A list of sorted topic ids. */ - private List<TopicIdPartition> sortTopicIdPartitions(Collection<TopicIdPartition> topicIdPartitions) { - Comparator<TopicIdPartition> comparator = Comparator - .comparingDouble((TopicIdPartition topicIdPartition) -> { - int totalPartitions = subscribedTopicDescriber.numPartitions(topicIdPartition.topicId()); - int totalSubscribers = membersPerTopic.get(topicIdPartition.topicId()).size(); - return (double) totalPartitions / totalSubscribers; - }) - .reversed() - .thenComparingInt(topicIdPartition -> membersPerTopic.get(topicIdPartition.topicId()).size()) - .thenComparingInt(TopicIdPartition::partitionId); + private List<Uuid> sortTopicIds(Collection<Uuid> topicIds) { + Comparator<Uuid> comparator = new Comparator<Uuid>() { + @Override + public int compare(final Uuid topic1Id, final Uuid topic2Id) { + int topic1PartitionCount = subscribedTopicDescriber.numPartitions(topic1Id); + int topic2PartitionCount = subscribedTopicDescriber.numPartitions(topic2Id); + int topic1SubscriberCount = topicSubscribers.get(topic1Id).size(); + int topic2SubscriberCount = topicSubscribers.get(topic2Id).size(); + + // Order by partitions per subscriber, descending. 
+ int order = Double.compare( + (double) topic2PartitionCount / topic2SubscriberCount, + (double) topic1PartitionCount / topic1SubscriberCount + ); + + // Then order by subscriber count, ascending. + if (order == 0) { + order = Integer.compare(topic1SubscriberCount, topic2SubscriberCount); + } - return topicIdPartitions.stream() - .sorted(comparator) - .collect(Collectors.toList()); + // Then order by topic id, ascending. + if (order == 0) { + order = topic1Id.compareTo(topic2Id); + } + + return order; + } + }; + + List<Uuid> sortedTopicIds = new ArrayList<>(topicIds); + sortedTopicIds.sort(comparator); + return sortedTopicIds; } /** - * Gets a set of partitions that are to be retained from the existing assignment. This includes: - * <li> Partitions from topics that are still present in both the new subscriptions and the topic metadata. </li> + * Imports the existing assignment into the target assignment, taking into account topic + * unsubscriptions. */ private void assignStickyPartitions() { - groupSpec.memberIds().forEach(memberId -> - groupSpec.memberAssignment(memberId).partitions().forEach((topicId, currentAssignment) -> { + for (String memberId : groupSpec.memberIds()) { + int memberIndex = memberIndices.get(memberId); + + Map<Uuid, Set<Integer>> oldAssignment = groupSpec.memberAssignment(memberId).partitions(); + Map<Uuid, Set<Integer>> newAssignment = null; + + for (Map.Entry<Uuid, Set<Integer>> entry : oldAssignment.entrySet()) { + Uuid topicId = entry.getKey(); + Set<Integer> partitions = entry.getValue(); + if (groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicId)) { - currentAssignment.forEach(partition -> { - TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition); - assignmentManager.addPartitionToTargetAssignment(topicIdPartition, memberId); - assignedStickyPartitions.add(topicIdPartition); - }); + memberTargetAssignmentSizes[memberIndex] += partitions.size(); + + int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId); + for (int partition : partitions) { + if (partition >= numPartitions) { + LOG.warn( + "Previous assignment for {} contains {}:{}, but topic only has {} partitions", + memberId, + topicId, + partition, + numPartitions + ); + + if (newAssignment == null) { + // If the new assignment is null, we create a deep copy of the + // original assignment so that we can alter it. + newAssignment = deepCopy(oldAssignment); + } + Set<Integer> newPartitions = newAssignment.get(topicId); + newPartitions.remove(partition); + if (newPartitions.isEmpty()) { + newAssignment.remove(topicId); + } + memberTargetAssignmentSizes[memberIndex]--; + + continue; + } + + previousAssignmentPartitionOwners.get(topicId)[partition] = memberIndex; + targetAssignmentPartitionOwners.get(topicId)[partition] = memberIndex; + } } else { + // The member is no longer subscribed to the topic. Remove it from the + // assignment. LOG.debug("The topic " + topicId + " is no longer present in the subscribed topics list"); + + if (newAssignment == null) { + // If the new assignment is null, we create a deep copy of the + // original assignment so that we can alter it. + newAssignment = deepCopy(oldAssignment); + } + // Remove the entire topic. + newAssignment.remove(topicId); } - }) - ); + } + + if (newAssignment == null) { + newAssignment = oldAssignment; + } + targetAssignment.put(memberId, new MemberAssignmentImpl(newAssignment)); + } } /** - * Allocates the remaining unassigned partitions to members in a balanced manner. - * <li> Partitions are sorted to maximize the probability of a balanced assignment. </li> - * <li> Sort members in ascending order of their current target assignment sizes - * to ensure the least filled member gets the partition first. </li> + * A class that holds the state for the balancing process. 
+ * <p/> + * Keeps track of two ranges of members: the members with the least number of assigned + * partitions and the members with the most number of assigned partitions. Within each range, + * all members start with the same number of assigned partitions. */ - private void unassignedPartitionsAssignment() { - List<TopicIdPartition> sortedPartitions = sortTopicIdPartitions(unassignedPartitions); + private final class MemberAssignmentBalancer { + private final int[] memberTargetAssignmentSizes; - for (TopicIdPartition partition : sortedPartitions) { - TreeSet<String> sortedMembers = assignmentManager.sortMembersByAssignmentSize( - membersPerTopic.get(partition.topicId()) - ); + /** + * The members to balance, sorted by number of assigned partitions. + * <p/> + * Can be visualized like so: + * <pre> + * ^ + * | # + * partition | ### + * count | ### + * | ########## + * +-----------> + * members + * </pre> + */ + private final List<Integer> sortedMembers; - for (String member : sortedMembers) { - if (assignmentManager.maybeAssignPartitionToMember(partition, member)) { - break; + /** + * [0, leastLoadedRangeEnd) represents the range of members with the smallest partition + * count. + */ + private int leastLoadedRangeEnd = 0; // exclusive + + /** + * The partition count of each member in the least loaded range before we start assigning + * partitions to them. + */ + private int leastLoadedRangePartitionCount = -1; + + /** + * The index of the next element in sortedMembers to which to assign the next partition. + */ + private int nextLeastLoadedMember = 0; + + /** + * [mostLoadedRangeStart, mostLoadedRangeEnd) is the range of members with the largest + * partition count. + */ + private int mostLoadedRangeStart = 0; // inclusive + + /** + * [mostLoadedRangeStart, mostLoadedRangeEnd) is the range of members with the largest + * partition count. 
+ * <p/> + * mostLoadedRangeEnd is not simply the end of the member list, since we need to be able to + * exclude heavily loaded members which do not have any partitions to give up. + */ + private int mostLoadedRangeEnd = 0; // exclusive + + /** + * The partition count of each member in the most loaded range before we start unassigning + * partitions from them. + */ + private int mostLoadedRangePartitionCount = 1; + + /** + * The index of the next element in sortedMembers from which to unassign the next partition. + */ + private int nextMostLoadedMember = -1; + + // The ranges can be visualized like so: + // + // most + // loaded + // ^ least range + // | loaded # + // | range [##<- ]## + // partition | ##[#######]## + // count | [...-> ]##[#######]## + // | [#######]##[#######]## + // +-----------------------> + // members + // ^^ + // These members have no partitions to give up. + // + // The least loaded range expands rightwards and the most loaded range expands leftwards. + // Note that we are not done once the ranges touch, but rather when the partition counts in + // the ranges differ by one or less. + + public MemberAssignmentBalancer() { + this.memberTargetAssignmentSizes = UniformHeterogeneousAssignmentBuilder.this.memberTargetAssignmentSizes; + this.sortedMembers = new ArrayList<>(memberTargetAssignmentSizes.length); + } + + /** + * Resets the state of the balancer for a new set of members. + * + * @param members The members to balance. + * @return The difference in partition counts between the most and least loaded members. + */ + public int initialize(List<Integer> members) { + if (members.size() < 1) { + // nextLeastLoadedMember needs at least one member. + throw new IllegalArgumentException("Cannot balance an empty subscriber list."); + } + + // Build a sorted list of subscribers, where members with the smallest partition count + // appear first.
+ sortedMembers.clear(); + sortedMembers.addAll(members); + sortedMembers.sort(memberComparator); + + leastLoadedRangeEnd = 0; // exclusive + leastLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(0)] - 1; + nextLeastLoadedMember = 0; + + mostLoadedRangeStart = sortedMembers.size(); // inclusive + mostLoadedRangeEnd = sortedMembers.size(); // exclusive + mostLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(sortedMembers.size() - 1)] + 1; + nextMostLoadedMember = sortedMembers.size() - 1; + + return memberTargetAssignmentSizes[sortedMembers.get(sortedMembers.size() - 1)] - + memberTargetAssignmentSizes[sortedMembers.get(0)]; + } + + /** + * Gets the least loaded member to which to assign a partition. + * <p/> + * Advances the state of the balancer by assuming that an additional partition has been + * assigned to the returned member. + * <p/> + * Assumes that there is at least one member. + * + * @return The least loaded member to which to assign a partition + */ + public int nextLeastLoadedMember() { + // To generate a balanced assignment, we assign partitions to members in the + // [0, leastLoadedRangeEnd) range first. Once each member in the range has received a + // partition, the partition count of the range rises by one and we can try to expand it. + // Range full, Range full, + // bump level by one bump level by one, + // can't expand range. expand range. + // ^ ^ ^ + // | # | # | [..-> ]# + // partition | ### -> | [..-> ]### -> | [.......##]# + // count | [..-> ]### | [.......]### | [.......##]# + // | [#######]### | [#######]### | [#########]# + // +-------------> +-------------> +------------> + // members members members + + if (nextLeastLoadedMember >= leastLoadedRangeEnd) { + // We've hit the end of the range. Bump its level and try to expand it. + leastLoadedRangePartitionCount++; + + // Expand the range. 
+ while (leastLoadedRangeEnd < sortedMembers.size() && + memberTargetAssignmentSizes[sortedMembers.get(leastLoadedRangeEnd)] == leastLoadedRangePartitionCount) { + leastLoadedRangeEnd++; } + + // Reset the pointer to the start of the range of members with identical + // partition counts. + nextLeastLoadedMember = 0; } + + int leastLoadedMemberIndex = sortedMembers.get(nextLeastLoadedMember); + nextLeastLoadedMember++; + + return leastLoadedMemberIndex; } - } - /** - * If a topic has two or more potential members it is subject to reassignment. - * - * @return true if the topic can participate in reassignment, false otherwise. - */ - private boolean canTopicParticipateInReassignment(Uuid topicId) { - return membersPerTopic.get(topicId).size() >= 2; + /** + * Gets the most loaded member from which to reassign a partition. + * <p/> + * Advances the state of the balancer by assuming that a partition has been unassigned from + * the returned member. + * + * @return The most loaded member from which to reassign a partition, or -1 if no such + * member exists. + */ + public int nextMostLoadedMember() { + if (nextMostLoadedMember < mostLoadedRangeStart) { + if (mostLoadedRangeEnd <= mostLoadedRangeStart && + mostLoadedRangeStart > 0) { + // The range is empty due to calls to excludeMostLoadedMember(). We risk not + // expanding the range below and returning a member outside the range. Ensure + // that we always expand the range below by resetting the partition count. + mostLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(mostLoadedRangeStart - 1)]; + } else { + mostLoadedRangePartitionCount--; + } + + // Expand the range. + while (mostLoadedRangeStart > 0 && + memberTargetAssignmentSizes[sortedMembers.get(mostLoadedRangeStart - 1)] == mostLoadedRangePartitionCount) { + mostLoadedRangeStart--; + } + + // Reset the pointer to the end of the range of members with identical partition + // counts. 
+ nextMostLoadedMember = mostLoadedRangeEnd - 1; + } + + if (nextMostLoadedMember < 0) { + // The range is empty due to excludeMostLoadedMember() calls and there are no more + // members that can give up partitions. + return -1; + } + + int mostLoadedMemberIndex = sortedMembers.get(nextMostLoadedMember); + nextMostLoadedMember--; + + return mostLoadedMemberIndex; + } + + /** + * Excludes the last member returned from nextMostLoadedMember from the most loaded range. + * + * Must not be called if nextMostLoadedMember has not been called yet or returned -1. + */ + public void excludeMostLoadedMember() { + // Kick the member out of the most loaded range by swapping with the member at the end + // of the range and contracting the end of the range. + int mostLoadedMemberIndex = sortedMembers.get(nextMostLoadedMember + 1); + sortedMembers.set(nextMostLoadedMember + 1, sortedMembers.get(mostLoadedRangeEnd - 1)); + sortedMembers.set(mostLoadedRangeEnd - 1, mostLoadedMemberIndex); + mostLoadedRangeEnd--; + } + + /** + * Checks whether the members are balanced based on the partition counts of the least and + * most loaded ranges. + */ + public boolean isBalanced() { + return mostLoadedRangePartitionCount - leastLoadedRangePartitionCount <= 1; + } } /** - * If a member is not assigned all its potential partitions it is subject to reassignment. - * If any of the partitions assigned to a member is subject to reassignment, the member itself - * is subject to reassignment. + * Allocates the remaining unassigned partitions to members in a balanced manner. + * <ol> + * <li>Topics are sorted to maximize the probability of a balanced assignment.</li> + * <li>Unassigned partitions within each topic are distributed to the least loaded + * subscribers, repeatedly.</li> + * </ol> * - * @return true if the member can participate in reassignment, false otherwise. + * @param partitions The partitions to be assigned. 
*/ - private boolean canMemberParticipateInReassignment(String memberId) { - Set<Uuid> assignedTopicIds = targetAssignment.get(memberId).partitions().keySet(); + private void unassignedPartitionsAssignment(Map<Uuid, List<Integer>> partitions) { + List<Uuid> sortedTopicIds = sortTopicIds(partitions.keySet()); - int currentAssignmentSize = assignmentManager.targetAssignmentSize(memberId); - int maxAssignmentSize = assignmentManager.maxAssignmentSize(memberId); + MemberAssignmentBalancer memberAssignmentBalancer = new MemberAssignmentBalancer(); - if (currentAssignmentSize > maxAssignmentSize) - LOG.error("The member {} is assigned more partitions than the maximum possible.", memberId); + for (Uuid topicId : sortedTopicIds) { + memberAssignmentBalancer.initialize(topicSubscribers.get(topicId)); - if (currentAssignmentSize < maxAssignmentSize) - return true; + for (int partition : partitions.get(topicId)) { + int leastLoadedMemberIndex = memberAssignmentBalancer.nextLeastLoadedMember(); - for (Uuid topicId : assignedTopicIds) { - if (canTopicParticipateInReassignment(topicId)) - return true; + assignPartition(topicId, partition, leastLoadedMemberIndex); + } } - return false; } /** - * Checks if the current assignments of partitions to members is balanced. - * - * Balance is determined by first checking if the difference in the number of partitions assigned - * to any two members is one. If this is not true, it verifies that no member can - * receive additional partitions without disrupting the balance. + * If a topic has two or more potential members it is subject to reassignment. * - * @return true if the assignment is balanced, false otherwise. + * @return true if the topic can participate in reassignment, false otherwise. 
*/ - - private boolean isBalanced() { - int min = assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.first()); - int max = assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.last()); - - // If minimum and maximum numbers of partitions assigned to members differ by at most one return true. - if (min >= max - 1) - return true; - - // Ensure that members without a complete set of topic partitions cannot receive any additional partitions. - // This maintains balance. Start by checking members with the fewest assigned partitions to see if they can take more. - for (String member : sortedMembersByAssignmentSize) { - int memberPartitionCount = assignmentManager.targetAssignmentSize(member); - - // Skip if this member already has all the topic partitions it can get. - int maxAssignmentSize = assignmentManager.maxAssignmentSize(member); - if (memberPartitionCount == maxAssignmentSize) - continue; - - // Otherwise make sure it cannot get any more partitions. - for (Uuid topicId : groupSpec.memberSubscription(member).subscribedTopicIds()) { - Set<Integer> assignedPartitions = targetAssignment.get(member).partitions().get(topicId); - for (int i = 0; i < subscribedTopicDescriber.numPartitions(topicId); i++) { - TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, i); - if (assignedPartitions == null || !assignedPartitions.contains(i)) { - String otherMember = partitionOwnerInTargetAssignment.get(topicIdPartition); - int otherMemberPartitionCount = assignmentManager.targetAssignmentSize(otherMember); - if (memberPartitionCount + 1 < otherMemberPartitionCount) { - LOG.debug("{} can be moved from member {} to member {} for a more balanced assignment.", - topicIdPartition, otherMember, member); - return false; - } - } - } - } - } - return true; + private boolean canTopicParticipateInReassignment(Uuid topicId) { + return topicSubscribers.get(topicId).size() >= 2; } /** * Balance the current assignment after the initial round of 
assignments have completed. */ private void balance() { - if (!unassignedPartitions.isEmpty()) - throw new PartitionAssignorException("Some partitions were left unassigned"); - // Refill unassigned partitions with all the topicId partitions. - unassignedPartitions.addAll(topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber)); + Set<Uuid> topicIds = new HashSet<>(subscribedTopicIds); - // Narrow down the reassignment scope to only those partitions that can actually be reassigned. - Set<TopicIdPartition> fixedPartitions = new HashSet<>(); + // Narrow down the reassignment scope to only those topics that can actually be reassigned. for (Uuid topicId : subscribedTopicIds) { if (!canTopicParticipateInReassignment(topicId)) { - for (int i = 0; i < subscribedTopicDescriber.numPartitions(topicId); i++) { - fixedPartitions.add(new TopicIdPartition(topicId, i)); - } - } - } - unassignedPartitions.removeAll(fixedPartitions); - - // Narrow down the reassignment scope to only those members that are subject to reassignment. - for (String member : groupSpec.memberIds()) { - if (!canMemberParticipateInReassignment(member)) { - sortedMembersByAssignmentSize.remove(member); + topicIds.remove(topicId); } } // If all the partitions are fixed i.e. unassigned partitions is empty there is no point of re-balancing. - if (!unassignedPartitions.isEmpty()) performReassignments(); + if (!topicIds.isEmpty()) balanceTopics(topicIds); } /** * Performs reassignments of partitions to balance the load across members. * This method iteratively reassigns partitions until no further moves can improve the balance. + * <p/> + * The method loops over the topics repeatedly until an entire loop around the topics has + * completed without any reassignments, or we hit an iteration limit. + * <p/> + * This method produces perfectly balanced assignments when all subscribers of a topic have the + * same subscriptions. 
However, when subscribers have overlapping, non-identical subscriptions, + * the method produces almost-balanced assignments, assuming the iteration limit is not hit. + * eg. if there are three members and two topics and members 1 and 2 are subscribed to the first + * topic and members 2 and 3 are subscribed to the second topic, we can end up with an + * assignment like: + * <ul> + * <li>Member 1: 9 partitions</li> + * <li>Member 2: 10 partitions</li> + * <li>Member 3: 11 partitions</li> + * </ul> * - * The method uses a do-while loop to ensure at least one pass over the partitions and continues - * reassigning as long as there are modifications to the current assignments. It checks for balance - * after each reassignment and exits if the balance is achieved. + * In this assignment, the subscribers of the first topic have a difference in partitions of 1, + * so the topic is considered balanced. The same applies to the second topic. However, balance + * can be improved by moving a partition from the second topic from member 3 to member 2 and a + * partition from the first topic from member 2 to member 1. * - * @throws PartitionAssignorException if there are inconsistencies in expected members per partition - * or if a partition is expected to already be assigned but isn't. + * @param topicIds The topics to consider for reassignment. These topics must have at + * least two subscribers. */ - private void performReassignments() { - boolean modified; - boolean reassignmentOccurred; - // Repeat reassignment until no partition can be moved to improve the balance. - do { - modified = false; - reassignmentOccurred = false; - // Reassign all reassignable partitions sorted in descending order - // by totalPartitions/number of subscribed members, - // until the full list is processed or a balance is achieved. 
- List<TopicIdPartition> reassignablePartitions = sortTopicIdPartitions(unassignedPartitions); - - for (TopicIdPartition reassignablePartition : reassignablePartitions) { - // Only check if there is any change in balance if any moves were made. - if (reassignmentOccurred && isBalanced()) { + private void balanceTopics(Collection<Uuid> topicIds) { + List<Uuid> sortedTopicIds = sortTopicIds(topicIds); + // The index of the last topic in sortedTopicIds that was rebalanced. + // Used to decide when to exit early. + int lastRebalanceTopicIndex = -1; + + MemberAssignmentBalancer memberAssignmentBalancer = new MemberAssignmentBalancer(); + + // An array of partitions, with partitions owned by the same member grouped together. + List<Integer> partitions = new ArrayList<>(); + + // The ranges in the partitions list assigned to members. + // Maintaining these ranges allows for constant time, deterministic picking of partitions + // owned by a given member. + Map<Integer, Integer> startPartitionIndices = new HashMap<>(); + Map<Integer, Integer> endPartitionIndices = new HashMap<>(); // exclusive + + // Repeat reassignment until no partition can be moved to improve the balance or we hit an + // iteration limit. + for (int i = 0; i < 16; i++) { + for (int topicIndex = 0; topicIndex < sortedTopicIds.size(); topicIndex++) { + if (topicIndex == lastRebalanceTopicIndex) { + // The last rebalanced topic was this one, which means we've gone through all + // the topics and didn't perform any additional rebalancing. Don't bother trying + // to rebalance this topic again and exit. return; } - reassignmentOccurred = false; - // The topicIdPartition must have at least two members. 
- if (membersPerTopic.get(reassignablePartition.topicId()).size() <= 1) - throw new PartitionAssignorException(String.format("Expected more than one potential member for " + - "topicIdPartition '%s'", reassignablePartition) - ); + Uuid topicId = sortedTopicIds.get(topicIndex); - // The topicIdPartition must have a current target owner. - String currentTargetOwner = partitionOwnerInTargetAssignment.get(reassignablePartition); - if (currentTargetOwner == null) - throw new PartitionAssignorException(String.format("Expected topicIdPartition '%s' to be assigned " + - "to a member", reassignablePartition) - ); + int reassignedPartitionCount = balanceTopic( + topicId, + memberAssignmentBalancer, + partitions, + startPartitionIndices, + endPartitionIndices + ); - for (String otherMember : membersPerTopic.get(reassignablePartition.topicId())) { - if (assignmentManager.targetAssignmentSize(currentTargetOwner) > assignmentManager.targetAssignmentSize(otherMember) + 1) { - reassignPartition(reassignablePartition); - modified = true; - reassignmentOccurred = true; - break; - } + if (reassignedPartitionCount > 0 || + lastRebalanceTopicIndex == -1) { + lastRebalanceTopicIndex = topicIndex; } } - } while (modified); - } - - /** - * Reassigns a partition to an eligible member with the fewest current target assignments. - * <ul> - * <li> Iterates over members sorted by ascending assignment size. </li> - * <li> Selects the first member subscribed to the partition's topic. </li> - * </ul> - * - * @param partition The partition to reassign. - * @throws AssertionError If no subscribed member is found. - */ - private void reassignPartition(TopicIdPartition partition) { - // Find the new member with the least assignment size. 
- String newOwner = null; - for (String anotherMember : sortedMembersByAssignmentSize) { - if (groupSpec.memberSubscription(anotherMember).subscribedTopicIds().contains(partition.topicId())) { - newOwner = anotherMember; - break; - } - } - - if (newOwner == null) { - throw new PartitionAssignorException("No suitable new owner was found for the partition" + partition); } - - reassignPartition(partition, newOwner); } /** - * Reassigns the given partition to a new member while considering partition movements and stickiness. - * <p> - * This method performs the following actions: - * <ol> - * <li> Determines the current owner of the partition. </li> - * <li> Identifies the correct partition to move, adhering to stickiness constraints. </li> - * <li> Processes the partition movement to the new member. </li> - * </ol> + * Performs reassignments of a topic's partitions to balance the load across its subscribers. * - * @param partition The {@link TopicIdPartition} to be reassigned. - * @param newMember The Id of the member to which the partition should be reassigned. + * @param topicId The topic to consider for reassignment. The topic must have + * at least two subscribers. + * @param memberAssignmentBalancer A MemberAssignmentBalancer to hold the state of the balancing + * algorithm. + * subscribers in ascending order of total assigned partitions. + * @param partitions A list for the balancing algorithm to store the topic's + * partitions, with partitions owned by the same member grouped + * together. + * @param startPartitionIndices A map for the balancing algorithm to store the ranges in the + * partitions list assigned to members. These ranges allows for + * constant time, deterministic picking of partitions owned by a + * given member. + * @param endPartitionIndices A map for the balancing algorithm to store the ranges in the + * partitions list assigned to members. 
These ranges allows for + * constant time, deterministic picking of partitions owned by a + * given member. + * @return the number of partitions reassigned. */ - private void reassignPartition(TopicIdPartition partition, String newMember) { - String member = partitionOwnerInTargetAssignment.get(partition); - // Find the correct partition movement considering the stickiness requirement. - TopicIdPartition partitionToBeMoved = partitionMovements.computeActualPartitionToBeMoved( - partition, - member, - newMember - ); - processPartitionMovement(partitionToBeMoved, newMember); - } - - private void processPartitionMovement(TopicIdPartition topicIdPartition, String newMember) { - String oldMember = partitionOwnerInTargetAssignment.get(topicIdPartition); - - partitionMovements.movePartition(topicIdPartition, oldMember, newMember); - - assignmentManager.removePartitionFromTargetAssignment(topicIdPartition, oldMember); - assignmentManager.addPartitionToTargetAssignment(topicIdPartition, newMember); - } - - /** - * Adds the topic's partition to the member's target assignment. - */ - private static void addPartitionToAssignment( - Map<String, MemberAssignment> memberAssignments, - String memberId, + private int balanceTopic( Uuid topicId, - int partition + MemberAssignmentBalancer memberAssignmentBalancer, + List<Integer> partitions, + Map<Integer, Integer> startPartitionIndices, + Map<Integer, Integer> endPartitionIndices // exclusive ) { - memberAssignments.get(memberId) - .partitions() - .computeIfAbsent(topicId, __ -> new HashSet<>()) - .add(partition); - } + int reassignedPartitionCount = 0; - /** - * Constructs a set of {@code TopicIdPartition} including all the given topic Ids based on their partition counts. - * - * @param topicIds Collection of topic Ids. - * @param subscribedTopicDescriber Describer to fetch partition counts for topics. - * - * @return Set of {@code TopicIdPartition} including all the provided topic Ids. 
- */ - private static Set<TopicIdPartition> topicIdPartitions( - Collection<Uuid> topicIds, - SubscribedTopicDescriber subscribedTopicDescriber - ) { - Set<TopicIdPartition> topicIdPartitions = new HashSet<>(); - for (Uuid topicId : topicIds) { - int numPartitions = subscribedTopicDescriber.numPartitions(topicId); - for (int partitionId = 0; partitionId < numPartitions; partitionId++) { - topicIdPartitions.add(new TopicIdPartition(topicId, partitionId)); - } - } - return topicIdPartitions; - } + int numPartitions = subscribedTopicDescriber.numPartitions(topicId); - /** - * This class represents a pair of member Ids involved in a partition reassignment. - * Each pair contains a source and a destination member Id. - * It normally corresponds to a particular partition or topic, and indicates that the particular partition or some - * partition of the particular topic was moved from the source member to the destination member during the rebalance. - */ - private static class MemberPair { - private final String srcMemberId; - private final String dstMemberId; + int[] previousPartitionOwners = previousAssignmentPartitionOwners.get(topicId); + int[] partitionOwners = targetAssignmentPartitionOwners.get(topicId); - MemberPair(String srcMemberId, String dstMemberId) { - this.srcMemberId = srcMemberId; - this.dstMemberId = dstMemberId; - } + // Try to return partitions to their previous owners if it improves balance. + reassignedPartitionCount += balanceTopicRestoringStickiness( + topicId, + numPartitions, + previousPartitionOwners, + partitionOwners + ); - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((this.srcMemberId == null) ? 0 : this.srcMemberId.hashCode()); - result = prime * result + ((this.dstMemberId == null) ? 0 : this.dstMemberId.hashCode()); - return result; + // Next, try to reassign partitions from the most loaded subscribers to the least loaded + // subscribers. 
We use a similar approach to unassignedPartitionsAssignment, except instead + // we are reassigning partitions from the most loaded subscribers to the least loaded + // subscribers. Once the difference in partition counts between the ranges is one or less, + // we are done, since reassigning partitions would not improve balance. + int imbalance = memberAssignmentBalancer.initialize(topicSubscribers.get(topicId)); + if (imbalance <= 1) { + // The topic is already balanced. + return reassignedPartitionCount; } - @Override - public boolean equals(Object obj) { - if (obj == null) - return false; - - if (!getClass().isInstance(obj)) - return false; - - MemberPair otherPair = (MemberPair) obj; - return this.srcMemberId.equals(otherPair.srcMemberId) && this.dstMemberId.equals(otherPair.dstMemberId); + // Initialize the array of partitions, with partitions owned by the same member grouped + // together. + partitions.clear(); + for (int partition = 0; partition < numPartitions; partition++) { + partitions.add(partition); } + partitions.sort( + Comparator + .comparingInt((Integer partition) -> partitionOwners[partition]) + .thenComparingInt(partition -> partition) + ); - @Override - public String toString() { - return "MemberPair(" + - "srcMemberId='" + srcMemberId + '\'' + - ", dstMemberId='" + dstMemberId + '\'' + - ')'; - } - } + // Initialize the ranges in the partitions list owned by members. + startPartitionIndices.clear(); + endPartitionIndices.clear(); - /** - * This class maintains some data structures to simplify lookup of partition movements among members. - * During a partition rebalance, it keeps track of partition movements corresponding to each topic, - * and also possible movement (in form a <code>MemberPair</code> object) for each partition. 
- */ - private static class PartitionMovements { - private final Map<Uuid, Map<MemberPair, Set<TopicIdPartition>>> partitionMovementsByTopic = new HashMap<>(); - private final Map<TopicIdPartition, MemberPair> partitionMovementsByPartition = new HashMap<>(); - - private MemberPair removeMovementRecordOfPartition(TopicIdPartition partition) { - MemberPair pair = partitionMovementsByPartition.remove(partition); - - Uuid topic = partition.topicId(); - Map<MemberPair, Set<TopicIdPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic); - partitionMovementsForThisTopic.get(pair).remove(partition); - if (partitionMovementsForThisTopic.get(pair).isEmpty()) - partitionMovementsForThisTopic.remove(pair); - if (partitionMovementsByTopic.get(topic).isEmpty()) - partitionMovementsByTopic.remove(topic); - - return pair; + for (int i = 0; i < numPartitions; i++) { + int partition = partitions.get(i); + int ownerIndex = partitionOwners[partition]; + if (startPartitionIndices.get(ownerIndex) == null) { + startPartitionIndices.put(ownerIndex, i); + } + endPartitionIndices.put(ownerIndex, i + 1); // endPartitionIndices is exclusive } - private void addPartitionMovementRecord(TopicIdPartition partition, MemberPair pair) { - partitionMovementsByPartition.put(partition, pair); - - Uuid topic = partition.topicId(); - if (!partitionMovementsByTopic.containsKey(topic)) - partitionMovementsByTopic.put(topic, new HashMap<>()); + // Redistribute partitions from the most loaded subscriber to the least loaded subscriber + // until the topic's subscribers are balanced. + while (!memberAssignmentBalancer.isBalanced()) { + // NB: The condition above does not do much. This loop will terminate due to the checks + // inside instead. 
- Map<MemberPair, Set<TopicIdPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic); - if (!partitionMovementsForThisTopic.containsKey(pair)) - partitionMovementsForThisTopic.put(pair, new HashSet<>()); + // First, choose a member from the most loaded range to reassign a partition from. - partitionMovementsForThisTopic.get(pair).add(partition); - } + // Loop until we find a member that has partitions to give up. + int mostLoadedMemberIndex = -1; + while (true) { + mostLoadedMemberIndex = memberAssignmentBalancer.nextMostLoadedMember(); - private void movePartition(TopicIdPartition partition, String oldOwner, String newOwner) { - MemberPair pair = new MemberPair(oldOwner, newOwner); - - if (partitionMovementsByPartition.containsKey(partition)) { - // This partition was previously moved. - MemberPair existingPair = removeMovementRecordOfPartition(partition); - if (existingPair.dstMemberId.equals(oldOwner)) { - throw new PartitionAssignorException("Mismatch in partition movement record with respect to " + - "partition ownership during a rebalance" - ); - } - if (!existingPair.srcMemberId.equals(newOwner)) { - // The partition is not moving back to its previous member. - addPartitionMovementRecord(partition, new MemberPair(existingPair.srcMemberId, newOwner)); + if (mostLoadedMemberIndex == -1) { + // There are no more members with partitions to give up. + // We need to break out of two loops here. + break; } - } else - addPartitionMovementRecord(partition, pair); - } - /** - * Computes the actual partition to be moved based on the current and proposed partition owners. - * This method determines the appropriate partition movement, considering existing partition movements - * and constraints within a topic. - * - * @param partition The {@link TopicIdPartition} object representing the partition to be moved. - * @param oldOwner The memberId of the current owner of the partition. 
- * @param newOwner The memberId of the proposed new owner of the partition. - * @return The {@link TopicIdPartition} that should be moved, based on existing movement patterns - * and ownership. Returns the original partition if no specific movement pattern applies. - * @throws PartitionAssignorException if the old owner does not match the expected value for the partition. - */ - private TopicIdPartition computeActualPartitionToBeMoved( - TopicIdPartition partition, - String oldOwner, - String newOwner - ) { - Uuid topic = partition.topicId(); - - if (!partitionMovementsByTopic.containsKey(topic)) - return partition; - - if (partitionMovementsByPartition.containsKey(partition)) { - String expectedOldOwner = partitionMovementsByPartition.get(partition).dstMemberId; - if (!oldOwner.equals(expectedOldOwner)) { - throw new PartitionAssignorException("Old owner does not match expected value for partition: " + partition); + if (!endPartitionIndices.containsKey(mostLoadedMemberIndex) || + endPartitionIndices.get(mostLoadedMemberIndex) - startPartitionIndices.get(mostLoadedMemberIndex) <= 0) { + memberAssignmentBalancer.excludeMostLoadedMember(); + continue; } - oldOwner = partitionMovementsByPartition.get(partition).srcMemberId; - } - - Map<MemberPair, Set<TopicIdPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic); - MemberPair reversePair = new MemberPair(newOwner, oldOwner); - if (!partitionMovementsForThisTopic.containsKey(reversePair)) - return partition; - - return partitionMovementsForThisTopic.get(reversePair).iterator().next(); - } - } - - /** - * Manages assignments to members based on their current assignment size and maximum allowed assignment size. - */ - private class AssignmentManager { - private final Map<String, MemberAssignmentData> membersWithAssignmentSizes = new HashMap<>(); - /** - * Represents the assignment metadata for a member. 
- */ - private class MemberAssignmentData { - final String memberId; - int currentAssignmentSize = 0; - int maxAssignmentSize; - - /** - * Constructs a MemberAssignmentData with the given member Id. - * - * @param memberId The Id of the member. - */ - MemberAssignmentData(String memberId) { - this.memberId = memberId; + break; } - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - MemberAssignmentData that = (MemberAssignmentData) o; - return memberId.equals(that.memberId); + if (mostLoadedMemberIndex == -1) { + // There are no more members with partitions to give up. + break; } - @Override - public int hashCode() { - return Objects.hash(memberId); - } + // We've picked a member. Now pick its last partition to reassign. + int partition = partitions.get(endPartitionIndices.get(mostLoadedMemberIndex) - 1); + endPartitionIndices.put(mostLoadedMemberIndex, endPartitionIndices.get(mostLoadedMemberIndex) - 1); - @Override - public String toString() { - return "MemberAssignmentData(" + - "memberId='" + memberId + '\'' + - ", currentAssignmentSize=" + currentAssignmentSize + - ", maxAssignmentSize=" + maxAssignmentSize + - ')'; + // Find the least loaded subscriber. + int leastLoadedMemberIndex = memberAssignmentBalancer.nextLeastLoadedMember(); + + // If the most and least loaded subscribers came from member ranges with partition + // counts that differ by one or less, then the members are balanced and we are done. + if (memberAssignmentBalancer.isBalanced()) { + // The topic is balanced. + break; } - } - /** - * Initializes an AssignmentManager, setting up the necessary data structures. 
- */ - public AssignmentManager( - SubscribedTopicDescriber subscribedTopicDescriber - ) { - groupSpec.memberIds().forEach(memberId -> { - int maxSize = groupSpec.memberSubscription(memberId).subscribedTopicIds().stream() - .mapToInt(subscribedTopicDescriber::numPartitions) - .sum(); - - MemberAssignmentData memberAssignmentData = membersWithAssignmentSizes - .computeIfAbsent(memberId, MemberAssignmentData::new); - memberAssignmentData.maxAssignmentSize = maxSize; - memberAssignmentData.currentAssignmentSize = 0; - }); + // Reassign the partition. + assignPartition(topicId, partition, leastLoadedMemberIndex); + reassignedPartitionCount++; } - /** - * @param memberId The member Id. - * @return The current assignment size for the given member. - */ - private int targetAssignmentSize(String memberId) { - MemberAssignmentData memberData = this.membersWithAssignmentSizes.get(memberId); - if (memberData == null) { - LOG.warn("Member Id {} not found", memberId); - return 0; - } - return memberData.currentAssignmentSize; - } + return reassignedPartitionCount; + } - /** - * @param memberId The member Id. - * @return The maximum assignment size for the given member. - */ - private int maxAssignmentSize(String memberId) { - MemberAssignmentData memberData = this.membersWithAssignmentSizes.get(memberId); - if (memberData == null) { - LOG.warn("Member Id {} not found", memberId); - return 0; + /** + * Restores topic partitions to their previous owners if it improves balance. + * + * @param topicId The topic to rebalance. + * @param numPartitions The number of partitions in the topic. + * @param previousPartitionOwners The previous owners of the partitions. + * @param partitionOwners The current owners of the partitions. + * @return the number of partitions reassigned. 
+ */ + private int balanceTopicRestoringStickiness( + Uuid topicId, + int numPartitions, + int[] previousPartitionOwners, + int[] partitionOwners + ) { + int reassignedPartitionCount = 0; + + // Try to return partitions to their previous owners if it improves balance. + // Note that this is a best effort approach towards restoring stickiness and will not + // produce optimally sticky assignments. + for (int partition = 0; partition < numPartitions; partition++) { + int ownerIndex = partitionOwners[partition]; + int previousOwnerIndex = previousPartitionOwners[partition]; + if (previousOwnerIndex != -1 && + previousOwnerIndex != ownerIndex && + targetAssignmentSize(ownerIndex) > targetAssignmentSize(previousOwnerIndex)) { + assignPartition(topicId, partition, previousOwnerIndex); + reassignedPartitionCount++; } - return memberData.maxAssignmentSize; } - /** - * @param memberId The member Id. - * @return If the given member is at maximum capacity. - */ - private boolean isMemberAtMaxCapacity(String memberId) { - return targetAssignmentSize(memberId) >= maxAssignmentSize(memberId); - } + return reassignedPartitionCount; + } - /** - * @param memberId The member Id. - * Increment the current target assignment size for the member. - */ - private void incrementTargetAssignmentSize(String memberId) { - MemberAssignmentData memberData = this.membersWithAssignmentSizes.get(memberId); - if (memberData == null) { - LOG.warn("Member Id {} not found", memberId); - return; - } - memberData.currentAssignmentSize++; - } + /** + * Computes the set of unassigned partitions, based on targetAssignmentPartitionOwners. + * + * @param unassignedPartitions The map in which to store the unassigned partitions. + */ + private void computeUnassignedPartitions(Map<Uuid, List<Integer>> unassignedPartitions) { + unassignedPartitions.clear(); - /** - * @param memberId The member Id. - * Decrement the current target assignment size for the member, if it's assignment size is greater than zero. 
- */ - private void decrementTargetAssignmentSize(String memberId) { - MemberAssignmentData memberData = this.membersWithAssignmentSizes.get(memberId); - if (memberData == null) { - LOG.warn("Member Id {} not found", memberId); - return; + for (Uuid topicId : subscribedTopicIds) { + List<Integer> topicUnassignedPartitions = new ArrayList<>(); + int numPartitions = subscribedTopicDescriber.numPartitions(topicId); + for (int partition = 0; partition < numPartitions; partition++) { + if (targetAssignmentPartitionOwners.get(topicId)[partition] == -1) { + topicUnassignedPartitions.add(partition); + } } - if (memberData.currentAssignmentSize > 0) { - memberData.currentAssignmentSize--; + if (!topicUnassignedPartitions.isEmpty()) { + unassignedPartitions.put(topicId, topicUnassignedPartitions); } } + } - /** - * Assigns partition to member if eligible. - * - * @param topicIdPartition The partition to be assigned. - * @param memberId The Id of the member. - * @return true if the partition was assigned, false otherwise. - */ - private boolean maybeAssignPartitionToMember( - TopicIdPartition topicIdPartition, - String memberId - ) { - // If member is not subscribed to the partition's topic, return false without assigning. - if (!groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicIdPartition.topicId())) { - return false; - } - - // If the member's current assignment is already at max, return false without assigning. - if (isMemberAtMaxCapacity(memberId)) { - return false; - } + /** + * Assigns or reassigns the given partition to a member. + * + * @param topicId The topic containing the partition to be assigned or reassigned. + * @param partition The partition to be assigned or reassigned. + * @param memberIndex The index of the member to which the partition should be assigned. 
+ */ + private void assignPartition(Uuid topicId, int partition, int memberIndex) { + int oldMemberIndex = targetAssignmentPartitionOwners.get(topicId)[partition]; - addPartitionToTargetAssignment(topicIdPartition, memberId); - return true; + if (oldMemberIndex != -1) { + removePartitionFromTargetAssignment(topicId, partition, oldMemberIndex); } - /** - * Assigns a partition to a member, updates the current assignment size, - * and updates relevant data structures. - * - * @param topicIdPartition The partition to be assigned. - * @param memberId Member that the partition needs to be added to. - */ - private void addPartitionToTargetAssignment(TopicIdPartition topicIdPartition, String memberId) { - addPartitionToAssignment( - targetAssignment, - memberId, - topicIdPartition.topicId(), - topicIdPartition.partitionId() - ); - - partitionOwnerInTargetAssignment.put(topicIdPartition, memberId); - // Remove the member's assignment data from the queue to update it. - sortedMembersByAssignmentSize.remove(memberId); - assignmentManager.incrementTargetAssignmentSize(memberId); - - // Update current assignment size and re-add to queue if needed. - if (!isMemberAtMaxCapacity(memberId)) { - sortedMembersByAssignmentSize.add(memberId); - } + addPartitionToTargetAssignment(topicId, partition, memberIndex); + } - unassignedPartitions.remove(topicIdPartition); + /** + * @param memberIndex The member index. + * @return The current assignment size for the given member. + */ + private int targetAssignmentSize(int memberIndex) { + return memberTargetAssignmentSizes[memberIndex]; + } + + /** + * Assigns a partition to a member and updates the current assignment size. + * + * @param topicId The topic containing the partition to be assigned. + * @param partition The partition to be assigned. + * @param memberIndex Member that the partition needs to be added to. 
+ */ + private void addPartitionToTargetAssignment( + Uuid topicId, + int partition, + int memberIndex + ) { + String memberId = memberIds.get(memberIndex); + Map<Uuid, Set<Integer>> assignment = targetAssignment.get(memberId).partitions(); + if (isImmutableMap(assignment)) { + assignment = deepCopy(assignment); + targetAssignment.put(memberId, new MemberAssignmentImpl(assignment)); } + assignment + .computeIfAbsent(topicId, __ -> { + int numPartitions = subscribedTopicDescriber.numPartitions(topicId); + int numSubscribers = topicSubscribers.get(topicId).size(); + int estimatedPartitionsPerSubscriber = (numPartitions + numSubscribers - 1) / numSubscribers; + return new HashSet<>((int) ((estimatedPartitionsPerSubscriber / 0.75f) + 1)); + }) + .add(partition); - /** - * Revokes the partition from a member, updates the current target assignment size, - * and other relevant data structures. - * - * @param topicIdPartition The partition to be revoked. - * @param memberId Member that the partition needs to be revoked from. - */ - private void removePartitionFromTargetAssignment(TopicIdPartition topicIdPartition, String memberId) { - Map<Uuid, Set<Integer>> targetPartitionsMap = targetAssignment.get(memberId).partitions(); - Set<Integer> partitionsSet = targetPartitionsMap.get(topicIdPartition.topicId()); - // Remove the partition from the assignment, if there are no more partitions from a particular topic, - // remove the topic from the assignment as well. - if (partitionsSet != null) { - partitionsSet.remove(topicIdPartition.partitionId()); - if (partitionsSet.isEmpty()) { - targetPartitionsMap.remove(topicIdPartition.topicId()); - } - } + targetAssignmentPartitionOwners.get(topicId)[partition] = memberIndex; - partitionOwnerInTargetAssignment.remove(topicIdPartition, memberId); - // Remove the member's assignment data from the set to update it. 
- sortedMembersByAssignmentSize.remove(memberId); - assignmentManager.decrementTargetAssignmentSize(memberId); + memberTargetAssignmentSizes[memberIndex]++; + } - // Update current assignment size and re-add to set if needed. - if (!isMemberAtMaxCapacity(memberId)) { - sortedMembersByAssignmentSize.add(memberId); + /** + * Revokes the partition from a member and updates the current target assignment size. + * + * @param topicId The topic containing the partition to be revoked. + * @param partition The partition to be revoked. + * @param memberIndex Member that the partition needs to be revoked from. + */ + private void removePartitionFromTargetAssignment( + Uuid topicId, + int partition, + int memberIndex + ) { + String memberId = memberIds.get(memberIndex); + Map<Uuid, Set<Integer>> assignment = targetAssignment.get(memberId).partitions(); + if (isImmutableMap(assignment)) { + assignment = deepCopy(assignment); + targetAssignment.put(memberId, new MemberAssignmentImpl(assignment)); + } + Set<Integer> partitionsSet = assignment.get(topicId); + // Remove the partition from the assignment, if there are no more partitions from a particular topic, + // remove the topic from the assignment as well. + if (partitionsSet != null) { + partitionsSet.remove(partition); + if (partitionsSet.isEmpty()) { + assignment.remove(topicId); } } - /** - * Sorts members in ascending order based on their current target assignment size. - * Members that have reached their max assignment size are removed. - * - * @param memberIds Member Ids that need to be sorted. - * @return A set that maintains the order of members by assignment size. 
- */ - private TreeSet<String> sortMembersByAssignmentSize(Collection<String> memberIds) { - Comparator<String> comparator = Comparator - .comparingInt((String memberId) -> membersWithAssignmentSizes.get(memberId).currentAssignmentSize) - .thenComparing(memberId -> memberId); - - return memberIds.stream() - .filter(memberId -> { - MemberAssignmentData memberData = membersWithAssignmentSizes.get(memberId); - return memberData.currentAssignmentSize < memberData.maxAssignmentSize; - }) - .collect(Collectors.toCollection(() -> new TreeSet<>(comparator))); + targetAssignmentPartitionOwners.get(topicId)[partition] = -1; + + memberTargetAssignmentSizes[memberIndex]--; + } + + private static Map<Uuid, Set<Integer>> deepCopy(Map<Uuid, Set<Integer>> map) { Review Comment: I've factored these out into an `AssignorHelpers` class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org