AndrewJSchofield commented on code in PR #19142:
URL: https://github.com/apache/kafka/pull/19142#discussion_r1987145943


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:
##########
@@ -135,56 +148,71 @@ private GroupAssignment newAssignmentHomogeneous(
         List<TopicIdPartition> targetPartitions,
         Map<TopicIdPartition, List<String>> currentAssignment
     ) {
-        Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>();
-
-        // Step 1: Hash member IDs to topic partitions.
-        memberHashAssignment(targetPartitions, groupSpec.memberIds(), newAssignment);
-
-        // Step 2: Round-robin assignment for unassigned partitions which do not have members already assigned in the current assignment.
-        List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
-            .filter(targetPartition -> !newAssignment.containsKey(targetPartition))
-            .filter(targetPartition -> !currentAssignment.containsKey(targetPartition))
-            .toList();
-
-        roundRobinAssignment(groupSpec.memberIds(), unassignedPartitions, newAssignment);
-
-        // Step 3: We combine current assignment and new assignment.
-        Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>();
-
-        // As per the KIP, we should revoke the assignments from current assignment for partitions that were assigned by step 1
-        // in the new assignment and have members in current assignment by step 2. But we haven't implemented it to avoid the
-        // complexity in both the implementation and the run time complexity. This step was mentioned in the KIP to reduce
-        // the burden of certain members of the share groups. This can be achieved with the help of limiting the max
-        // no. of partitions assignment for every member(KAFKA-18788). Hence, the potential problem of burdening
-        // the share consumers will be addressed in a future PR.
+        // For entirely balanced assignment, we would expect (numTargetPartitions / numGroupMembers) partitions per member, rounded upwards.
+        // That can be expressed as         Math.ceil(numTargetPartitions / (double) numGroupMembers)
+        // Using integer arithmetic, as     (numTargetPartitions + numGroupMembers - 1) / numGroupMembers
+        int numGroupMembers = groupSpec.memberIds().size();
+        int numTargetPartitions = targetPartitions.size();
+        int desiredAssignmentCount = (numTargetPartitions + numGroupMembers - 1) / numGroupMembers;
+
+        Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>((int) (((numTargetPartitions + 1) / 0.75f) + 1));

Review Comment:
   Makes perfect sense. Done.
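
   For anyone following the arithmetic in the hunk above, here is a minimal
   standalone sketch of the two sizing expressions. The class and method names
   (`AssignmentSizing`, `ceilDiv`, `initialCapacity`) are hypothetical and not
   part of `SimpleAssignor`; the sketch also assumes at least one group member
   and values small enough that the additions do not overflow.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, for illustration only; not part of the PR.
public class AssignmentSizing {

    // Ceiling division using integer arithmetic.
    // Assumes numerator >= 0 and denominator > 0.
    public static int ceilDiv(int numerator, int denominator) {
        return (numerator + denominator - 1) / denominator;
    }

    // Initial HashMap capacity using the same expression as the diff,
    // based on the default load factor of 0.75.
    public static int initialCapacity(int expectedEntries) {
        return (int) (((expectedEntries + 1) / 0.75f) + 1);
    }

    public static void main(String[] args) {
        int numTargetPartitions = 10;
        int numGroupMembers = 4;

        // Both forms should agree for non-negative inputs.
        int viaInteger = ceilDiv(numTargetPartitions, numGroupMembers);
        int viaFloating = (int) Math.ceil(numTargetPartitions / (double) numGroupMembers);
        System.out.println(viaInteger + " " + viaFloating);

        // Pre-size the map so that adding one entry per target partition
        // should not trigger a resize.
        Map<Integer, String> sized = new HashMap<>(initialCapacity(numTargetPartitions));
        for (int i = 0; i < numTargetPartitions; i++) {
            sized.put(i, "member-" + (i % numGroupMembers));
        }
        System.out.println(sized.size());
    }
}
```

   With 10 partitions and 4 members, both forms give 3 partitions per member,
   and the integer form avoids the round trip through `double`.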


