jeffkbkim commented on code in PR #15974: URL: https://github.com/apache/kafka/pull/15974#discussion_r1610394065
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -79,8 +80,16 @@ public void testTwoMembersNoTopicSubscription() {
             Collections.emptyMap()
         ));
-        AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
-        GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+        GroupSpecImpl groupSpec = new GroupSpecImpl(

Review Comment:
   nit: should all the `GroupSpecImpl groupSpec` initializations in the tests be `GroupSpec groupSpec`?

##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -82,4 +83,34 @@ public static void assertAssignment(
             assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember);
         });
     }
+
+    /**
+     * Generate a reverse look up map of partition to member target assignments from the given member spec.
+     *
+     * @param memberSpec A map where the key is the member Id and the value is an
+     *                   AssignmentMemberSpec object containing the member's partition assignments.
+     * @return Map of topic partition to member assignments.
+     */
+    public static Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
+        Map<String, AssignmentMemberSpec> memberSpec
+    ) {
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();
+        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : memberSpec.entrySet()) {
+            String memberId = memberEntry.getKey();
+            Map<Uuid, Set<Integer>> topicsAndPartitions = memberEntry.getValue().assignedPartitions();
+
+            for (Map.Entry<Uuid, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) {
+                Uuid topicId = topicEntry.getKey();
+                Set<Integer> partitions = topicEntry.getValue();
+
+                invertedTargetAssignment.putIfAbsent(topicId, new HashMap<>());
+                Map<Integer, String> partitionMap = invertedTargetAssignment.get(topicId);

Review Comment:
   i think computeIfAbsent would work

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java:
##########
@@ -19,7 +19,7 @@
 import org.apache.kafka.common.errors.ApiException;
 /**
- * Exception thrown by {@link PartitionAssignor#assign(AssignmentSpec)}. The exception
+ * Exception thrown by {@link PartitionAssignor#assign(GroupSpecImpl, SubscribedTopicDescriber)}}. The exception

Review Comment:
   i think this needs to be GroupSpec

##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##########
@@ -186,9 +190,33 @@ private void createAssignmentSpec() {
                 Collections.emptyMap()
             ));
         }
-        assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+        groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap());
     }
+    public Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
+        GroupAssignment groupAssignment
+    ) {
+        Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();
+        for (Map.Entry<String, MemberAssignment> memberEntry : groupAssignment.members().entrySet()) {
+            String memberId = memberEntry.getKey();
+            Map<Uuid, Set<Integer>> topicsAndPartitions = memberEntry.getValue().targetPartitions();
+
+            for (Map.Entry<Uuid, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) {
+                Uuid topicId = topicEntry.getKey();
+                Set<Integer> partitions = topicEntry.getValue();
+
+                invertedTargetAssignment.putIfAbsent(topicId, new HashMap<>());
+                Map<Integer, String> partitionMap = invertedTargetAssignment.get(topicId);
+
+                for (Integer partitionId : partitions) {
+                    partitionMap.put(partitionId, memberId);
+                }
+            }
+        }
+        return invertedTargetAssignment;
+    }
+
+

Review Comment:
   nit: newline

##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##########
@@ -186,9 +190,33 @@ private void createAssignmentSpec() {
                 Collections.emptyMap()
             ));
         }
-        assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+        groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap());
     }
+    public Map<Uuid, Map<Integer, String>> invertedTargetAssignment(

Review Comment:
   is it not possible to unify this with ServerSideAssignorBenchmark#invertedTargetAssignment? seems like some details are different but they have a lot in common
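
A rough sketch of how the computeIfAbsent suggestion and the "unify the two invertedTargetAssignment helpers" suggestion could fit together. This is not code from the PR: the class name `InvertedAssignmentSketch`, the method `invert`, and the `partitionsOf` extractor are hypothetical, and the topic id is a generic type parameter so the snippet compiles without Kafka's `Uuid` on the classpath.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class InvertedAssignmentSketch {

    /**
     * Hypothetical shared helper (an assumption, not part of the PR).
     * Builds the reverse lookup topicId -> partitionId -> memberId.
     *
     * @param members      member id -> some per-member value V
     * @param partitionsOf extracts the topic -> partitions map from V
     */
    public static <T, V> Map<T, Map<Integer, String>> invert(
        Map<String, V> members,
        Function<V, Map<T, Set<Integer>>> partitionsOf
    ) {
        Map<T, Map<Integer, String>> inverted = new HashMap<>();
        for (Map.Entry<String, V> memberEntry : members.entrySet()) {
            String memberId = memberEntry.getKey();
            Map<T, Set<Integer>> topicsAndPartitions = partitionsOf.apply(memberEntry.getValue());

            for (Map.Entry<T, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) {
                // computeIfAbsent replaces the putIfAbsent + get pair and only
                // allocates the inner map when the topic is seen for the first time.
                Map<Integer, String> partitionMap =
                    inverted.computeIfAbsent(topicEntry.getKey(), k -> new HashMap<>());

                for (Integer partitionId : topicEntry.getValue()) {
                    partitionMap.put(partitionId, memberId);
                }
            }
        }
        return inverted;
    }

    public static void main(String[] args) {
        // Plain strings stand in for topic ids here; the real code uses Uuid.
        Map<String, Set<Integer>> memberA = new HashMap<>();
        memberA.put("topic-1", new HashSet<>(Arrays.asList(0, 1)));

        Map<String, Set<Integer>> memberB = new HashMap<>();
        memberB.put("topic-1", new HashSet<>(Arrays.asList(2)));

        Map<String, Map<String, Set<Integer>>> members = new HashMap<>();
        members.put("member-a", memberA);
        members.put("member-b", memberB);

        Map<String, Map<Integer, String>> inverted = invert(members, m -> m);
        System.out.println(inverted.get("topic-1").get(0)); // prints "member-a"
    }
}
```

If the accessors are as they appear in the diffs above, the test util could presumably call this as `invert(memberSpec, AssignmentMemberSpec::assignedPartitions)` and the benchmark as `invert(groupAssignment.members(), MemberAssignment::targetPartitions)`, which would leave the extractor as the only per-caller difference.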