gharris1727 commented on code in PR #17038: URL: https://github.com/apache/kafka/pull/17038#discussion_r1742462093
########## clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java: ########## @@ -40,7 +40,7 @@ public class ListConsumerGroupOffsetsResult { final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures; - ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) { + public ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) { Review Comment: This is a change to the public interface, which would require a KIP. Please revert this change and find an alternative. Thanks! ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ########## @@ -262,12 +264,13 @@ private void createInternalTopics() { ); } - Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group) + ListConsumerGroupOffsetsResult listConsumerGroupOffsets(List<String> groups) throws InterruptedException, ExecutionException { + Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = groups.stream() + .collect(Collectors.toMap(group -> group, group -> new ListConsumerGroupOffsetsSpec())); Review Comment: nit: create one of these Spec objects and reuse it for every entry in the map. ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java: ########## @@ -151,7 +155,14 @@ public void testFindConsumerGroups() throws Exception { doReturn(groups).when(connector).listConsumerGroups(); doReturn(true).when(connector).shouldReplicateByTopicFilter(anyString()); doReturn(true).when(connector).shouldReplicateByGroupFilter(anyString()); - doReturn(offsets).when(connector).listConsumerGroupOffsets(anyString()); Review Comment: I think this leaves the `offsets` variable unused. ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java: ########## @@ -151,7 +155,14 @@ public void testFindConsumerGroups() throws Exception { doReturn(groups).when(connector).listConsumerGroups(); doReturn(true).when(connector).shouldReplicateByTopicFilter(anyString()); doReturn(true).when(connector).shouldReplicateByGroupFilter(anyString()); - doReturn(offsets).when(connector).listConsumerGroupOffsets(anyString()); + + Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = new HashMap<>(); + futures.put(CoordinatorKey.byGroupId("g1"), KafkaFuture.completedFuture(offsets)); + futures.put(CoordinatorKey.byGroupId("g2"), KafkaFuture.completedFuture(offsets)); + ListConsumerGroupOffsetsResult offsetsResult = new ListConsumerGroupOffsetsResult(futures); + offsetsResult = spy(offsetsResult); Review Comment: I think you can avoid needing to change the constructor visibility if you use mock(ListConsumerGroupOffsetsResult.class) instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org