mumrah commented on code in PR #19142:
URL: https://github.com/apache/kafka/pull/19142#discussion_r1987411712


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:
##########
@@ -234,67 +257,93 @@ private GroupAssignment newAssignmentHeterogeneous(
         return groupAssignment(finalAssignment, groupSpec.memberIds());
     }
 
-    private GroupAssignment groupAssignment(
-        Map<String, Set<TopicIdPartition>> assignmentByMember,
-        Collection<String> allGroupMembers
-    ) {
-        Map<String, MemberAssignment> members = new HashMap<>();
-        for (Map.Entry<String, Set<TopicIdPartition>> entry : 
assignmentByMember.entrySet()) {
-            Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
-            entry.getValue().forEach(targetPartition -> 
targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new 
HashSet<>()).add(targetPartition.partitionId()));
-            members.put(entry.getKey(), new 
MemberAssignmentImpl(targetPartitions));
-        }
-        allGroupMembers.forEach(member -> {
-            if (!members.containsKey(member))
-                members.put(member, new MemberAssignmentImpl(new HashMap<>()));
-        });
-
-        return new GroupAssignment(members);
-    }
-
     /**
      * This function updates assignment by hashing the member IDs of the 
members and maps the partitions assigned to the
-     * members based on the hash. This gives approximately even balance.
-     * @param unassignedPartitions - the subscribed topic partitions which 
needs assignment.
-     * @param memberIds - the member ids to which the topic partitions need to 
be assigned.
-     * @param assignment - the existing assignment by topic partition. We need 
to pass it as a parameter because this
-     *                   function would be called multiple times for 
heterogeneous assignment.
+     * members based on the hash, one partition per member. This gives 
approximately even balance.
+     * @param memberIds           The member ids to which the topic partitions 
need to be assigned.
+     * @param partitionsToAssign  The subscribed topic partitions which needs 
assignment.
+     * @param assignment          The existing assignment by topic partition. 
We need to pass it as a parameter because this
+     *                            method can be called multiple times for 
heterogeneous assignment.
      */
     // Visible for testing
     void memberHashAssignment(
-        List<TopicIdPartition> unassignedPartitions,
         Collection<String> memberIds,
+        List<TopicIdPartition> partitionsToAssign,
         Map<TopicIdPartition, List<String>> assignment
     ) {
-        if (!unassignedPartitions.isEmpty())
+        if (!partitionsToAssign.isEmpty()) {
             for (String memberId : memberIds) {
-                int topicPartitionIndex = Math.abs(memberId.hashCode() % 
unassignedPartitions.size());
-                TopicIdPartition topicPartition = 
unassignedPartitions.get(topicPartitionIndex);
+                int topicPartitionIndex = Math.abs(memberId.hashCode() % 
partitionsToAssign.size());
+                TopicIdPartition topicPartition = 
partitionsToAssign.get(topicPartitionIndex);
                 assignment.computeIfAbsent(topicPartition, k -> new 
ArrayList<>()).add(memberId);
             }
+        }
     }
 
     /**
-     * This functions assigns topic partitions to members by round-robin 
approach and updates the existing assignment.
-     * @param memberIds - the member ids to which the topic partitions need to 
be assigned, should be non-empty.
-     * @param unassignedPartitions - the subscribed topic partitions which 
needs assignment.
-     * @param assignment - the existing assignment by topic partition.
+     * This functions assigns topic partitions to members by a round-robin 
approach and updates the existing assignment.
+     * @param memberIds              The member ids to which the topic 
partitions need to be assigned, should be non-empty.
+     * @param partitionsToAssign     The subscribed topic partitions which 
needs assignment.
+     * @param assignment             The existing assignment by topic 
partition. We need to pass it as a parameter because this
+     *                               method can be called multiple times for 
heterogeneous assignment.
      */
     // Visible for testing
     void roundRobinAssignment(
         Collection<String> memberIds,
-        List<TopicIdPartition> unassignedPartitions,
+        List<TopicIdPartition> partitionsToAssign,
         Map<TopicIdPartition, List<String>> assignment
     ) {
         // We iterate through the target partitions and assign a memberId to 
them. In case we run out of members (members < targetPartitions),
         // we again start from the starting index of memberIds.
         Iterator<String> memberIdIterator = memberIds.iterator();
-        for (TopicIdPartition targetPartition : unassignedPartitions) {
+        for (TopicIdPartition topicPartition : partitionsToAssign) {
             if (!memberIdIterator.hasNext()) {
                 memberIdIterator = memberIds.iterator();
             }
             String memberId = memberIdIterator.next();
-            assignment.computeIfAbsent(targetPartition, k -> new 
ArrayList<>()).add(memberId);
+            assignment.computeIfAbsent(topicPartition, k -> new 
ArrayList<>()).add(memberId);
+        }
+    }
+
+    /**
+     * This functions assigns topic partitions to members by a round-robin 
approach and updates the existing assignment.
+     * @param memberIds              The member ids to which the topic 
partitions need to be assigned, should be non-empty.
+     * @param partitionsToAssign     The subscribed topic partitions which 
needs assignment.
+     * @param assignment             The existing assignment by topic 
partition. We need to pass it as a parameter because this
+     *                               method can be called multiple times for 
heterogeneous assignment.
+     * @param desiredAssignmentCount The number of partitions which can be 
assigned to each member to give even balance.
+     *                               Note that this number is slightly higher 
than strictly required to allow for situations
+     *                               in which we have hashing collisions.
+     */
+    void roundRobinAssignmentWithCount(
+        Collection<String> memberIds,
+        List<TopicIdPartition> partitionsToAssign,
+        Map<String, Set<TopicIdPartition>> assignment,
+        int desiredAssignmentCount
+    ) {
+        Collection<String> memberIdsCopy = new LinkedHashSet<>(memberIds);
+
+        // We iterate through the target partitions which are not in the 
assignment and assign a memberId to them.
+        // In case we run out of members (memberIds < partitionsToAssign), we 
again start from the starting index of memberIds.
+        Iterator<String> memberIdIterator = memberIdsCopy.iterator();
+        ListIterator<TopicIdPartition> partitionListIterator = 
partitionsToAssign.listIterator();
+        while (partitionListIterator.hasNext()) {
+            TopicIdPartition partition = partitionListIterator.next();
+            if (!memberIdIterator.hasNext()) {
+                memberIdIterator = memberIdsCopy.iterator();
+                if (memberIdsCopy.isEmpty()) {
+                    // This should never happen, but guarding against an 
infinite loop
+                    throw new PartitionAssignorException("Inconsistent number 
of member IDs");
+                }
+            }
+            String memberId = memberIdIterator.next();
+            Set<TopicIdPartition> memberPartitions = 
assignment.computeIfAbsent(memberId, k -> new HashSet<>());
+            if (memberPartitions.size() <= desiredAssignmentCount) {
+                memberPartitions.add(partition);
+            } else {
+                memberIdIterator.remove();
+                partitionListIterator.previous();

Review Comment:
   I see, thanks for the clarification. My concern was also whether we could pass in 
some invalid input to this method and end up stuck in an infinite loop (e.g., 55 
partitions, 5 members, but a desiredAssignmentCount of 2). The check on L336 
catches it. Can we add a unit test for this exceptional case as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to