squah-confluent commented on code in PR #17385: URL: https://github.com/apache/kafka/pull/17385#discussion_r1799116772
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java: ########## @@ -75,79 +84,153 @@ public class UniformHeterogeneousAssignmentBuilder { private final Set<Uuid> subscribedTopicIds; /** - * List of subscribed members for each topic. + * The list of members in the consumer group, sorted by member id. */ - private final Map<Uuid, List<String>> membersPerTopic; + private final List<String> memberIds; /** - * The new assignment that will be returned. + * Maps member ids to their indices in the memberIds list. */ - private final Map<String, MemberAssignment> targetAssignment; + private final Map<String, Integer> memberIndices; /** - * The partitions that still need to be assigned. + * List of subscribed members for each topic, in ascending order. + * <p/> + * Members are stored as integer indices into the memberIds array. */ - private final Set<TopicIdPartition> unassignedPartitions; + private final Map<Uuid, List<Integer>> topicSubscribers; /** - * All the partitions that have been retained from the existing assignment. + * The new assignment that will be returned. + */ + private final Map<String, MemberAssignment> targetAssignment; + + /** + * The number of partitions each member is assigned in the new assignment. + * <p/> + * The indices are the same as the memberIds array. memberTargetAssignmentSizes[i] contains the + * number of partitions assigned to memberIds[i]. + * <p/> + * We use a plain int[] array here for performance reasons. Looking up the number of partitions + * assigned to a given member is a very common operation. 
By using a plain int[] array, we can + * avoid: + * <ul> + * <li>the overhead of boxing and unboxing integers.</li> + * <li>the cost of HashMap lookups.</li> + * <li>the cost of checking member ids for equality when there are HashMap collisions.</li> + * </ul> */ - private final Set<TopicIdPartition> assignedStickyPartitions; + private final int[] memberTargetAssignmentSizes; /** - * Manages assignments to members based on their current assignment size and maximum allowed assignment size. + * The partitions that still need to be assigned. */ - private final AssignmentManager assignmentManager; + private final Map<Uuid, List<Integer>> unassignedPartitions; /** - * List of all the members sorted by their respective assignment sizes. + * Orders members by their number of assigned partitions, ascending. + * <p/> + * Ties are broken by member index, ascending. */ - private final TreeSet<String> sortedMembersByAssignmentSize; + private final Comparator<Integer> memberComparator; /** - * Tracks the owner of each partition in the target assignment. + * Tracks the owner of each partition in the previous assignment. + * <p/> + * Only assignments which would continue to be valid for the current set of members and + * subscriptions are present. + * <p/> + * Owners are represented as indices into the memberIds array. + * -1 indicates an unassigned partition, a previous owner that is no longer subscribed to the + * topic or a previous owner that is no longer part of the consumer group. + * <p/> + * Used for maximizing stickiness. */ - private final Map<TopicIdPartition, String> partitionOwnerInTargetAssignment; + private final Map<Uuid, int[]> previousAssignmentPartitionOwners; /** - * Handles all operations related to partition movements during a reassignment for balancing the target assignment. + * Tracks the owner of each partition in the target assignment. + * <p/> + * Owners are represented as indices into the memberIds array. + * -1 indicates an unassigned partition. 
*/ - private final PartitionMovements partitionMovements; + private final Map<Uuid, int[]> targetAssignmentPartitionOwners; public UniformHeterogeneousAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) { this.groupSpec = groupSpec; this.subscribedTopicDescriber = subscribedTopicDescriber; this.subscribedTopicIds = new HashSet<>(); - this.membersPerTopic = new HashMap<>(); + + // Number the members 0 to M - 1. + this.memberIds = new ArrayList<>(groupSpec.memberIds()); + this.memberIds.sort(null); Review Comment: It's entirely for determinism. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java: ########## @@ -75,79 +84,153 @@ public class UniformHeterogeneousAssignmentBuilder { private final Set<Uuid> subscribedTopicIds; /** - * List of subscribed members for each topic. + * The list of members in the consumer group, sorted by member id. */ - private final Map<Uuid, List<String>> membersPerTopic; + private final List<String> memberIds; /** - * The new assignment that will be returned. + * Maps member ids to their indices in the memberIds list. */ - private final Map<String, MemberAssignment> targetAssignment; + private final Map<String, Integer> memberIndices; /** - * The partitions that still need to be assigned. + * List of subscribed members for each topic, in ascending order. + * <p/> + * Members are stored as integer indices into the memberIds array. */ - private final Set<TopicIdPartition> unassignedPartitions; + private final Map<Uuid, List<Integer>> topicSubscribers; /** - * All the partitions that have been retained from the existing assignment. + * The new assignment that will be returned. + */ + private final Map<String, MemberAssignment> targetAssignment; + + /** + * The number of partitions each member is assigned in the new assignment. + * <p/> + * The indices are the same as the memberIds array. 
memberTargetAssignmentSizes[i] contains the + * number of partitions assigned to memberIds[i]. + * <p/> + * We use a plain int[] array here for performance reasons. Looking up the number of partitions + * assigned to a given member is a very common operation. By using a plain int[] array, we can + * avoid: + * <ul> + * <li>the overhead of boxing and unboxing integers.</li> + * <li>the cost of HashMap lookups.</li> + * <li>the cost of checking member ids for equality when there are HashMap collisions.</li> + * </ul> */ - private final Set<TopicIdPartition> assignedStickyPartitions; + private final int[] memberTargetAssignmentSizes; /** - * Manages assignments to members based on their current assignment size and maximum allowed assignment size. + * The partitions that still need to be assigned. */ - private final AssignmentManager assignmentManager; + private final Map<Uuid, List<Integer>> unassignedPartitions; /** - * List of all the members sorted by their respective assignment sizes. + * Orders members by their number of assigned partitions, ascending. + * <p/> + * Ties are broken by member index, ascending. */ - private final TreeSet<String> sortedMembersByAssignmentSize; + private final Comparator<Integer> memberComparator; /** - * Tracks the owner of each partition in the target assignment. + * Tracks the owner of each partition in the previous assignment. + * <p/> + * Only assignments which would continue to be valid for the current set of members and + * subscriptions are present. + * <p/> + * Owners are represented as indices into the memberIds array. + * -1 indicates an unassigned partition, a previous owner that is no longer subscribed to the + * topic or a previous owner that is no longer part of the consumer group. + * <p/> + * Used for maximizing stickiness. 
*/ - private final Map<TopicIdPartition, String> partitionOwnerInTargetAssignment; + private final Map<Uuid, int[]> previousAssignmentPartitionOwners; /** - * Handles all operations related to partition movements during a reassignment for balancing the target assignment. + * Tracks the owner of each partition in the target assignment. + * <p/> + * Owners are represented as indices into the memberIds array. + * -1 indicates an unassigned partition. */ - private final PartitionMovements partitionMovements; + private final Map<Uuid, int[]> targetAssignmentPartitionOwners; public UniformHeterogeneousAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) { this.groupSpec = groupSpec; this.subscribedTopicDescriber = subscribedTopicDescriber; this.subscribedTopicIds = new HashSet<>(); - this.membersPerTopic = new HashMap<>(); + + // Number the members 0 to M - 1. + this.memberIds = new ArrayList<>(groupSpec.memberIds()); + this.memberIds.sort(null); + this.memberIndices = new HashMap<>(); + for (int memberIndex = 0; memberIndex < this.memberIds.size(); memberIndex++) { + memberIndices.put(memberIds.get(memberIndex), memberIndex); + } + + this.topicSubscribers = new HashMap<>(); + this.targetAssignment = new HashMap<>(); - groupSpec.memberIds().forEach(memberId -> { - groupSpec.memberSubscription(memberId).subscribedTopicIds().forEach(topicId -> { + this.memberTargetAssignmentSizes = new int[this.memberIds.size()]; + + // Build set of all subscribed topics and sets of subscribers per topic. + for (int memberIndex = 0; memberIndex < this.memberIds.size(); memberIndex++) { + String memberId = this.memberIds.get(memberIndex); + for (Uuid topicId : groupSpec.memberSubscription(memberId).subscribedTopicIds()) { // Check if the subscribed topic exists. 
- int partitionCount = subscribedTopicDescriber.numPartitions(topicId); - if (partitionCount == -1) { + int numPartitions = subscribedTopicDescriber.numPartitions(topicId); + if (numPartitions == -1) { throw new PartitionAssignorException( "Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata." ); } subscribedTopicIds.add(topicId); - membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId); - }); - targetAssignment.put(memberId, new MemberAssignmentImpl(new HashMap<>())); - }); - this.unassignedPartitions = topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber); - this.assignedStickyPartitions = new HashSet<>(); - this.assignmentManager = new AssignmentManager(this.subscribedTopicDescriber); - this.sortedMembersByAssignmentSize = assignmentManager.sortMembersByAssignmentSize(groupSpec.memberIds()); - this.partitionOwnerInTargetAssignment = new HashMap<>(); - this.partitionMovements = new PartitionMovements(); + topicSubscribers.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberIndex); + } + } + this.unassignedPartitions = new HashMap<>(); + + this.memberComparator = new Comparator<Integer>() { + @Override + public int compare(final Integer memberIndex1, final Integer memberIndex2) { + // Order by number of assigned partitions, ascending. + int order = Integer.compare( + memberTargetAssignmentSizes[memberIndex1], + memberTargetAssignmentSizes[memberIndex2] + ); + + // Then order by member index, ascending. This is equivalent to ordering by + // member id, ascending, since memberIds is sorted. + if (order == 0) { + order = memberIndex1.compareTo(memberIndex2); + } + + return order; + } + }; + + // Initialize partition owners for the previous and target assignments. 
+ this.previousAssignmentPartitionOwners = new HashMap<>((int) ((this.subscribedTopicIds.size() / 0.75f) + 1)); Review Comment: `previousAssignmentPartitionOwners` is only used during the iterative rebalance phase to restore partitions to their previous owners if it helps balance. To use `GroupSpec`, we'd need to update the method or add a new method to return the assigned memberId. Then compared to the current approach, we'd end up with an extra Uuid map lookup and an extra memberId map lookup per query. I think there would be a performance cost to this, but I have not tested it. I'm actually tempted to remove the usage of this entirely. See comments on `balanceTopicRestoringStickiness`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java: ########## @@ -157,716 +240,779 @@ public GroupAssignment build() { assignStickyPartitions(); - unassignedPartitionsAssignment(); + computeUnassignedPartitions(unassignedPartitions); + unassignedPartitionsAssignment(unassignedPartitions); balance(); return new GroupAssignment(targetAssignment); } /** - * <li> TopicIdPartitions are sorted in descending order based on the value: - * totalPartitions/number of subscribed members. </li> - * <li> If the above value is the same then topicIdPartitions are sorted in - * ascending order of number of subscribers. </li> - * <li> If both criteria are the same, sort in ascending order of the partition Id. - * This last criteria is for predictability of the assignments. </li> + * Sorts topics based on the following criteria: + * <ol> + * <li>Topics are sorted in descending order of totalPartitions / number of subscribers + * (partitions per subscriber).</li> + * <li>Ties are broken by sorting in ascending order of number of subscribers.</li> + * <li>Any remaining ties are broken by sorting in ascending order of topic id.</li> + * </ol> + * + * The last criteria is for predictability of assignments. 
* - * @param topicIdPartitions The topic partitions that need to be sorted. - * @return A list of sorted topic partitions. + * @param topicIds The topic ids that need to be sorted. + * @return A list of sorted topic ids. */ - private List<TopicIdPartition> sortTopicIdPartitions(Collection<TopicIdPartition> topicIdPartitions) { - Comparator<TopicIdPartition> comparator = Comparator - .comparingDouble((TopicIdPartition topicIdPartition) -> { - int totalPartitions = subscribedTopicDescriber.numPartitions(topicIdPartition.topicId()); - int totalSubscribers = membersPerTopic.get(topicIdPartition.topicId()).size(); - return (double) totalPartitions / totalSubscribers; - }) - .reversed() - .thenComparingInt(topicIdPartition -> membersPerTopic.get(topicIdPartition.topicId()).size()) - .thenComparingInt(TopicIdPartition::partitionId); + private List<Uuid> sortTopicIds(Collection<Uuid> topicIds) { + Comparator<Uuid> comparator = new Comparator<Uuid>() { + @Override + public int compare(final Uuid topic1Id, final Uuid topic2Id) { + int topic1PartitionCount = subscribedTopicDescriber.numPartitions(topic1Id); + int topic2PartitionCount = subscribedTopicDescriber.numPartitions(topic2Id); + int topic1SubscriberCount = topicSubscribers.get(topic1Id).size(); + int topic2SubscriberCount = topicSubscribers.get(topic2Id).size(); + + // Order by partitions per subscriber, descending. + int order = Double.compare( + (double) topic2PartitionCount / topic2SubscriberCount, + (double) topic1PartitionCount / topic1SubscriberCount + ); + + // Then order by subscriber count, ascending. + if (order == 0) { + order = Integer.compare(topic1SubscriberCount, topic2SubscriberCount); + } - return topicIdPartitions.stream() - .sorted(comparator) - .collect(Collectors.toList()); + // Then order by topic id, ascending. 
+ if (order == 0) { + order = topic1Id.compareTo(topic2Id); + } + + return order; + } + }; + + List<Uuid> sortedTopicIds = new ArrayList<>(topicIds); + sortedTopicIds.sort(comparator); + return sortedTopicIds; } /** - * Gets a set of partitions that are to be retained from the existing assignment. This includes: - * <li> Partitions from topics that are still present in both the new subscriptions and the topic metadata. </li> + * Imports the existing assignment into the target assignment, taking into account topic + * unsubscriptions. */ private void assignStickyPartitions() { - groupSpec.memberIds().forEach(memberId -> - groupSpec.memberAssignment(memberId).partitions().forEach((topicId, currentAssignment) -> { + for (String memberId : groupSpec.memberIds()) { + int memberIndex = memberIndices.get(memberId); + + Map<Uuid, Set<Integer>> oldAssignment = groupSpec.memberAssignment(memberId).partitions(); + Map<Uuid, Set<Integer>> newAssignment = null; + + for (Map.Entry<Uuid, Set<Integer>> entry : oldAssignment.entrySet()) { + Uuid topicId = entry.getKey(); + Set<Integer> partitions = entry.getValue(); + if (groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicId)) { - currentAssignment.forEach(partition -> { - TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition); - assignmentManager.addPartitionToTargetAssignment(topicIdPartition, memberId); - assignedStickyPartitions.add(topicIdPartition); - }); + memberTargetAssignmentSizes[memberIndex] += partitions.size(); + + int numPartitions = subscribedTopicDescriber.numPartitions(topicId); + for (int partition : partitions) { + if (partition >= numPartitions) { Review Comment: For consistency with the homogeneous case, I think we should remove this check.
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java: ########## @@ -157,716 +240,779 @@ public GroupAssignment build() { assignStickyPartitions(); - unassignedPartitionsAssignment(); + computeUnassignedPartitions(unassignedPartitions); + unassignedPartitionsAssignment(unassignedPartitions); balance(); return new GroupAssignment(targetAssignment); } /** - * <li> TopicIdPartitions are sorted in descending order based on the value: - * totalPartitions/number of subscribed members. </li> - * <li> If the above value is the same then topicIdPartitions are sorted in - * ascending order of number of subscribers. </li> - * <li> If both criteria are the same, sort in ascending order of the partition Id. - * This last criteria is for predictability of the assignments. </li> + * Sorts topics based on the following criteria: + * <ol> + * <li>Topics are sorted in descending order of totalPartitions / number of subscribers + * (partitions per subscriber).</li> + * <li>Ties are broken by sorting in ascending order of number of subscribers.</li> + * <li>Any remaining ties are broken by sorting in ascending order of topic id.</li> + * </ol> + * + * The last criteria is for predictability of assignments. * - * @param topicIdPartitions The topic partitions that need to be sorted. - * @return A list of sorted topic partitions. + * @param topicIds The topic ids that need to be sorted. + * @return A list of sorted topic ids. 
*/ - private List<TopicIdPartition> sortTopicIdPartitions(Collection<TopicIdPartition> topicIdPartitions) { - Comparator<TopicIdPartition> comparator = Comparator - .comparingDouble((TopicIdPartition topicIdPartition) -> { - int totalPartitions = subscribedTopicDescriber.numPartitions(topicIdPartition.topicId()); - int totalSubscribers = membersPerTopic.get(topicIdPartition.topicId()).size(); - return (double) totalPartitions / totalSubscribers; - }) - .reversed() - .thenComparingInt(topicIdPartition -> membersPerTopic.get(topicIdPartition.topicId()).size()) - .thenComparingInt(TopicIdPartition::partitionId); + private List<Uuid> sortTopicIds(Collection<Uuid> topicIds) { + Comparator<Uuid> comparator = new Comparator<Uuid>() { + @Override + public int compare(final Uuid topic1Id, final Uuid topic2Id) { + int topic1PartitionCount = subscribedTopicDescriber.numPartitions(topic1Id); + int topic2PartitionCount = subscribedTopicDescriber.numPartitions(topic2Id); + int topic1SubscriberCount = topicSubscribers.get(topic1Id).size(); + int topic2SubscriberCount = topicSubscribers.get(topic2Id).size(); + + // Order by partitions per subscriber, descending. + int order = Double.compare( + (double) topic2PartitionCount / topic2SubscriberCount, + (double) topic1PartitionCount / topic1SubscriberCount + ); + + // Then order by subscriber count, ascending. + if (order == 0) { + order = Integer.compare(topic1SubscriberCount, topic2SubscriberCount); + } - return topicIdPartitions.stream() - .sorted(comparator) - .collect(Collectors.toList()); + // Then order by topic id, ascending. + if (order == 0) { + order = topic1Id.compareTo(topic2Id); + } + + return order; + } + }; + + List<Uuid> sortedTopicIds = new ArrayList<>(topicIds); + sortedTopicIds.sort(comparator); + return sortedTopicIds; } /** - * Gets a set of partitions that are to be retained from the existing assignment. 
This includes: - * <li> Partitions from topics that are still present in both the new subscriptions and the topic metadata. </li> + * Imports the existing assignment into the target assignment, taking into account topic + * unsubscriptions. */ private void assignStickyPartitions() { - groupSpec.memberIds().forEach(memberId -> - groupSpec.memberAssignment(memberId).partitions().forEach((topicId, currentAssignment) -> { + for (String memberId : groupSpec.memberIds()) { + int memberIndex = memberIndices.get(memberId); + + Map<Uuid, Set<Integer>> oldAssignment = groupSpec.memberAssignment(memberId).partitions(); + Map<Uuid, Set<Integer>> newAssignment = null; + + for (Map.Entry<Uuid, Set<Integer>> entry : oldAssignment.entrySet()) { + Uuid topicId = entry.getKey(); + Set<Integer> partitions = entry.getValue(); + if (groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicId)) { - currentAssignment.forEach(partition -> { - TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition); - assignmentManager.addPartitionToTargetAssignment(topicIdPartition, memberId); - assignedStickyPartitions.add(topicIdPartition); - }); + memberTargetAssignmentSizes[memberIndex] += partitions.size(); + + int numPartitions = subscribedTopicDescriber.numPartitions(topicId); + for (int partition : partitions) { + if (partition >= numPartitions) { + LOG.warn( + "Previous assignment for {} contains {}:{}, but topic only has {} partitions", + memberId, + topicId, + partition, + numPartitions + ); + + if (newAssignment == null) { + // If the new assignment is null, we create a deep copy of the + // original assignment so that we can alter it. 
+ newAssignment = deepCopy(oldAssignment); + } + Set<Integer> newPartitions = newAssignment.get(topicId); + newPartitions.remove(partition); + if (newPartitions.isEmpty()) { + newAssignment.remove(topicId); + } + memberTargetAssignmentSizes[memberIndex]--; + + continue; Review Comment: Now that the check for reduced partition counts is gone, this code no longer exists. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org