fonsdant commented on code in PR #17038: URL: https://github.com/apache/kafka/pull/17038#discussion_r1746786740
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ########## @@ -262,12 +264,13 @@ private void createInternalTopics() { ); } - Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group) + ListConsumerGroupOffsetsResult listConsumerGroupOffsets(List<String> groups) throws InterruptedException, ExecutionException { + Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = groups.stream() + .collect(Collectors.toMap(group -> group, group -> new ListConsumerGroupOffsetsSpec())); return adminCall( - () -> sourceAdminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get(), - () -> String.format("list offsets for consumer group %s on %s cluster", group, - config.sourceClusterAlias()) + () -> sourceAdminClient.listConsumerGroupOffsets(groupSpecs), + () -> String.format("list offsets for consumer groups %s on %s cluster", groups, config.sourceClusterAlias()) ); Review Comment: While looping, we need to get topic partitions and metadata by group. For this reason, I have chosen `ListConsumerGroupOffsetsResult` as the return of listConsumerGroupOffsets as it allows us to do `result.partitionsToOffsetAndMetadata(group).get()` after calling `listConsumerGroups(filteredGroups)`. In `.all().get()` inside `listConsumerGroups` approach, once we have all topic partitions and metadata for all consumer groups, we will not be able to distinguish each of them by group while looping. An alternative could be to make a copy of `ListConsumerGroupOffsetsResult.all()` that groups future results by group. This way, we could continue to get them batched and still have how to distinguish what group each one belongs to. But I think it would need KIP, right? So it does not seem to be the case. Let me know if I have missed something. Many thanks for your review, @gharris1727! :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org