apoorvmittal10 commented on code in PR #18834:
URL: https://github.com/apache/kafka/pull/18834#discussion_r1950810086


##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsRequest.java:
##########
@@ -87,23 +101,10 @@ public static DescribeShareGroupOffsetsRequest parse(ByteBuffer buffer, short ve
         );
     }
 
-    public static List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> getErrorDescribeShareGroupOffsets(
-        List<DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic> topics,
-        Errors error
-    ) {
-        return topics.stream()
-            .map(
-                requestTopic -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                    .setTopicName(requestTopic.topicName())
-                    .setPartitions(
-                        requestTopic.partitions().stream().map(
-                            partition -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-                                .setPartitionIndex(partition)
-                                .setErrorCode(error.code())
-                                .setErrorMessage(error.message())
-                                .setStartOffset(0)
-                        ).collect(Collectors.toList())
-                    )
-            ).collect(Collectors.toList());
+    public static DescribeShareGroupOffsetsResponseGroup getErrorDescribedGroup(String groupId, Errors error) {
+        return new DescribeShareGroupOffsetsResponseGroup()
+            .setGroupId(groupId)
+            .setErrorCode(error.code())
+            .setErrorMessage(error.message());

Review Comment:
   This will always send back the default error message. Do we want to do that, or associate the actual error message which would be present in the `Throwable`? If we want the actual error message, the method should not accept `Errors` alone; it should also take the `Throwable` or the actual error message.



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java:
##########
@@ -71,63 +73,122 @@ public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
         return lookupStrategy;
     }
 
+    private void validateKeys(Set<CoordinatorKey> groupIds) {
+        Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet());
+        if (!keys.containsAll(groupIds)) {
+            throw new IllegalArgumentException("Received unexpected group ids 
" + groupIds +
+                " (expected one of " + keys + ")");
+        }
+    }
+
     @Override
     public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> keys) {
-        List<String> groupIds = keys.stream().map(key -> {
-            if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
-                throw new IllegalArgumentException("Invalid group coordinator key " + key +
-                    " when building `DescribeShareGroupOffsets` request");
+        validateKeys(keys);
+
+        List<DescribeShareGroupOffsetsRequestGroup> groups = new ArrayList<>(keys.size());
+        keys.forEach(coordinatorKey -> {
+            String groupId = coordinatorKey.idValue;
+            ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
+            DescribeShareGroupOffsetsRequestGroup requestGroup = new DescribeShareGroupOffsetsRequestGroup()
+                .setGroupId(groupId);
+
+            Map<String, List<Integer>> topicPartitionMap = new HashMap<>();
+            spec.topicPartitions().forEach(tp -> topicPartitionMap.computeIfAbsent(tp.topic(), t -> new LinkedList<>()).add(tp.partition()));
+
+            Map<String, DescribeShareGroupOffsetsRequestTopic> requestTopics = new HashMap<>();
+            for (TopicPartition tp : spec.topicPartitions()) {
+                requestTopics.computeIfAbsent(tp.topic(), t ->
+                        new DescribeShareGroupOffsetsRequestTopic()
+                            .setTopicName(tp.topic())
+                            .setPartitions(new LinkedList<>()))
+                    .partitions()
+                    .add(tp.partition());
             }
-            return key.idValue;
-        }).collect(Collectors.toList());
-        // The DescribeShareGroupOffsetsRequest only includes a single group ID at this point, which is likely a mistake to be fixing a follow-on PR.
-        String groupId = groupIds.isEmpty() ? null : groupIds.get(0);
-        if (groupId == null) {
-            throw new IllegalArgumentException("Missing group id in request");
-        }
-        ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
-        List<DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic> topics =
-            spec.topicPartitions().stream().map(
-                topicPartition -> new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
-                    .setTopicName(topicPartition.topic())
-                    .setPartitions(List.of(topicPartition.partition()))
-            ).collect(Collectors.toList());
+            requestGroup.setTopics(new ArrayList<>(requestTopics.values()));
+            groups.add(requestGroup);
+        });
         DescribeShareGroupOffsetsRequestData data = new DescribeShareGroupOffsetsRequestData()
-            .setGroupId(groupId)
-            .setTopics(topics);
+            .setGroups(groups);
         return new DescribeShareGroupOffsetsRequest.Builder(data, true);
     }
 
     @Override
     public ApiResult<CoordinatorKey, Map<TopicPartition, Long>> handleResponse(Node coordinator,
                                                                                Set<CoordinatorKey> groupIds,
                                                                                AbstractResponse abstractResponse) {
+        validateKeys(groupIds);
+
         final DescribeShareGroupOffsetsResponse response = (DescribeShareGroupOffsetsResponse) abstractResponse;
         final Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new HashMap<>();
         final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final List<CoordinatorKey> unmapped = new ArrayList<>();
 
-        for (CoordinatorKey groupId : groupIds) {
-            Map<TopicPartition, Long> data = new HashMap<>();
-            response.data().responses().stream().map(
-                describedTopic ->
-                    describedTopic.partitions().stream().map(
-                        partition -> {
-                            if (partition.errorCode() == Errors.NONE.code())
-                                data.put(new TopicPartition(describedTopic.topicName(), partition.partitionIndex()), partition.startOffset());
-                            else
-                                log.error("Skipping return offset for topic {} partition {} due to error {}.", describedTopic.topicName(), partition.partitionIndex(), Errors.forCode(partition.errorCode()));
-                            return data;
+        for (CoordinatorKey coordinatorKey : groupIds) {
+            String groupId = coordinatorKey.idValue;
+            if (response.hasGroupError(groupId)) {
+                handleGroupError(coordinatorKey, response.groupError(groupId), failed, unmapped);
+            } else {
+                Map<TopicPartition, Long> groupOffsetsListing = new HashMap<>();
+                response.data().groups().stream().filter(g -> g.groupId().equals(groupId)).forEach(groupResponse -> {
+                    for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic topicResponse : groupResponse.topics()) {
+                        for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition partitionResponse : topicResponse.partitions()) {
+                            TopicPartition tp = new TopicPartition(topicResponse.topicName(), partitionResponse.partitionIndex());
+                            if (partitionResponse.errorCode() == Errors.NONE.code()) {
+                                // Negative offset indicates there is no start offset for this partition
+                                if (partitionResponse.startOffset() < 0) {
+                                    groupOffsetsListing.put(tp, null);
+                                } else {
+                                    groupOffsetsListing.put(tp, partitionResponse.startOffset());
+                                }
+                            } else {
+                                log.warn("Skipping return offset for {} due to 
error {}: {}.", tp, partitionResponse.errorCode(), 
partitionResponse.errorMessage());
+                            }
                         }
-                    ).collect(Collectors.toList())
-            ).collect(Collectors.toList());
-            completed.put(groupId, data);
+                    }
+                });
+
+                completed.put(coordinatorKey, groupOffsetsListing);
+            }
         }
-        return new ApiResult<>(completed, failed, Collections.emptyList());
+        return new ApiResult<>(completed, failed, unmapped);
     }
 
     private static Set<CoordinatorKey> coordinatorKeys(Collection<String> groupIds) {
         return groupIds.stream()
             .map(CoordinatorKey::byGroupId)
             .collect(Collectors.toSet());
     }
+
+    private void handleGroupError(CoordinatorKey groupId,
+                                  Errors error,
+                                  Map<CoordinatorKey, Throwable> failed,
+                                  List<CoordinatorKey> groupsToUnmap) {
+        switch (error) {
+            case GROUP_AUTHORIZATION_FAILED:
+            case UNKNOWN_MEMBER_ID:
+            case STALE_MEMBER_EPOCH:
+                log.debug("`DescribeShareGroupOffsets` request for group id {} 
failed due to error {}", groupId.idValue, error);

Review Comment:
   Should it be `ListShareGroupOffsets` here and in the other logs?
   ```suggestion
                log.debug("`ListShareGroupOffsets` request for group id {} failed due to error {}", groupId.idValue, error);
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java:
##########
@@ -71,63 +73,122 @@ public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
         return lookupStrategy;
     }
 
+    private void validateKeys(Set<CoordinatorKey> groupIds) {
+        Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet());
+        if (!keys.containsAll(groupIds)) {
+            throw new IllegalArgumentException("Received unexpected group ids 
" + groupIds +
+                " (expected one of " + keys + ")");
+        }
+    }

Review Comment:
   nit: maybe move the private method after the public ones; then the class is easier to read.



##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java:
##########
@@ -17,21 +17,84 @@
 
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class DescribeShareGroupOffsetsResponse extends AbstractResponse {
     private final DescribeShareGroupOffsetsResponseData data;
+    private final Map<String, Errors> groupLevelErrors = new HashMap<>();
 
     public DescribeShareGroupOffsetsResponse(DescribeShareGroupOffsetsResponseData data) {
         super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
         this.data = data;
+        for (DescribeShareGroupOffsetsResponseGroup group : data.groups()) {
+            this.groupLevelErrors.put(group.groupId(), Errors.forCode(group.errorCode()));
+        }
+    }
+
+    public DescribeShareGroupOffsetsResponse(int throttleTimeMs,
+                                             Map<String, Throwable> errorsMap,
+                                             Map<String, Map<TopicIdPartition, DescribeShareGroupOffsetsResponsePartition>> responseData) {
+        super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
+        List<DescribeShareGroupOffsetsResponseGroup> groupList = new ArrayList<>();
+        for (Map.Entry<String, Map<TopicIdPartition, DescribeShareGroupOffsetsResponsePartition>> groupEntry : responseData.entrySet()) {
+            String groupId = groupEntry.getKey();
+            Map<TopicIdPartition, DescribeShareGroupOffsetsResponsePartition> partitionDataMap = groupEntry.getValue();
+            Map<String, DescribeShareGroupOffsetsResponseTopic> topicDataMap = new HashMap<>();

Review Comment:
   Hmmm, this map is never updated, so I'm not sure it serves any purpose here.
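   If the intent was to group the partition entries under their topics before building the group response, a minimal sketch would look something like the following (the grouping logic here is an assumption based on the surrounding constructor, not code from the PR):
   ```java
   // Hypothetical use of topicDataMap: one response topic per topic name,
   // with all of its partition entries collected under it.
   partitionDataMap.forEach((tp, partitionData) ->
       topicDataMap.computeIfAbsent(tp.topic(), topic ->
               new DescribeShareGroupOffsetsResponseTopic()
                   .setTopicName(topic))
           .partitions()
           .add(partitionData));
   ```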



##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java:
##########
@@ -17,21 +17,84 @@
 
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class DescribeShareGroupOffsetsResponse extends AbstractResponse {
     private final DescribeShareGroupOffsetsResponseData data;
+    private final Map<String, Errors> groupLevelErrors = new HashMap<>();
 
     public DescribeShareGroupOffsetsResponse(DescribeShareGroupOffsetsResponseData data) {
         super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
         this.data = data;
+        for (DescribeShareGroupOffsetsResponseGroup group : data.groups()) {
+            this.groupLevelErrors.put(group.groupId(), Errors.forCode(group.errorCode()));

Review Comment:
   If `errorMessage` in `DescribeShareGroupOffsetsResponseGroup` is set to something custom, it will be lost, because the error will now default to the `Errors` error-code message.
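   One way to keep the custom message would be to store the full error per group rather than just the code, for example with `ApiError` (a sketch of an alternative, not what the PR currently does):
   ```java
   // Hypothetical alternative: retain both the error code and the carried message.
   private final Map<String, ApiError> groupLevelErrors = new HashMap<>();

   public DescribeShareGroupOffsetsResponse(DescribeShareGroupOffsetsResponseData data) {
       super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
       this.data = data;
       for (DescribeShareGroupOffsetsResponseGroup group : data.groups()) {
           // ApiError preserves group.errorMessage() instead of falling back to
           // the static description from Errors.
           this.groupLevelErrors.put(group.groupId(),
               new ApiError(Errors.forCode(group.errorCode()), group.errorMessage()));
       }
   }
   ```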



##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java:
##########
@@ -42,11 +105,14 @@ public DescribeShareGroupOffsetsResponseData data() {
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> counts = new HashMap<>();
-        data.responses().forEach(
-                result -> result.partitions().forEach(
-                        partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
-                )
-        );
+        for (Map.Entry<String, Errors> entry: groupLevelErrors.entrySet()) {
+            updateErrorCounts(counts, entry.getValue());
+        }

Review Comment:
   nit:
   ```suggestion
        groupLevelErrors.values().forEach(error -> updateErrorCounts(counts, error));
   ```



##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java:
##########
@@ -17,21 +17,84 @@
 
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class DescribeShareGroupOffsetsResponse extends AbstractResponse {
     private final DescribeShareGroupOffsetsResponseData data;
+    private final Map<String, Errors> groupLevelErrors = new HashMap<>();
 
     public DescribeShareGroupOffsetsResponse(DescribeShareGroupOffsetsResponseData data) {
         super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
         this.data = data;
+        for (DescribeShareGroupOffsetsResponseGroup group : data.groups()) {
+            this.groupLevelErrors.put(group.groupId(), Errors.forCode(group.errorCode()));
+        }
+    }
+
+    public DescribeShareGroupOffsetsResponse(int throttleTimeMs,
+                                             Map<String, Throwable> errorsMap,
+                                             Map<String, Map<TopicIdPartition, DescribeShareGroupOffsetsResponsePartition>> responseData) {
+        super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
+        List<DescribeShareGroupOffsetsResponseGroup> groupList = new ArrayList<>();
+        for (Map.Entry<String, Map<TopicIdPartition, DescribeShareGroupOffsetsResponsePartition>> groupEntry : responseData.entrySet()) {
+            String groupId = groupEntry.getKey();
+            Map<TopicIdPartition, DescribeShareGroupOffsetsResponsePartition> partitionDataMap = groupEntry.getValue();

Review Comment:
   Can be replaced by a single line:
   ```suggestion
        responseData.forEach((groupId, partitionDataMap) -> {



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
