apoorvmittal10 commented on code in PR #18834:
URL: https://github.com/apache/kafka/pull/18834#discussion_r1954238834


##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java:
##########
@@ -73,61 +75,121 @@ public AdminApiLookupStrategy<CoordinatorKey> 
lookupStrategy() {
 
     @Override
     public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int 
coordinatorId, Set<CoordinatorKey> keys) {
-        List<String> groupIds = keys.stream().map(key -> {
-            if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
-                throw new IllegalArgumentException("Invalid group coordinator 
key " + key +
-                    " when building `DescribeShareGroupOffsets` request");
+        validateKeys(keys);
+
+        List<DescribeShareGroupOffsetsRequestGroup> groups = new 
ArrayList<>(keys.size());
+        keys.forEach(coordinatorKey -> {
+            String groupId = coordinatorKey.idValue;
+            ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
+            DescribeShareGroupOffsetsRequestGroup requestGroup = new 
DescribeShareGroupOffsetsRequestGroup()
+                .setGroupId(groupId);
+
+            Map<String, List<Integer>> topicPartitionMap = new HashMap<>();
+            spec.topicPartitions().forEach(tp -> 
topicPartitionMap.computeIfAbsent(tp.topic(), t -> new 
LinkedList<>()).add(tp.partition()));

Review Comment:
   Query: Why do we need to modify the input param, which is part of the request?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10456,13 +10458,15 @@ class KafkaApisTest extends Logging {
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val response = 
verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
-    response.data.responses.forEach(topic => 
topic.partitions().forEach(partition => 
assertEquals(Errors.UNSUPPORTED_VERSION.code(), partition.errorCode())))
+    response.data.groups.forEach(group => group.topics.forEach(topic => 
topic.partitions.forEach(partition => 
assertEquals(Errors.UNSUPPORTED_VERSION.code(), partition.errorCode()))))

Review Comment:
   ```suggestion
       response.data.groups.forEach(group => group.topics.forEach(topic => 
topic.partitions.forEach(partition => 
assertEquals(Errors.UNSUPPORTED_VERSION.code, partition.errorCode))))
   ```



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10478,9 +10482,11 @@ class KafkaApisTest extends Logging {
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val response = 
verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
-    response.data.responses.forEach(
-      topic => topic.partitions().forEach(
-        partition => assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code(), 
partition.errorCode())
+    response.data.groups.forEach(
+      group => group.topics.forEach(
+        topic => topic.partitions.forEach(
+          partition => assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code(), 
partition.errorCode())

Review Comment:
   ```suggestion
             partition => assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
partition.errorCode)
   ```



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10491,31 +10497,49 @@ class KafkaApisTest extends Logging {
     val topicId1 = Uuid.randomUuid()
     val topicName2 = "topic-2"
     val topicId2 = Uuid.randomUuid()
+    val topicName3 = "topic-3"
+    val topicId3 = Uuid.randomUuid()

Review Comment:
   ```suggestion
       val topicId3 = Uuid.randomUuid
   ```



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10558,7 +10582,81 @@ class KafkaApisTest extends Logging {
           ))
       ))
 
-    future.complete(describeShareGroupOffsetsResponse)
+    val describeShareGroupOffsetsResponseGroup2 = new 
DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group2")
+      .setTopics(util.List.of(
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName3)
+          .setTopicId(topicId3)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(0)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0)
+          ))
+      ))
+
+    val describeShareGroupOffsetsResponse = new 
DescribeShareGroupOffsetsResponseData()
+      .setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1, 
describeShareGroupOffsetsResponseGroup2))
+
+    futureGroup1.complete(describeShareGroupOffsetsResponseGroup1)
+    futureGroup2.complete(describeShareGroupOffsetsResponseGroup2)
+    val response = 
verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(describeShareGroupOffsetsResponse, response.data)
+  }
+
+  @Test
+  def testDescribeShareGroupOffsetsRequestEmptyGroupsSuccess(): Unit = {
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+
+    val describeShareGroupOffsetsRequest = new 
DescribeShareGroupOffsetsRequestData()
+
+    val requestChannelRequest = buildRequest(new 
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, 
true).build())

Review Comment:
   ```suggestion
       val requestChannelRequest = buildRequest(new 
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, 
true).build)
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -956,47 +957,62 @@ public 
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetch
     }
 
     /**
-     * See {@link GroupCoordinator#describeShareGroupOffsets(RequestContext, 
DescribeShareGroupOffsetsRequestData)}.
+     * See {@link GroupCoordinator#describeShareGroupOffsets(RequestContext, 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
      */
     @Override
-    public CompletableFuture<DescribeShareGroupOffsetsResponseData> 
describeShareGroupOffsets(
+    public 
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
 describeShareGroupOffsets(
         RequestContext context,
-        DescribeShareGroupOffsetsRequestData requestData
+        
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup 
requestData
     ) {
+        final HashMap<Uuid, String> requestTopicIdToNameMapping = new 
HashMap<>();

Review Comment:
   Why not just Map?
   ```suggestion
           final Map<Uuid, String> requestTopicIdToNameMapping = new 
HashMap<>();
   ```



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10558,7 +10582,81 @@ class KafkaApisTest extends Logging {
           ))
       ))
 
-    future.complete(describeShareGroupOffsetsResponse)
+    val describeShareGroupOffsetsResponseGroup2 = new 
DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group2")
+      .setTopics(util.List.of(
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName3)
+          .setTopicId(topicId3)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(0)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0)
+          ))
+      ))
+
+    val describeShareGroupOffsetsResponse = new 
DescribeShareGroupOffsetsResponseData()
+      .setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1, 
describeShareGroupOffsetsResponseGroup2))
+
+    futureGroup1.complete(describeShareGroupOffsetsResponseGroup1)
+    futureGroup2.complete(describeShareGroupOffsetsResponseGroup2)
+    val response = 
verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(describeShareGroupOffsetsResponse, response.data)
+  }
+
+  @Test
+  def testDescribeShareGroupOffsetsRequestEmptyGroupsSuccess(): Unit = {
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+
+    val describeShareGroupOffsetsRequest = new 
DescribeShareGroupOffsetsRequestData()
+
+    val requestChannelRequest = buildRequest(new 
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, 
true).build())
+
+    val future = new 
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]()

Review Comment:
   nit: Same elsewhere:
   ```suggestion
       val future = new 
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
   ```



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10558,7 +10582,81 @@ class KafkaApisTest extends Logging {
           ))
       ))
 
-    future.complete(describeShareGroupOffsetsResponse)
+    val describeShareGroupOffsetsResponseGroup2 = new 
DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group2")
+      .setTopics(util.List.of(
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName3)
+          .setTopicId(topicId3)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(0)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0)
+          ))
+      ))
+
+    val describeShareGroupOffsetsResponse = new 
DescribeShareGroupOffsetsResponseData()
+      .setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1, 
describeShareGroupOffsetsResponseGroup2))
+
+    futureGroup1.complete(describeShareGroupOffsetsResponseGroup1)
+    futureGroup2.complete(describeShareGroupOffsetsResponseGroup2)
+    val response = 
verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(describeShareGroupOffsetsResponse, response.data)
+  }
+
+  @Test
+  def testDescribeShareGroupOffsetsRequestEmptyGroupsSuccess(): Unit = {
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+
+    val describeShareGroupOffsetsRequest = new 
DescribeShareGroupOffsetsRequestData()
+
+    val requestChannelRequest = buildRequest(new 
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, 
true).build())
+
+    val future = new 
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]()
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> 
"true"),
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val describeShareGroupOffsetsResponseGroup = new 
DescribeShareGroupOffsetsResponseGroup()
+
+    val describeShareGroupOffsetsResponse = new 
DescribeShareGroupOffsetsResponseData()

Review Comment:
   ```suggestion
       val describeShareGroupOffsetsResponseGroup = new 
DescribeShareGroupOffsetsResponseGroup
       val describeShareGroupOffsetsResponse = new 
DescribeShareGroupOffsetsResponseData
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -956,47 +957,62 @@ public 
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetch
     }
 
     /**
-     * See {@link GroupCoordinator#describeShareGroupOffsets(RequestContext, 
DescribeShareGroupOffsetsRequestData)}.
+     * See {@link GroupCoordinator#describeShareGroupOffsets(RequestContext, 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
      */
     @Override
