AndrewJSchofield commented on code in PR #19142:
URL: https://github.com/apache/kafka/pull/19142#discussion_r1987157915
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:
##########
@@ -234,67 +257,93 @@ private GroupAssignment newAssignmentHeterogeneous(
         return groupAssignment(finalAssignment, groupSpec.memberIds());
     }
 
-    private GroupAssignment groupAssignment(
-        Map<String, Set<TopicIdPartition>> assignmentByMember,
-        Collection<String> allGroupMembers
-    ) {
-        Map<String, MemberAssignment> members = new HashMap<>();
-        for (Map.Entry<String, Set<TopicIdPartition>> entry : assignmentByMember.entrySet()) {
-            Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
-            entry.getValue().forEach(targetPartition -> targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new HashSet<>()).add(targetPartition.partitionId()));
-            members.put(entry.getKey(), new MemberAssignmentImpl(targetPartitions));
-        }
-        allGroupMembers.forEach(member -> {
-            if (!members.containsKey(member))
-                members.put(member, new MemberAssignmentImpl(new HashMap<>()));
-        });
-
-        return new GroupAssignment(members);
-    }
-
     /**
      * This function updates assignment by hashing the member IDs of the members and maps the partitions assigned to the
-     * members based on the hash. This gives approximately even balance.
-     * @param unassignedPartitions - the subscribed topic partitions which needs assignment.
-     * @param memberIds - the member ids to which the topic partitions need to be assigned.
-     * @param assignment - the existing assignment by topic partition. We need to pass it as a parameter because this
-     *                   function would be called multiple times for heterogeneous assignment.
+     * members based on the hash, one partition per member. This gives approximately even balance.
+     * @param memberIds The member ids to which the topic partitions need to be assigned.
+     * @param partitionsToAssign The subscribed topic partitions which needs assignment.
+     * @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this
+     *                   method can be called multiple times for heterogeneous assignment.
      */
     // Visible for testing
     void memberHashAssignment(
-        List<TopicIdPartition> unassignedPartitions,
         Collection<String> memberIds,
+        List<TopicIdPartition> partitionsToAssign,
         Map<TopicIdPartition, List<String>> assignment
     ) {
-        if (!unassignedPartitions.isEmpty())
+        if (!partitionsToAssign.isEmpty()) {
             for (String memberId : memberIds) {
-                int topicPartitionIndex = Math.abs(memberId.hashCode() % unassignedPartitions.size());
-                TopicIdPartition topicPartition = unassignedPartitions.get(topicPartitionIndex);
+                int topicPartitionIndex = Math.abs(memberId.hashCode() % partitionsToAssign.size());
+                TopicIdPartition topicPartition = partitionsToAssign.get(topicPartitionIndex);
                 assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId);
             }
+        }
     }
 
     /**
-     * This functions assigns topic partitions to members by round-robin approach and updates the existing assignment.
-     * @param memberIds - the member ids to which the topic partitions need to be assigned, should be non-empty.
-     * @param unassignedPartitions - the subscribed topic partitions which needs assignment.
-     * @param assignment - the existing assignment by topic partition.
+     * This functions assigns topic partitions to members by a round-robin approach and updates the existing assignment.
+     * @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty.
+     * @param partitionsToAssign The subscribed topic partitions which needs assignment.
+     * @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this
+     *                   method can be called multiple times for heterogeneous assignment.
      */
     // Visible for testing
     void roundRobinAssignment(
         Collection<String> memberIds,
-        List<TopicIdPartition> unassignedPartitions,
+        List<TopicIdPartition> partitionsToAssign,
         Map<TopicIdPartition, List<String>> assignment
     ) {
         // We iterate through the target partitions and assign a memberId to them. In case we run out of members (members < targetPartitions),
         // we again start from the starting index of memberIds.
         Iterator<String> memberIdIterator = memberIds.iterator();
-        for (TopicIdPartition targetPartition : unassignedPartitions) {
+        for (TopicIdPartition topicPartition : partitionsToAssign) {
             if (!memberIdIterator.hasNext()) {
                 memberIdIterator = memberIds.iterator();
             }
             String memberId = memberIdIterator.next();
-            assignment.computeIfAbsent(targetPartition, k -> new ArrayList<>()).add(memberId);
+            assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId);
+        }
+    }
+
+    /**
+     * This functions assigns topic partitions to members by a round-robin approach and updates the existing assignment.
+     * @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty.
+     * @param partitionsToAssign The subscribed topic partitions which needs assignment.
+     * @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this
+     *                   method can be called multiple times for heterogeneous assignment.
+     * @param desiredAssignmentCount The number of partitions which can be assigned to each member to give even balance.
+     *                               Note that this number is slightly higher than strictly required to allow for situations
+     *                               in which we have hashing collisions.
+     */
+    void roundRobinAssignmentWithCount(
+        Collection<String> memberIds,
+        List<TopicIdPartition> partitionsToAssign,
+        Map<String, Set<TopicIdPartition>> assignment,
+        int desiredAssignmentCount
+    ) {
+        Collection<String> memberIdsCopy = new LinkedHashSet<>(memberIds);
+
+        // We iterate through the target partitions which are not in the assignment and assign a memberId to them.
+        // In case we run out of members (memberIds < partitionsToAssign), we again start from the starting index of memberIds.
+        Iterator<String> memberIdIterator = memberIdsCopy.iterator();
+        ListIterator<TopicIdPartition> partitionListIterator = partitionsToAssign.listIterator();
+        while (partitionListIterator.hasNext()) {
+            TopicIdPartition partition = partitionListIterator.next();
+            if (!memberIdIterator.hasNext()) {
+                memberIdIterator = memberIdsCopy.iterator();
+                if (memberIdsCopy.isEmpty()) {
+                    // This should never happen, but guarding against an infinite loop
+                    throw new PartitionAssignorException("Inconsistent number of member IDs");
+                }
+            }
+            String memberId = memberIdIterator.next();
+            Set<TopicIdPartition> memberPartitions = assignment.computeIfAbsent(memberId, k -> new HashSet<>());
+            if (memberPartitions.size() <= desiredAssignmentCount) {
+                memberPartitions.add(partition);
+            } else {
+                memberIdIterator.remove();
+                partitionListIterator.previous();

Review Comment:
It should not. Imagine that we have 55 partitions and 5 members. `desiredAssignmentCount` will be `(55+5-1)/5`, which is 11 with integer division, so we expect 11 partitions per member. However, that's the optimistic case. If hashing collisions cause the same partition to be assigned to all members, we will have to distribute the other 54 partitions across the 5 members (10.8 each), so, counting the shared partition, most members will potentially need 12 partitions. `roundRobinAssignmentWithCount` uses `memberPartitions.size() <= desiredAssignmentCount`, so it is still prepared to add another partition when a member already has 11.
As a result, each member can hold up to 12 partitions, so up to 5 × 12 = 60 member-partition assignments can be accommodated, which covers the 59 (5 + 54) needed in the worst case above. Similarly, if we have 56 partitions and 5 members, `desiredAssignmentCount` will be `(56+5-1)/5`, which is now 12, and we will tolerate up to 13 partitions per member. I have improved the comments around this.
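The capacity argument above can be checked with a small, self-contained model. This is a sketch under simplifying assumptions, not the Kafka code:

- Partitions are plain `int`s and members are plain strings. They stand in for `TopicIdPartition` and member IDs.
- The names `CapacitySketch`, `roundRobinWithCount`, `worstCase`, `maxPerMember` and `totalCapacity` are hypothetical.
- `IllegalStateException` replaces Kafka's `PartitionAssignorException`.
- `roundRobinWithCount` only mirrors the loop shown in the diff for `roundRobinAssignmentWithCount`.

The sketch reproduces the worst case from the comment, where hashing gives partition 0 to all five members.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the capacity argument; not Kafka's SimpleAssignor.
final class CapacitySketch {

    // Ceiling division, as in the review comment: (partitions + members - 1) / members.
    static int desiredAssignmentCount(int partitions, int members) {
        return (partitions + members - 1) / members;
    }

    // A member still accepts a partition while size() <= desiredAssignmentCount,
    // so it can end up holding desiredAssignmentCount + 1 partitions.
    static int maxPerMember(int partitions, int members) {
        return desiredAssignmentCount(partitions, members) + 1;
    }

    static int totalCapacity(int partitions, int members) {
        return members * maxPerMember(partitions, members);
    }

    // Mirrors the loop in roundRobinAssignmentWithCount, using simplified types.
    static void roundRobinWithCount(Collection<String> memberIds, List<Integer> partitionsToAssign,
                                    Map<String, Set<Integer>> assignment, int desiredAssignmentCount) {
        Set<String> memberIdsCopy = new LinkedHashSet<>(memberIds);
        Iterator<String> memberIt = memberIdsCopy.iterator();
        ListIterator<Integer> partitionIt = partitionsToAssign.listIterator();
        while (partitionIt.hasNext()) {
            Integer partition = partitionIt.next();
            if (!memberIt.hasNext()) {
                memberIt = memberIdsCopy.iterator();
                if (memberIdsCopy.isEmpty()) {
                    // Stand-in for PartitionAssignorException: every member is full.
                    throw new IllegalStateException("Inconsistent number of member IDs");
                }
            }
            String memberId = memberIt.next();
            Set<Integer> memberPartitions = assignment.computeIfAbsent(memberId, k -> new HashSet<>());
            if (memberPartitions.size() <= desiredAssignmentCount) {
                memberPartitions.add(partition);
            } else {
                memberIt.remove();      // this member is full, so stop offering it partitions
                partitionIt.previous(); // retry the same partition with the next member
            }
        }
    }

    // Worst case from the comment: hashing gives partition 0 to every member,
    // then partitions 1..(partitions - 1) are distributed round-robin with the count limit.
    static Map<String, Set<Integer>> worstCase(int partitions, int members) {
        List<String> memberIds = new ArrayList<>();
        Map<String, Set<Integer>> assignment = new LinkedHashMap<>();
        for (int m = 0; m < members; m++) {
            String id = "member-" + m;
            memberIds.add(id);
            assignment.computeIfAbsent(id, k -> new HashSet<>()).add(0);
        }
        List<Integer> remaining = new ArrayList<>();
        for (int p = 1; p < partitions; p++) {
            remaining.add(p);
        }
        roundRobinWithCount(memberIds, remaining, assignment, desiredAssignmentCount(partitions, members));
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> assignment = worstCase(55, 5);
        assignment.forEach((member, parts) -> System.out.println(member + " -> " + parts.size()));
        System.out.println("capacity = " + totalCapacity(55, 5));
    }
}
```

Running `main` prints 12 for `member-0` through `member-3` and 11 for `member-4`, followed by `capacity = 60`. These 59 assignments fit within the 60 slots described in the comment.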