mumrah commented on code in PR #19142: URL: https://github.com/apache/kafka/pull/19142#discussion_r1987411712
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java: ########## @@ -234,67 +257,93 @@ private GroupAssignment newAssignmentHeterogeneous( return groupAssignment(finalAssignment, groupSpec.memberIds()); } - private GroupAssignment groupAssignment( - Map<String, Set<TopicIdPartition>> assignmentByMember, - Collection<String> allGroupMembers - ) { - Map<String, MemberAssignment> members = new HashMap<>(); - for (Map.Entry<String, Set<TopicIdPartition>> entry : assignmentByMember.entrySet()) { - Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>(); - entry.getValue().forEach(targetPartition -> targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new HashSet<>()).add(targetPartition.partitionId())); - members.put(entry.getKey(), new MemberAssignmentImpl(targetPartitions)); - } - allGroupMembers.forEach(member -> { - if (!members.containsKey(member)) - members.put(member, new MemberAssignmentImpl(new HashMap<>())); - }); - - return new GroupAssignment(members); - } - /** * This function updates assignment by hashing the member IDs of the members and maps the partitions assigned to the - * members based on the hash. This gives approximately even balance. - * @param unassignedPartitions - the subscribed topic partitions which needs assignment. - * @param memberIds - the member ids to which the topic partitions need to be assigned. - * @param assignment - the existing assignment by topic partition. We need to pass it as a parameter because this - * function would be called multiple times for heterogeneous assignment. + * members based on the hash, one partition per member. This gives approximately even balance. + * @param memberIds The member ids to which the topic partitions need to be assigned. + * @param partitionsToAssign The subscribed topic partitions which needs assignment. + * @param assignment The existing assignment by topic partition. 
We need to pass it as a parameter because this + * method can be called multiple times for heterogeneous assignment. */ // Visible for testing void memberHashAssignment( - List<TopicIdPartition> unassignedPartitions, Collection<String> memberIds, + List<TopicIdPartition> partitionsToAssign, Map<TopicIdPartition, List<String>> assignment ) { - if (!unassignedPartitions.isEmpty()) + if (!partitionsToAssign.isEmpty()) { for (String memberId : memberIds) { - int topicPartitionIndex = Math.abs(memberId.hashCode() % unassignedPartitions.size()); - TopicIdPartition topicPartition = unassignedPartitions.get(topicPartitionIndex); + int topicPartitionIndex = Math.abs(memberId.hashCode() % partitionsToAssign.size()); + TopicIdPartition topicPartition = partitionsToAssign.get(topicPartitionIndex); assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId); } + } } /** - * This functions assigns topic partitions to members by round-robin approach and updates the existing assignment. - * @param memberIds - the member ids to which the topic partitions need to be assigned, should be non-empty. - * @param unassignedPartitions - the subscribed topic partitions which needs assignment. - * @param assignment - the existing assignment by topic partition. + * This functions assigns topic partitions to members by a round-robin approach and updates the existing assignment. + * @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty. + * @param partitionsToAssign The subscribed topic partitions which needs assignment. + * @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this + * method can be called multiple times for heterogeneous assignment. 
*/ // Visible for testing void roundRobinAssignment( Collection<String> memberIds, - List<TopicIdPartition> unassignedPartitions, + List<TopicIdPartition> partitionsToAssign, Map<TopicIdPartition, List<String>> assignment ) { // We iterate through the target partitions and assign a memberId to them. In case we run out of members (members < targetPartitions), // we again start from the starting index of memberIds. Iterator<String> memberIdIterator = memberIds.iterator(); - for (TopicIdPartition targetPartition : unassignedPartitions) { + for (TopicIdPartition topicPartition : partitionsToAssign) { if (!memberIdIterator.hasNext()) { memberIdIterator = memberIds.iterator(); } String memberId = memberIdIterator.next(); - assignment.computeIfAbsent(targetPartition, k -> new ArrayList<>()).add(memberId); + assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId); + } + } + + /** + * This functions assigns topic partitions to members by a round-robin approach and updates the existing assignment. + * @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty. + * @param partitionsToAssign The subscribed topic partitions which needs assignment. + * @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this + * method can be called multiple times for heterogeneous assignment. + * @param desiredAssignmentCount The number of partitions which can be assigned to each member to give even balance. + * Note that this number is slightly higher than strictly required to allow for situations + * in which we have hashing collisions. 
+ */ + void roundRobinAssignmentWithCount( + Collection<String> memberIds, + List<TopicIdPartition> partitionsToAssign, + Map<String, Set<TopicIdPartition>> assignment, + int desiredAssignmentCount + ) { + Collection<String> memberIdsCopy = new LinkedHashSet<>(memberIds); + + // We iterate through the target partitions which are not in the assignment and assign a memberId to them. + // In case we run out of members (memberIds < partitionsToAssign), we again start from the starting index of memberIds. + Iterator<String> memberIdIterator = memberIdsCopy.iterator(); + ListIterator<TopicIdPartition> partitionListIterator = partitionsToAssign.listIterator(); + while (partitionListIterator.hasNext()) { + TopicIdPartition partition = partitionListIterator.next(); + if (!memberIdIterator.hasNext()) { + memberIdIterator = memberIdsCopy.iterator(); + if (memberIdsCopy.isEmpty()) { + // This should never happen, but guarding against an infinite loop + throw new PartitionAssignorException("Inconsistent number of member IDs"); + } + } + String memberId = memberIdIterator.next(); + Set<TopicIdPartition> memberPartitions = assignment.computeIfAbsent(memberId, k -> new HashSet<>()); + if (memberPartitions.size() <= desiredAssignmentCount) { + memberPartitions.add(partition); + } else { + memberIdIterator.remove(); + partitionListIterator.previous(); Review Comment: I see, thanks for the clarification. My other concern was whether we could pass some invalid input to this method and end up stuck in an infinite loop (e.g., 55 partitions, 5 members, but a desiredAssignmentCount of 2). The check on L336 (the `memberIdsCopy.isEmpty()` guard that throws `PartitionAssignorException`) catches it. Can we add a unit test for this exceptional case as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org