AndrewJSchofield commented on code in PR #18834:
URL: https://github.com/apache/kafka/pull/18834#discussion_r1950965331


##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java:
##########
@@ -71,63 +73,122 @@ public AdminApiLookupStrategy<CoordinatorKey> 
lookupStrategy() {
         return lookupStrategy;
     }
 
+    private void validateKeys(Set<CoordinatorKey> groupIds) {
+        Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet());
+        if (!keys.containsAll(groupIds)) {
+            throw new IllegalArgumentException("Received unexpected group ids 
" + groupIds +
+                " (expected one of " + keys + ")");
+        }
+    }
+
     @Override
     public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int 
coordinatorId, Set<CoordinatorKey> keys) {
-        List<String> groupIds = keys.stream().map(key -> {
-            if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
-                throw new IllegalArgumentException("Invalid group coordinator 
key " + key +
-                    " when building `DescribeShareGroupOffsets` request");
+        validateKeys(keys);
+
+        List<DescribeShareGroupOffsetsRequestGroup> groups = new 
ArrayList<>(keys.size());
+        keys.forEach(coordinatorKey -> {
+            String groupId = coordinatorKey.idValue;
+            ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
+            DescribeShareGroupOffsetsRequestGroup requestGroup = new 
DescribeShareGroupOffsetsRequestGroup()
+                .setGroupId(groupId);
+
+            Map<String, List<Integer>> topicPartitionMap = new HashMap<>();
+            spec.topicPartitions().forEach(tp -> 
topicPartitionMap.computeIfAbsent(tp.topic(), t -> new 
LinkedList<>()).add(tp.partition()));
+
+            Map<String, DescribeShareGroupOffsetsRequestTopic> requestTopics = 
new HashMap<>();
+            for (TopicPartition tp : spec.topicPartitions()) {
+                requestTopics.computeIfAbsent(tp.topic(), t ->
+                        new DescribeShareGroupOffsetsRequestTopic()
+                            .setTopicName(tp.topic())
+                            .setPartitions(new LinkedList<>()))
+                    .partitions()
+                    .add(tp.partition());
             }
-            return key.idValue;
-        }).collect(Collectors.toList());
-        // The DescribeShareGroupOffsetsRequest only includes a single group 
ID at this point, which is likely a mistake to be fixing a follow-on PR.
-        String groupId = groupIds.isEmpty() ? null : groupIds.get(0);
-        if (groupId == null) {
-            throw new IllegalArgumentException("Missing group id in request");
-        }
-        ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
-        
List<DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic>
 topics =
-            spec.topicPartitions().stream().map(
-                topicPartition -> new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
-                    .setTopicName(topicPartition.topic())
-                    .setPartitions(List.of(topicPartition.partition()))
-            ).collect(Collectors.toList());
+            requestGroup.setTopics(new ArrayList<>(requestTopics.values()));
+            groups.add(requestGroup);
+        });
         DescribeShareGroupOffsetsRequestData data = new 
DescribeShareGroupOffsetsRequestData()
-            .setGroupId(groupId)
-            .setTopics(topics);
+            .setGroups(groups);
         return new DescribeShareGroupOffsetsRequest.Builder(data, true);
     }
 
     @Override
     public ApiResult<CoordinatorKey, Map<TopicPartition, Long>> 
handleResponse(Node coordinator,
                                                                                
Set<CoordinatorKey> groupIds,
                                                                                
AbstractResponse abstractResponse) {
+        validateKeys(groupIds);
+
         final DescribeShareGroupOffsetsResponse response = 
(DescribeShareGroupOffsetsResponse) abstractResponse;
         final Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new 
HashMap<>();
         final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final List<CoordinatorKey> unmapped = new ArrayList<>();
 
-        for (CoordinatorKey groupId : groupIds) {
-            Map<TopicPartition, Long> data = new HashMap<>();
-            response.data().responses().stream().map(
-                describedTopic ->
-                    describedTopic.partitions().stream().map(
-                        partition -> {
-                            if (partition.errorCode() == Errors.NONE.code())
-                                data.put(new 
TopicPartition(describedTopic.topicName(), partition.partitionIndex()), 
partition.startOffset());
-                            else
-                                log.error("Skipping return offset for topic {} 
partition {} due to error {}.", describedTopic.topicName(), 
partition.partitionIndex(), Errors.forCode(partition.errorCode()));
-                            return data;
+        for (CoordinatorKey coordinatorKey : groupIds) {
+            String groupId = coordinatorKey.idValue;
+            if (response.hasGroupError(groupId)) {
+                handleGroupError(coordinatorKey, response.groupError(groupId), 
failed, unmapped);
+            } else {
+                Map<TopicPartition, Long> groupOffsetsListing = new 
HashMap<>();
+                response.data().groups().stream().filter(g -> 
g.groupId().equals(groupId)).forEach(groupResponse -> {
+                    for 
(DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic 
topicResponse : groupResponse.topics()) {
+                        for 
(DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition
 partitionResponse : topicResponse.partitions()) {
+                            TopicPartition tp = new 
TopicPartition(topicResponse.topicName(), partitionResponse.partitionIndex());
+                            if (partitionResponse.errorCode() == 
Errors.NONE.code()) {
+                                // Negative offset indicates there is no start 
offset for this partition
+                                if (partitionResponse.startOffset() < 0) {
+                                    groupOffsetsListing.put(tp, null);
+                                } else {
+                                    groupOffsetsListing.put(tp, 
partitionResponse.startOffset());
+                                }
+                            } else {
+                                log.warn("Skipping return offset for {} due to 
error {}: {}.", tp, partitionResponse.errorCode(), 
partitionResponse.errorMessage());
+                            }
                         }
-                    ).collect(Collectors.toList())
-            ).collect(Collectors.toList());
-            completed.put(groupId, data);
+                    }
+                });
+
+                completed.put(coordinatorKey, groupOffsetsListing);
+            }
         }
-        return new ApiResult<>(completed, failed, Collections.emptyList());
+        return new ApiResult<>(completed, failed, unmapped);
     }
 
     private static Set<CoordinatorKey> coordinatorKeys(Collection<String> 
groupIds) {
         return groupIds.stream()
             .map(CoordinatorKey::byGroupId)
             .collect(Collectors.toSet());
     }
+
+    private void handleGroupError(CoordinatorKey groupId,
+                                  Errors error,
+                                  Map<CoordinatorKey, Throwable> failed,
+                                  List<CoordinatorKey> groupsToUnmap) {
+        switch (error) {
+            case GROUP_AUTHORIZATION_FAILED:
+            case UNKNOWN_MEMBER_ID:
+            case STALE_MEMBER_EPOCH:
+                log.debug("`DescribeShareGroupOffsets` request for group id {} 
failed due to error {}", groupId.idValue, error);

Review Comment:
   This log message reflects the name of the RPC, which is `DescribeShareGroupOffsets`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to