jeffkbkim commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1610394065


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -79,8 +80,16 @@ public void testTwoMembersNoTopicSubscription() {
             Collections.emptyMap()
         ));
 
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, 
HETEROGENEOUS);
-        GroupAssignment groupAssignment = assignor.assign(assignmentSpec, 
subscribedTopicMetadata);
+        GroupSpecImpl groupSpec = new GroupSpecImpl(

Review Comment:
   nit: should all the `GroupSpecImpl groupSpec` initializations in the tests 
be `GroupSpec groupSpec`?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -82,4 +83,34 @@ public static void assertAssignment(
             assertEquals(expectedAssignment.get(memberId), 
computedAssignmentForMember);
         });
     }
+
+    /**
+     * Generate a reverse look up map of partition to member target 
assignments from the given member spec.
+     *
+     * @param memberSpec        A map where the key is the member Id and the 
value is an
+     *                          AssignmentMemberSpec object containing the 
member's partition assignments.
+     * @return Map of topic partition to member assignments.
+     */
+    public static Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
+        Map<String, AssignmentMemberSpec> memberSpec
+    ) {
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new 
HashMap<>();
+        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : 
memberSpec.entrySet()) {
+            String memberId = memberEntry.getKey();
+            Map<Uuid, Set<Integer>> topicsAndPartitions = 
memberEntry.getValue().assignedPartitions();
+
+            for (Map.Entry<Uuid, Set<Integer>> topicEntry : 
topicsAndPartitions.entrySet()) {
+                Uuid topicId = topicEntry.getKey();
+                Set<Integer> partitions = topicEntry.getValue();
+
+                invertedTargetAssignment.putIfAbsent(topicId, new HashMap<>());
+                Map<Integer, String> partitionMap = 
invertedTargetAssignment.get(topicId);

Review Comment:
   i think computeIfAbsent would work



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java:
##########
@@ -19,7 +19,7 @@
 import org.apache.kafka.common.errors.ApiException;
 
 /**
- * Exception thrown by {@link PartitionAssignor#assign(AssignmentSpec)}. The 
exception
+ * Exception thrown by {@link PartitionAssignor#assign(GroupSpecImpl, 
SubscribedTopicDescriber)}}. The exception

Review Comment:
   i think this needs to be GroupSpec



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##########
@@ -186,9 +190,33 @@ private void createAssignmentSpec() {
                 Collections.emptyMap()
             ));
         }
-        assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+        groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, 
Collections.emptyMap());
     }
 
+    public Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
+        GroupAssignment groupAssignment
+    ) {
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new 
HashMap<>();
+        for (Map.Entry<String, MemberAssignment> memberEntry : 
groupAssignment.members().entrySet()) {
+            String memberId = memberEntry.getKey();
+            Map<Uuid, Set<Integer>> topicsAndPartitions = 
memberEntry.getValue().targetPartitions();
+
+            for (Map.Entry<Uuid, Set<Integer>> topicEntry : 
topicsAndPartitions.entrySet()) {
+                Uuid topicId = topicEntry.getKey();
+                Set<Integer> partitions = topicEntry.getValue();
+
+                invertedTargetAssignment.putIfAbsent(topicId, new HashMap<>());
+                Map<Integer, String> partitionMap = 
invertedTargetAssignment.get(topicId);
+
+                for (Integer partitionId : partitions) {
+                    partitionMap.put(partitionId, memberId);
+                }
+            }
+        }
+        return invertedTargetAssignment;
+    }
+
+

Review Comment:
   nit: newline



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##########
@@ -186,9 +190,33 @@ private void createAssignmentSpec() {
                 Collections.emptyMap()
             ));
         }
-        assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+        groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, 
Collections.emptyMap());
     }
 
+    public Map<Uuid, Map<Integer, String>> invertedTargetAssignment(

Review Comment:
   is it not possible to unify this with 
ServerSideAssignorBenchmark#invertedTargetAssignment?
   
   seems like some details are different but they have a lot in common



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to