dajac commented on code in PR #17385: URL: https://github.com/apache/kafka/pull/17385#discussion_r1799472940
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java: ########## @@ -75,79 +84,153 @@ public class UniformHeterogeneousAssignmentBuilder { private final Set<Uuid> subscribedTopicIds; /** - * List of subscribed members for each topic. + * The list of members in the consumer group, sorted by member id. */ - private final Map<Uuid, List<String>> membersPerTopic; + private final List<String> memberIds; /** - * The new assignment that will be returned. + * Maps member ids to their indices in the memberIds list. */ - private final Map<String, MemberAssignment> targetAssignment; + private final Map<String, Integer> memberIndices; /** - * The partitions that still need to be assigned. + * List of subscribed members for each topic, in ascending order. + * <p/> + * Members are stored as integer indices into the memberIds array. */ - private final Set<TopicIdPartition> unassignedPartitions; + private final Map<Uuid, List<Integer>> topicSubscribers; /** - * All the partitions that have been retained from the existing assignment. + * The new assignment that will be returned. + */ + private final Map<String, MemberAssignment> targetAssignment; + + /** + * The number of partitions each member is assigned in the new assignment. + * <p/> + * The indices are the same as the memberIds array. memberTargetAssignmentSizes[i] contains the + * number of partitions assigned to memberIds[i]. + * <p/> + * We use a plain int[] array here for performance reasons. Looking up the number of partitions + * assigned to a given member is a very common operation. 
By using a plain int[] array, we can + * avoid: + * <ul> + * <li>the overhead of boxing and unboxing integers.</li> + * <li>the cost of HashMap lookups.</li> + * <li>the cost of checking member ids for equality when there are HashMap collisions.</li> + * </ul> */ - private final Set<TopicIdPartition> assignedStickyPartitions; + private final int[] memberTargetAssignmentSizes; /** - * Manages assignments to members based on their current assignment size and maximum allowed assignment size. + * The partitions that still need to be assigned. */ - private final AssignmentManager assignmentManager; + private final Map<Uuid, List<Integer>> unassignedPartitions; /** - * List of all the members sorted by their respective assignment sizes. + * Orders members by their number of assigned partitions, ascending. + * <p/> + * Ties are broken by member index, ascending. */ - private final TreeSet<String> sortedMembersByAssignmentSize; + private final Comparator<Integer> memberComparator; /** - * Tracks the owner of each partition in the target assignment. + * Tracks the owner of each partition in the previous assignment. + * <p/> + * Only assignments which would continue to be valid for the current set of members and + * subscriptions are present. + * <p/> + * Owners are represented as indices into the memberIds array. + * -1 indicates an unassigned partition, a previous owner that is no longer subscribed to the + * topic or a previous owner that is no longer part of the consumer group. + * <p/> + * Used for maximizing stickiness. */ - private final Map<TopicIdPartition, String> partitionOwnerInTargetAssignment; + private final Map<Uuid, int[]> previousAssignmentPartitionOwners; /** - * Handles all operations related to partition movements during a reassignment for balancing the target assignment. + * Tracks the owner of each partition in the target assignment. + * <p/> + * Owners are represented as indices into the memberIds array. + * -1 indicates an unassigned partition. 
*/ - private final PartitionMovements partitionMovements; + private final Map<Uuid, int[]> targetAssignmentPartitionOwners; public UniformHeterogeneousAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) { this.groupSpec = groupSpec; this.subscribedTopicDescriber = subscribedTopicDescriber; this.subscribedTopicIds = new HashSet<>(); - this.membersPerTopic = new HashMap<>(); + + // Number the members 0 to M - 1. + this.memberIds = new ArrayList<>(groupSpec.memberIds()); + this.memberIds.sort(null); Review Comment: Is sorting the member ids required by the algorithm, or is it only needed to make the tests deterministic? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org