AndrewJSchofield commented on code in PR #19142:
URL: https://github.com/apache/kafka/pull/19142#discussion_r1985365027


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:
##########
@@ -135,56 +148,71 @@ private GroupAssignment newAssignmentHomogeneous(
         List<TopicIdPartition> targetPartitions,
         Map<TopicIdPartition, List<String>> currentAssignment
     ) {
-        Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>();
-
-        // Step 1: Hash member IDs to topic partitions.
-        memberHashAssignment(targetPartitions, groupSpec.memberIds(), newAssignment);
-
-        // Step 2: Round-robin assignment for unassigned partitions which do not have members already assigned in the current assignment.
-        List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
-            .filter(targetPartition -> !newAssignment.containsKey(targetPartition))
-            .filter(targetPartition -> !currentAssignment.containsKey(targetPartition))
-            .toList();
-
-        roundRobinAssignment(groupSpec.memberIds(), unassignedPartitions, newAssignment);
-
-        // Step 3: We combine current assignment and new assignment.
-        Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>();
-
-        // As per the KIP, we should revoke the assignments from current assignment for partitions that were assigned by step 1
-        // in the new assignment and have members in current assignment by step 2. But we haven't implemented it to avoid the
-        // complexity in both the implementation and the run time complexity. This step was mentioned in the KIP to reduce
-        // the burden of certain members of the share groups. This can be achieved with the help of limiting the max
-        // no. of partitions assignment for every member(KAFKA-18788). Hence, the potential problem of burdening
-        // the share consumers will be addressed in a future PR.
+        // For entirely balanced assignment, we would expect (numTargetPartitions / numGroupMembers) partitions per member, rounded upwards.
+        // That can be expressed as         Math.ceil(numTargetPartitions / (double) numGroupMembers)
+        // Using integer arithmetic, as     (numTargetPartitions + numGroupMembers - 1) / numGroupMembers
+        int numGroupMembers = groupSpec.memberIds().size();
+        int numTargetPartitions = targetPartitions.size();
+        int desiredAssignmentCount = (numTargetPartitions + numGroupMembers - 1) / numGroupMembers;
+
+        Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>((int) (((numTargetPartitions + 1) / 0.75f) + 1));
+
+        // Hash member IDs to topic partitions. Each member will be assigned one partition, but some partitions
+        // might have been assigned to more than one member.
+        memberHashAssignment(groupSpec.memberIds(), targetPartitions, newAssignment);
+
+        // Combine current and new hashed assignments, sized to accommodate the expected number of mappings.
+        Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>((int) ((numGroupMembers / 0.75f) + 1));
+        Map<TopicIdPartition, Set<String>> finalAssignmentByPartition = new HashMap<>((int) (((numTargetPartitions + 1) / 0.75f) + 1));
+
+        // First, take the members assigned by hashing.
+        newAssignment.forEach((targetPartition, members) -> members.forEach(member -> {
+            finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition);
+            finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> new HashSet<>()).add(member);
+        }));
 
-        newAssignment.forEach((targetPartition, members) -> members.forEach(member ->
-            finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition)));
+        // Then, take the members from the current assignment, making sure that no member has too many assigned partitions.
         // When combining current assignment, we need to only consider the topics in current assignment that are also being
         // subscribed in the new assignment as well.
         currentAssignment.forEach((targetPartition, members) -> {
-            if (subscribedTopicIds.contains(targetPartition.topicId()))
+            if (subscribedTopicIds.contains(targetPartition.topicId())) {
                 members.forEach(member -> {
-                    if (groupSpec.memberIds().contains(member) && !newAssignment.containsKey(targetPartition))
-                        finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition);
+                    if (groupSpec.memberIds().contains(member)) {
+                        Set<TopicIdPartition> memberPartitions = finalAssignment.computeIfAbsent(member, k -> new HashSet<>());
+                        if ((memberPartitions.size() < desiredAssignmentCount) && !newAssignment.containsKey(targetPartition)) {
+                            memberPartitions.add(targetPartition);
+                            finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> new HashSet<>()).add(member);
+                        }
+                    }
                 });
+            }
         });
 
+        // Finally, round-robin assignment for unassignment partitions which do not already have members assigned.

Review Comment:
   Thanks. Saw it too. Will fix in next commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.