squah-confluent commented on code in PR #17385: URL: https://github.com/apache/kafka/pull/17385#discussion_r1799117413
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java: ########## @@ -157,716 +240,779 @@ public GroupAssignment build() { assignStickyPartitions(); - unassignedPartitionsAssignment(); + computeUnassignedPartitions(unassignedPartitions); + unassignedPartitionsAssignment(unassignedPartitions); balance(); return new GroupAssignment(targetAssignment); } /** - * <li> TopicIdPartitions are sorted in descending order based on the value: - * totalPartitions/number of subscribed members. </li> - * <li> If the above value is the same then topicIdPartitions are sorted in - * ascending order of number of subscribers. </li> - * <li> If both criteria are the same, sort in ascending order of the partition Id. - * This last criteria is for predictability of the assignments. </li> + * Sorts topics based on the following criteria: + * <ol> + * <li>Topics are sorted in descending order of totalPartitions / number of subscribers + * (partitions per subscriber).</li> + * <li>Ties are broken by sorting in ascending order of number of subscribers.</li> + * <li>Any remaining ties are broken by sorting in ascending order of topic id.</li> + * </ol> + * + * The last criteria is for predictability of assignments. * - * @param topicIdPartitions The topic partitions that need to be sorted. - * @return A list of sorted topic partitions. + * @param topicIds The topic ids that need to be sorted. + * @return A list of sorted topic ids. 
*/ - private List<TopicIdPartition> sortTopicIdPartitions(Collection<TopicIdPartition> topicIdPartitions) { - Comparator<TopicIdPartition> comparator = Comparator - .comparingDouble((TopicIdPartition topicIdPartition) -> { - int totalPartitions = subscribedTopicDescriber.numPartitions(topicIdPartition.topicId()); - int totalSubscribers = membersPerTopic.get(topicIdPartition.topicId()).size(); - return (double) totalPartitions / totalSubscribers; - }) - .reversed() - .thenComparingInt(topicIdPartition -> membersPerTopic.get(topicIdPartition.topicId()).size()) - .thenComparingInt(TopicIdPartition::partitionId); + private List<Uuid> sortTopicIds(Collection<Uuid> topicIds) { + Comparator<Uuid> comparator = new Comparator<Uuid>() { + @Override + public int compare(final Uuid topic1Id, final Uuid topic2Id) { + int topic1PartitionCount = subscribedTopicDescriber.numPartitions(topic1Id); + int topic2PartitionCount = subscribedTopicDescriber.numPartitions(topic2Id); + int topic1SubscriberCount = topicSubscribers.get(topic1Id).size(); + int topic2SubscriberCount = topicSubscribers.get(topic2Id).size(); + + // Order by partitions per subscriber, descending. + int order = Double.compare( + (double) topic2PartitionCount / topic2SubscriberCount, + (double) topic1PartitionCount / topic1SubscriberCount + ); + + // Then order by subscriber count, ascending. + if (order == 0) { + order = Integer.compare(topic1SubscriberCount, topic2SubscriberCount); + } - return topicIdPartitions.stream() - .sorted(comparator) - .collect(Collectors.toList()); + // Then order by topic id, ascending. + if (order == 0) { + order = topic1Id.compareTo(topic2Id); + } + + return order; + } + }; + + List<Uuid> sortedTopicIds = new ArrayList<>(topicIds); + sortedTopicIds.sort(comparator); + return sortedTopicIds; } /** - * Gets a set of partitions that are to be retained from the existing assignment. 
This includes: - * <li> Partitions from topics that are still present in both the new subscriptions and the topic metadata. </li> + * Imports the existing assignment into the target assignment, taking into account topic + * unsubscriptions. */ private void assignStickyPartitions() { - groupSpec.memberIds().forEach(memberId -> - groupSpec.memberAssignment(memberId).partitions().forEach((topicId, currentAssignment) -> { + for (String memberId : groupSpec.memberIds()) { + int memberIndex = memberIndices.get(memberId); + + Map<Uuid, Set<Integer>> oldAssignment = groupSpec.memberAssignment(memberId).partitions(); + Map<Uuid, Set<Integer>> newAssignment = null; + + for (Map.Entry<Uuid, Set<Integer>> entry : oldAssignment.entrySet()) { + Uuid topicId = entry.getKey(); + Set<Integer> partitions = entry.getValue(); + if (groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicId)) { - currentAssignment.forEach(partition -> { - TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition); - assignmentManager.addPartitionToTargetAssignment(topicIdPartition, memberId); - assignedStickyPartitions.add(topicIdPartition); - }); + memberTargetAssignmentSizes[memberIndex] += partitions.size(); + + int numPartitions = subscribedTopicDescriber.numPartitions(topicId); + for (int partition : partitions) { + if (partition >= numPartitions) { + LOG.warn( + "Previous assignment for {} contains {}:{}, but topic only has {} partitions", + memberId, + topicId, + partition, + numPartitions + ); + + if (newAssignment == null) { + // If the new assignment is null, we create a deep copy of the + // original assignment so that we can alter it. 
+ newAssignment = deepCopy(oldAssignment); + } + Set<Integer> newPartitions = newAssignment.get(topicId); + newPartitions.remove(partition); + if (newPartitions.isEmpty()) { + newAssignment.remove(topicId); + } + memberTargetAssignmentSizes[memberIndex]--; + + continue; + } + + previousAssignmentPartitionOwners.get(topicId)[partition] = memberIndex; + targetAssignmentPartitionOwners.get(topicId)[partition] = memberIndex; + } } else { + // The member is no longer subscribed to the topic. Remove it from the + // assignment. LOG.debug("The topic " + topicId + " is no longer present in the subscribed topics list"); + + if (newAssignment == null) { + // If the new assignment is null, we create a deep copy of the + // original assignment so that we can alter it. + newAssignment = deepCopy(oldAssignment); + } + // Remove the entire topic. + newAssignment.remove(topicId); } - }) - ); + } + + if (newAssignment == null) { + newAssignment = oldAssignment; + } + targetAssignment.put(memberId, new MemberAssignmentImpl(newAssignment)); + } } /** - * Allocates the remaining unassigned partitions to members in a balanced manner. - * <li> Partitions are sorted to maximize the probability of a balanced assignment. </li> - * <li> Sort members in ascending order of their current target assignment sizes - * to ensure the least filled member gets the partition first. </li> + * A class that holds the state for the balancing process. + * <p/> + * Keeps track of two ranges of members: the members with the least number of assigned + * partitions and the members with the most number of assigned partitions. Within each range, + * all members start with the same number of assigned partitions. 
*/ - private void unassignedPartitionsAssignment() { - List<TopicIdPartition> sortedPartitions = sortTopicIdPartitions(unassignedPartitions); + private final class MemberAssignmentBalancer { + private final int[] memberTargetAssignmentSizes; - for (TopicIdPartition partition : sortedPartitions) { - TreeSet<String> sortedMembers = assignmentManager.sortMembersByAssignmentSize( - membersPerTopic.get(partition.topicId()) - ); + /** + * The members to balance, sorted by number of assigned partitions. + * <p/> + * Can be visualized like so: + * <pre> + * ^ + * | # + * partition | ### + * count | ### + * | ########## + * +-----------> + * members + * </pre> + */ + private final List<Integer> sortedMembers; - for (String member : sortedMembers) { - if (assignmentManager.maybeAssignPartitionToMember(partition, member)) { - break; + /** + * [0, leastLoadedRangeEnd) represents the range of members with the smallest partition + * count. + */ + private int leastLoadedRangeEnd = 0; // exclusive + + /** + * The partition count of each member in the least loaded range before we start assigning + * partitions to them. + */ + private int leastLoadedRangePartitionCount = -1; + + /** + * The index of the next element in sortedMembers to which to assign the next partition. + */ + private int nextLeastLoadedMember = 0; + + /** + * [mostLoadedRangeStart, mostLoadedRangeEnd) is the range of members with the largest + * partition count. + */ + private int mostLoadedRangeStart = 0; // inclusive + + /** + * [mostLoadedRangeStart, mostLoadedRangeEnd) is the range of members with the largest + * partition count. + * <p/> + * mostLoadedRangeEnd is not simply the end of the member list, since we need to be able to + * exclude heavily loaded members which do not have any partitions to give up. + */ + private int mostLoadedRangeEnd = 0; // exclusive + + /** + * The partition count of each member in the most loaded range before we start unassigning + * partitions from them.
+ */ + private int mostLoadedRangePartitionCount = 1; + + /** + * The index of the next element in sortedMembers from which to unassign the next partition. + */ + private int nextMostLoadedMember = -1; + + // The ranges can be visualized like so: + // + // most + // loaded + // ^ least range + // | loaded # + // | range [##<- ]## + // partition | ##[#######]## + // count | [...-> ]##[#######]## + // | [#######]##[#######]## + // +-----------------------> + // members + // ^^ + // These members have no partitions to give up. + // + // The least loaded range expands rightwards and the most loaded range expands leftwards. + // Note that we are not done once the ranges touch, but rather when the partition counts in + // the ranges differ by one or less. + + public MemberAssignmentBalancer() { + this.memberTargetAssignmentSizes = UniformHeterogeneousAssignmentBuilder.this.memberTargetAssignmentSizes; + this.sortedMembers = new ArrayList<>(memberTargetAssignmentSizes.length); + } + + /** + * Resets the state of the balancer for a new set of members. + * + * @param members The members to balance. + * @return The difference in partition counts between the most and least loaded members. + */ + public int initialize(List<Integer> members) { + if (members.size() < 1) { + // nextLeastLoadedMember needs at least one member. + throw new IllegalArgumentException("Cannot balance an empty subscriber list."); + } + + // Build a sorted list of subscribers, where members with the smallest partition count + // appear first. 
+ sortedMembers.clear(); + sortedMembers.addAll(members); + sortedMembers.sort(memberComparator); + + leastLoadedRangeEnd = 0; // exclusive + leastLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(0)] - 1; + nextLeastLoadedMember = 0; + + mostLoadedRangeStart = sortedMembers.size(); // inclusive + mostLoadedRangeEnd = sortedMembers.size(); // exclusive + mostLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(sortedMembers.size() - 1)] + 1; + nextMostLoadedMember = sortedMembers.size() - 1; + + return memberTargetAssignmentSizes[sortedMembers.get(sortedMembers.size() - 1)] - + memberTargetAssignmentSizes[sortedMembers.get(0)]; + } + + /** + * Gets the least loaded member to which to assign a partition. + * <p/> + * Advances the state of the balancer by assuming that an additional partition has been + * assigned to the returned member. + * <p/> + * Assumes that there is at least one member. + * + * @return The least loaded member to which to assign a partition + */ + public int nextLeastLoadedMember() { + // To generate a balanced assignment, we assign partitions to members in the + // [0, leastLoadedRangeEnd) range first. Once each member in the range has received a + // partition, the partition count of the range rises by one and we can try to expand it. + // Range full, Range full, + // bump level by one bump level by one, + // can't expand range. expand range. + // ^ ^ ^ + // | # | # | [..-> ]# + // partition | ### -> | [..-> ]### -> | [.......##]# + // count | [..-> ]### | [.......]### | [.......##]# + // | [#######]### | [#######]### | [#########]# + // +-------------> +-------------> +------------> + // members members members + + if (nextLeastLoadedMember >= leastLoadedRangeEnd) { + // We've hit the end of the range. Bump its level and try to expand it. + leastLoadedRangePartitionCount++; + + // Expand the range. 
+ while (leastLoadedRangeEnd < sortedMembers.size() && + memberTargetAssignmentSizes[sortedMembers.get(leastLoadedRangeEnd)] == leastLoadedRangePartitionCount) { + leastLoadedRangeEnd++; } + + // Reset the pointer to the start of the range of members with identical + // partition counts. + nextLeastLoadedMember = 0; } + + int leastLoadedMemberIndex = sortedMembers.get(nextLeastLoadedMember); + nextLeastLoadedMember++; + + return leastLoadedMemberIndex; } - } - /** - * If a topic has two or more potential members it is subject to reassignment. - * - * @return true if the topic can participate in reassignment, false otherwise. - */ - private boolean canTopicParticipateInReassignment(Uuid topicId) { - return membersPerTopic.get(topicId).size() >= 2; + /** + * Gets the most loaded member from which to reassign a partition. + * <p/> + * Advances the state of the balancer by assuming that a partition has been unassigned from + * the returned member. + * + * @return The most loaded member from which to reassign a partition, or -1 if no such + * member exists. + */ + public int nextMostLoadedMember() { + if (nextMostLoadedMember < mostLoadedRangeStart) { + if (mostLoadedRangeEnd <= mostLoadedRangeStart && + mostLoadedRangeStart > 0) { + // The range is empty due to calls to excludeMostLoadedMember(). We risk not + // expanding the range below and returning a member outside the range. Ensure + // that we always expand the range below by resetting the partition count. + mostLoadedRangePartitionCount = memberTargetAssignmentSizes[sortedMembers.get(mostLoadedRangeStart - 1)]; + } else { + mostLoadedRangePartitionCount--; + } + + // Expand the range. + while (mostLoadedRangeStart > 0 && + memberTargetAssignmentSizes[sortedMembers.get(mostLoadedRangeStart - 1)] == mostLoadedRangePartitionCount) { + mostLoadedRangeStart--; + } + + // Reset the pointer to the end of the range of members with identical partition + // counts. 
+ nextMostLoadedMember = mostLoadedRangeEnd - 1; + } + + if (nextMostLoadedMember < 0) { + // The range is empty due to excludeMostLoadedMember() calls and there are no more + // members that can give up partitions. + return -1; + } + + int mostLoadedMemberIndex = sortedMembers.get(nextMostLoadedMember); + nextMostLoadedMember--; + + return mostLoadedMemberIndex; + } + + /** + * Excludes the last member returned from nextMostLoadedMember from the most loaded range. + * + * Must not be called if nextMostLoadedMember has not been called yet or returned -1. + */ + public void excludeMostLoadedMember() { + // Kick the member out of the most loaded range by swapping with the member at the end + // of the range and contracting the end of the range. + int mostLoadedMemberIndex = sortedMembers.get(nextMostLoadedMember + 1); + sortedMembers.set(nextMostLoadedMember + 1, sortedMembers.get(mostLoadedRangeEnd - 1)); + sortedMembers.set(mostLoadedRangeEnd - 1, mostLoadedMemberIndex); + mostLoadedRangeEnd--; + } + + /** + * Checks whether the members are balanced based on the partition counts of the least and + * most loaded ranges. + */ + public boolean isBalanced() { + return mostLoadedRangePartitionCount - leastLoadedRangePartitionCount <= 1; + } } /** - * If a member is not assigned all its potential partitions it is subject to reassignment. - * If any of the partitions assigned to a member is subject to reassignment, the member itself - * is subject to reassignment. + * Allocates the remaining unassigned partitions to members in a balanced manner. + * <ol> + * <li>Topics are sorted to maximize the probability of a balanced assignment.</li> + * <li>Unassigned partitions within each topic are distributed to the least loaded + * subscribers, repeatedly.</li> + * </ol> * - * @return true if the member can participate in reassignment, false otherwise. + * @param partitions The partitions to be assigned. 
*/ - private boolean canMemberParticipateInReassignment(String memberId) { - Set<Uuid> assignedTopicIds = targetAssignment.get(memberId).partitions().keySet(); + private void unassignedPartitionsAssignment(Map<Uuid, List<Integer>> partitions) { + List<Uuid> sortedTopicIds = sortTopicIds(partitions.keySet()); - int currentAssignmentSize = assignmentManager.targetAssignmentSize(memberId); - int maxAssignmentSize = assignmentManager.maxAssignmentSize(memberId); + MemberAssignmentBalancer memberAssignmentBalancer = new MemberAssignmentBalancer(); - if (currentAssignmentSize > maxAssignmentSize) - LOG.error("The member {} is assigned more partitions than the maximum possible.", memberId); + for (Uuid topicId : sortedTopicIds) { + memberAssignmentBalancer.initialize(topicSubscribers.get(topicId)); - if (currentAssignmentSize < maxAssignmentSize) - return true; + for (int partition : partitions.get(topicId)) { + int leastLoadedMemberIndex = memberAssignmentBalancer.nextLeastLoadedMember(); - for (Uuid topicId : assignedTopicIds) { - if (canTopicParticipateInReassignment(topicId)) - return true; + assignPartition(topicId, partition, leastLoadedMemberIndex); + } } - return false; } /** - * Checks if the current assignments of partitions to members is balanced. - * - * Balance is determined by first checking if the difference in the number of partitions assigned - * to any two members is one. If this is not true, it verifies that no member can - * receive additional partitions without disrupting the balance. + * If a topic has two or more potential members it is subject to reassignment. * - * @return true if the assignment is balanced, false otherwise. + * @return true if the topic can participate in reassignment, false otherwise. 
*/ - - private boolean isBalanced() { - int min = assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.first()); - int max = assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.last()); - - // If minimum and maximum numbers of partitions assigned to members differ by at most one return true. - if (min >= max - 1) - return true; - - // Ensure that members without a complete set of topic partitions cannot receive any additional partitions. - // This maintains balance. Start by checking members with the fewest assigned partitions to see if they can take more. - for (String member : sortedMembersByAssignmentSize) { - int memberPartitionCount = assignmentManager.targetAssignmentSize(member); - - // Skip if this member already has all the topic partitions it can get. - int maxAssignmentSize = assignmentManager.maxAssignmentSize(member); - if (memberPartitionCount == maxAssignmentSize) - continue; - - // Otherwise make sure it cannot get any more partitions. - for (Uuid topicId : groupSpec.memberSubscription(member).subscribedTopicIds()) { - Set<Integer> assignedPartitions = targetAssignment.get(member).partitions().get(topicId); - for (int i = 0; i < subscribedTopicDescriber.numPartitions(topicId); i++) { - TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, i); - if (assignedPartitions == null || !assignedPartitions.contains(i)) { - String otherMember = partitionOwnerInTargetAssignment.get(topicIdPartition); - int otherMemberPartitionCount = assignmentManager.targetAssignmentSize(otherMember); - if (memberPartitionCount + 1 < otherMemberPartitionCount) { - LOG.debug("{} can be moved from member {} to member {} for a more balanced assignment.", - topicIdPartition, otherMember, member); - return false; - } - } - } - } - } - return true; + private boolean canTopicParticipateInReassignment(Uuid topicId) { + return topicSubscribers.get(topicId).size() >= 2; } /** * Balance the current assignment after the initial round of 
assignments have completed. */ private void balance() { - if (!unassignedPartitions.isEmpty()) - throw new PartitionAssignorException("Some partitions were left unassigned"); - // Refill unassigned partitions with all the topicId partitions. - unassignedPartitions.addAll(topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber)); + Set<Uuid> topicIds = new HashSet<>(subscribedTopicIds); - // Narrow down the reassignment scope to only those partitions that can actually be reassigned. - Set<TopicIdPartition> fixedPartitions = new HashSet<>(); + // Narrow down the reassignment scope to only those topics that can actually be reassigned. for (Uuid topicId : subscribedTopicIds) { if (!canTopicParticipateInReassignment(topicId)) { - for (int i = 0; i < subscribedTopicDescriber.numPartitions(topicId); i++) { - fixedPartitions.add(new TopicIdPartition(topicId, i)); - } - } - } - unassignedPartitions.removeAll(fixedPartitions); - - // Narrow down the reassignment scope to only those members that are subject to reassignment. - for (String member : groupSpec.memberIds()) { - if (!canMemberParticipateInReassignment(member)) { - sortedMembersByAssignmentSize.remove(member); + topicIds.remove(topicId); } } // If all the partitions are fixed i.e. unassigned partitions is empty there is no point of re-balancing. - if (!unassignedPartitions.isEmpty()) performReassignments(); + if (!topicIds.isEmpty()) balanceTopics(topicIds); } /** * Performs reassignments of partitions to balance the load across members. * This method iteratively reassigns partitions until no further moves can improve the balance. + * <p/> + * The method loops over the topics repeatedly until an entire loop around the topics has + * completed without any reassignments, or we hit an iteration limit. + * <p/> + * This method produces perfectly balanced assignments when all subscribers of a topic have the + * same subscriptions. 
However, when subscribers have overlapping, non-identical subscriptions, + * the method produces almost-balanced assignments, assuming the iteration limit is not hit. + * For example, if there are three members and two topics and members 1 and 2 are subscribed to the first + * topic and members 2 and 3 are subscribed to the second topic, we can end up with an + * assignment like: + * <ul> + * <li>Member 1: 9 partitions</li> + * <li>Member 2: 10 partitions</li> + * <li>Member 3: 11 partitions</li> + * </ul> * - * The method uses a do-while loop to ensure at least one pass over the partitions and continues - * reassigning as long as there are modifications to the current assignments. It checks for balance - * after each reassignment and exits if the balance is achieved. + * In this assignment, the subscribers of the first topic have a difference in partitions of 1, + * so the topic is considered balanced. The same applies to the second topic. However, balance + * can be improved by moving a partition from the second topic from member 3 to member 2 and a + * partition from the first topic from member 2 to member 1. * - * @throws PartitionAssignorException if there are inconsistencies in expected members per partition - * or if a partition is expected to already be assigned but isn't. + * @param topicIds The topics to consider for reassignment. These topics must have at + * least two subscribers. */ - private void performReassignments() { - boolean modified; - boolean reassignmentOccurred; - // Repeat reassignment until no partition can be moved to improve the balance. - do { - modified = false; - reassignmentOccurred = false; - // Reassign all reassignable partitions sorted in descending order - // by totalPartitions/number of subscribed members, - // until the full list is processed or a balance is achieved.
- List<TopicIdPartition> reassignablePartitions = sortTopicIdPartitions(unassignedPartitions); - - for (TopicIdPartition reassignablePartition : reassignablePartitions) { - // Only check if there is any change in balance if any moves were made. - if (reassignmentOccurred && isBalanced()) { + private void balanceTopics(Collection<Uuid> topicIds) { + List<Uuid> sortedTopicIds = sortTopicIds(topicIds); + // The index of the last topic in sortedTopicIds that was rebalanced. + // Used to decide when to exit early. + int lastRebalanceTopicIndex = -1; + + MemberAssignmentBalancer memberAssignmentBalancer = new MemberAssignmentBalancer(); + + // An array of partitions, with partitions owned by the same member grouped together. + List<Integer> partitions = new ArrayList<>(); + + // The ranges in the partitions list assigned to members. + // Maintaining these ranges allows for constant time, deterministic picking of partitions + // owned by a given member. + Map<Integer, Integer> startPartitionIndices = new HashMap<>(); + Map<Integer, Integer> endPartitionIndices = new HashMap<>(); // exclusive + + // Repeat reassignment until no partition can be moved to improve the balance or we hit an + // iteration limit. + for (int i = 0; i < 16; i++) { Review Comment: Yes. The choice of 16 was arbitrary. I didn't want something too low (not enough time for convergence) or too high (too much CPU cost). Also added a Javadoc for the new constant. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org