-    public CompletableFuture<DescribeShareGroupOffsetsResponseData> 
describeShareGroupOffsets(
+    public 
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
 describeShareGroupOffsets(
         RequestContext context,
-        DescribeShareGroupOffsetsRequestData requestData
+        
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup 
requestData
     ) {
+        final HashMap<Uuid, String> requestTopicIdToNameMapping = new 
HashMap<>();

Review Comment:
   Also, this can be moved after the first two `if` blocks, as it is only used later.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3190,26 +3190,51 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): 
Unit = {
     val describeShareGroupOffsetsRequest = 
request.body[DescribeShareGroupOffsetsRequest]
+    val groups = describeShareGroupOffsetsRequest.groups()
+
+    val futures = new 
mutable.ArrayBuffer[CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]](groups.size)
+    groups.forEach { groupDescribeOffsets =>
+      if (!isShareGroupProtocolEnabled) {
+        futures += CompletableFuture.completedFuture(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+          .setGroupId(groupDescribeOffsets.groupId)
+          .setErrorCode(Errors.UNSUPPORTED_VERSION.code))
+      } else if (!authHelper.authorize(request.context, READ, GROUP, 
groupDescribeOffsets.groupId)) {
+        futures += CompletableFuture.completedFuture(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+          .setGroupId(groupDescribeOffsets.groupId)
+          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))
+      } else if (groupDescribeOffsets.topics.isEmpty) {
+        futures += CompletableFuture.completedFuture(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+          .setGroupId(groupDescribeOffsets.groupId)
+          .setErrorCode(Errors.NONE.code))

Review Comment:
   Do we need to set this error code specifically? It should be none by default.



##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java:
##########
@@ -18,20 +18,56 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
+import 
org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class DescribeShareGroupOffsetsResponse extends AbstractResponse {
     private final DescribeShareGroupOffsetsResponseData data;
+    private final Map<String, Throwable> groupLevelErrors = new HashMap<>();
 
     public 
DescribeShareGroupOffsetsResponse(DescribeShareGroupOffsetsResponseData data) {
         super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
         this.data = data;
+        for (DescribeShareGroupOffsetsResponseGroup group : data.groups()) {
+            if (group.errorCode() != Errors.NONE.code()) {
+                this.groupLevelErrors.put(group.groupId(), 
Errors.forCode(group.errorCode()).exception(group.errorMessage()));
+            }
+        }
+    }
+
+    // Builds a response with the same group-level error for all groups and 
empty topics lists for all groups
+    public DescribeShareGroupOffsetsResponse(int throttleTimeMs,
+                                             List<String> groupIds,
+                                             Throwable allGroupsException) {
+        super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
+        short errorCode = Errors.forException(allGroupsException).code();
+        List<DescribeShareGroupOffsetsResponseGroup> groupList = new 
ArrayList<>();
+        groupIds.forEach(groupId -> {
+            groupList.add(new DescribeShareGroupOffsetsResponseGroup()
+                .setGroupId(groupId)
+                .setErrorCode(errorCode)
+                .setErrorMessage(errorCode == 
Errors.UNKNOWN_SERVER_ERROR.code() ? Errors.forCode(errorCode).message() : 
allGroupsException.getMessage()));

Review Comment:
   Query: Why do we want special handling for `UNKNOWN_SERVER_ERROR`? Rather, 
shouldn't we have:
   
   ```
   .setErrorMessage(allGroupsException.getMessage() != null ? 
allGroupsException.getMessage() : Errors.forCode(errorCode).message()));
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1009,20 +1025,23 @@ public 
CompletableFuture<DescribeShareGroupOffsetsResponseData> describeShareGro
                     future.completeExceptionally(new 
IllegalStateException("Result is null for the read state summary"));
                     return;
                 }
-                
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>
 describeShareGroupOffsetsResponseTopicList =
-                    result.topicsData().stream().map(
-                        topicData -> new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                            .setTopicId(topicData.topicId())
-                            
.setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
-                            .setPartitions(topicData.partitions().stream().map(
-                                partitionData -> new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-                                    
.setPartitionIndex(partitionData.partition())
-                                    
.setStartOffset(partitionData.startOffset())
-                                    
.setErrorMessage(partitionData.errorMessage())
-                                    .setErrorCode(partitionData.errorCode())
-                            ).toList())
-                    ).toList();
-                future.complete(new 
DescribeShareGroupOffsetsResponseData().setResponses(describeShareGroupOffsetsResponseTopicList));
+                result.topicsData().forEach(topicData ->
+                    describeShareGroupOffsetsResponseTopicList.add(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                        .setTopicId(topicData.topicId())
+                        
.setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
+                        .setPartitions(topicData.partitions().stream().map(
+                            partitionData -> new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                                .setPartitionIndex(partitionData.partition())
+                                .setStartOffset(partitionData.startOffset())
+                                
.setErrorMessage(Errors.forCode(partitionData.errorCode()).message())
+                                .setErrorCode(partitionData.errorCode())
+                        ).toList())
+                    ));
+
+                future.complete(
+                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+                        .setGroupId(requestData.groupId())
+                        
.setTopics(describeShareGroupOffsetsResponseTopicList));
             });

Review Comment:
   For my understanding: elsewhere in the GroupCoordinatorService we schedule 
read operations over the runtime, but we handle this current code differently. Why?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to