apoorvmittal10 commented on code in PR #17775:
URL: https://github.com/apache/kafka/pull/17775#discussion_r1915646369


##########
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#listShareGroupOffsets(Map, 
ListShareGroupOffsetsOptions)} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListShareGroupOffsetsResult {
+
+    private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures;
+
+    public ListShareGroupOffsetsResult(final Map<CoordinatorKey, 
KafkaFuture<Map<TopicPartition, Long>>> futures) {
+        this.futures = futures.entrySet().stream()
+            .collect(Collectors.toMap(e -> e.getKey().idValue, 
Map.Entry::getValue));
+    }
+
+    /**
+     * Return a future which yields all Map<String, Map<TopicPartition, Long> 
objects, if requests for all the groups succeed.
+     */
+    public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new 
KafkaFuture[0])).thenApply(
+                nil -> {
+                    Map<String, Map<TopicPartition, Long>> offsets = new 
HashMap<>(futures.size());
+                    futures.forEach((groupId, future) -> {
+                        try {
+                            offsets.put(groupId, future.get());
+                        } catch (InterruptedException | ExecutionException e) {
+                            // This should be unreachable, since the 
KafkaFuture#allOf already ensured
+                            // that all the futures completed successfully.
+                            throw new RuntimeException(e);
+                        }
+                    });
+                    return offsets;
+                });
+    }
+
+    /**
+     * Return a future which yields a map of topic partitions to offsets for 
the specified group.
+     */
+    public KafkaFuture<Map<TopicPartition, Long>> partitionsToOffset(String 
groupId) {
+        if (!futures.containsKey(groupId)) {
+            throw new IllegalArgumentException("Group ID not found: " + 
groupId);
+        }
+        return futures.get(groupId);
+    }
+}

Review Comment:
   Please add a newline at the end of the file.



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Long>> {
+
+    private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
+    private final Logger log;
+    private final CoordinatorStrategy lookupStrategy;
+
+    public ListShareGroupOffsetsHandler(
+        Map<String, ListShareGroupOffsetsSpec> groupSpecs,
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(ListShareGroupOffsetsHandler.class);
+        this.lookupStrategy = new 
CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
+        this.groupSpecs = groupSpecs;
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, 
Map<TopicPartition, Long>> newFuture(Collection<String> coordinatorKeys) {
+        return AdminApiFuture.forKeys(coordinatorKeys(coordinatorKeys));
+    }
+
+    @Override
+    public String apiName() {
+        return "listShareGroupOffsets";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    private void validateKeys(Set<CoordinatorKey> coordinatorKeys) {
+        Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet());
+        if (!keys.containsAll(coordinatorKeys)) {
+            throw new IllegalArgumentException("Received unexpected group ids 
" + coordinatorKeys +
+                " (expected among " + keys + ")");
+        }
+    }
+
+    @Override
+    public OffsetFetchRequest.Builder buildBatchedRequest(int brokerId, 
Set<CoordinatorKey> coordinatorKeys) {
+        // Create a map that only contains the consumer groups owned by the 
coordinator.
+        Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions 
= new HashMap<>(coordinatorKeys.size());
+        coordinatorKeys.forEach(g -> {
+            ListShareGroupOffsetsSpec spec = groupSpecs.get(g.idValue);
+            List<TopicPartition> partitions = spec.topicPartitions() != null ? 
List.copyOf(spec.topicPartitions()) : null;
+            coordinatorGroupIdToTopicPartitions.put(g.idValue, partitions);
+        });
+
+        return new 
OffsetFetchRequest.Builder(coordinatorGroupIdToTopicPartitions, false, false);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, Map<TopicPartition, Long>> handleResponse(
+        Node coordinator,
+        Set<CoordinatorKey> coordinatorKeys,
+        AbstractResponse abstractResponse
+    ) {
+        validateKeys(coordinatorKeys);
+
+        final OffsetFetchResponse response = (OffsetFetchResponse) 
abstractResponse;
+
+        Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new 
HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+        for (CoordinatorKey coordinatorKey : coordinatorKeys) {
+            String group = coordinatorKey.idValue;
+            if (response.groupHasError(group)) {
+                handleGroupError(CoordinatorKey.byGroupId(group), 
response.groupLevelError(group), failed, unmapped);
+            } else {
+                final Map<TopicPartition, Long> groupOffsetsListing = new 
HashMap<>();
+                Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responseData = response.partitionDataMap(group);
+                for (Map.Entry<TopicPartition, 
OffsetFetchResponse.PartitionData> partitionEntry : responseData.entrySet()) {
+                    final TopicPartition topicPartition = 
partitionEntry.getKey();
+                    OffsetFetchResponse.PartitionData partitionData = 
partitionEntry.getValue();
+                    final Errors error = partitionData.error;
+
+                    if (error == Errors.NONE) {
+                        final long offset = partitionData.offset;
+                        groupOffsetsListing.put(topicPartition, offset);
+                    } else {
+                        log.warn("Skipping return offset for {} due to error 
{}.", topicPartition, error);
+                    }
+                }
+                completed.put(CoordinatorKey.byGroupId(group), 
groupOffsetsListing);
+            }
+        }
+        return new ApiResult<>(completed, failed, unmapped);
+    }
+
+    private static Set<CoordinatorKey> coordinatorKeys(Collection<String> 
coordinatorKeys) {
+        return coordinatorKeys.stream()
+            .map(CoordinatorKey::byGroupId)
+            .collect(Collectors.toSet());
+    }
+
+
+    private void handleGroupError(
+        CoordinatorKey groupId,
+        Errors error,
+        Map<CoordinatorKey, Throwable> failed,
+        List<CoordinatorKey> groupsToUnmap
+    ) {
+        switch (error) {
+            case GROUP_AUTHORIZATION_FAILED:
+            case GROUP_ID_NOT_FOUND:
+                log.debug("`OffsetFetch` request for group id {} failed due to 
error {}", groupId.idValue, error);
+                failed.put(groupId, error.exception());
+                break;
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we 
just need to retry
+                log.debug("`OffsetFetch` request for group id {} failed 
because the coordinator " +
+                    "is still in the process of loading state. Will retry", 
groupId.idValue);
+                break;
+
+            case COORDINATOR_NOT_AVAILABLE:
+            case NOT_COORDINATOR:
+                // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`OffsetFetch` request for group id {} returned 
error {}. " +
+                    "Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+                groupsToUnmap.add(groupId);
+                break;
+
+            default:
+                log.error("`OffsetFetch` request for group id {} failed due to 
unexpected error {}", groupId.idValue, error);
+                failed.put(groupId, error.exception());
+        }
+    }
+}

Review Comment:
   Please add a line break at the end of this new file.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -578,6 +581,113 @@ public CompletableFuture<ReadShareGroupStateResponseData> 
readState(RequestConte
         });
     }
 
+    @Override
+    public CompletableFuture<ReadShareGroupStateSummaryResponseData> 
readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData 
request) {
+        String groupId = request.groupId();
+        // A map to store the futures for each topicId and partition.
+        Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponseData>>> futureMap = new 
HashMap<>();
+
+        // Send an empty response if topic data is empty
+        if (isEmpty(request.topics())) {
+            log.error("Topic Data is empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if partition data is empty for any topic
+        for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData 
topicData : request.topics()) {
+            if (isEmpty(topicData.partitions())) {
+                log.error("Partition Data for topic {} is empty: {}", 
topicData.topicId(), request);
+                return CompletableFuture.completedFuture(
+                        new ReadShareGroupStateSummaryResponseData()
+                );
+            }
+        }
+
+        // Send an empty response if groupId is invalid
+        if (isGroupIdEmpty(groupId)) {
+            log.error("Group id must be specified and non-empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if the coordinator is not active
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(
+                    generateErrorReadStateSummaryResponse(
+                            request,
+                            Errors.COORDINATOR_NOT_AVAILABLE,
+                            "Share coordinator is not available."
+                    )
+            );
+        }
+
+        // The request received here could have multiple keys of structure 
group:topic:partition. However,
+        // the readStateSummary method in ShareCoordinatorShard expects a 
single key in the request. Hence, we will
+        // be looping over the keys below and constructing new 
ReadShareGroupStateSummaryRequestData objects to pass
+        // onto the shard method.
+
+        request.topics().forEach(topicData -> {
+            Uuid topicId = topicData.topicId();
+            topicData.partitions().forEach(partitionData -> {
+                // Request object containing information of a single topic 
partition
+                ReadShareGroupStateSummaryRequestData 
requestForCurrentPartition = new ReadShareGroupStateSummaryRequestData()
+                        .setGroupId(groupId)
+                        .setTopics(List.of(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                                .setTopicId(topicId)
+                                
.setPartitions(Collections.singletonList(partitionData))));
+                SharePartitionKey coordinatorKey = 
SharePartitionKey.getInstance(request.groupId(), topicId, 
partitionData.partition());
+                // Scheduling a runtime read operation to read share partition 
state from the coordinator in memory state
+                CompletableFuture<ReadShareGroupStateSummaryResponseData> 
future = runtime.scheduleReadOperation(
+                        "read-share-group-state-summary",
+                        topicPartitionFor(coordinatorKey),
+                        (coordinator, offset) -> 
coordinator.readStateSummary(requestForCurrentPartition, offset)
+                ).exceptionally(exception -> handleOperationException(
+                        "read-share-group-state-summary",
+                        request,
+                        exception,
+                        (error, message) -> 
ReadShareGroupStateSummaryResponse.toErrorResponseData(
+                                topicData.topicId(),
+                                partitionData.partition(),
+                                error,
+                                "Unable to read share group state: " + 
exception.getMessage()
+                        ),
+                        log
+                ));
+                futureMap.computeIfAbsent(topicId, k -> new HashMap<>())
+                        .put(partitionData.partition(), future);
+            });
+        });
+
+        // Combine all futures into a single CompletableFuture<Void>
+        CompletableFuture<Void> combinedFuture = 
CompletableFuture.allOf(futureMap.values().stream()
+                .flatMap(map -> 
map.values().stream()).toArray(CompletableFuture[]::new));
+
+        // Transform the combined CompletableFuture<Void> into 
CompletableFuture<ReadShareGroupStateResponseData>
+        return combinedFuture.thenApply(v -> {
+            
List<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> 
readStateSummaryResult = new ArrayList<>(futureMap.size());
+            futureMap.forEach(
+                    (topicId, topicEntry) -> {
+                        
List<ReadShareGroupStateSummaryResponseData.PartitionResult> partitionResults = 
new ArrayList<>(topicEntry.size());
+                        topicEntry.forEach(
+                                (partitionId, responseFut) -> {

Review Comment:
   nit: `responseFut` => `responseFuture`. I am not a fan of irregular variable
names used just to save 3 characters. Though if `fut` is a well-known
abbreviation for `future`, then it's fine as is.



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsOptions.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * Options for {@link Admin#listShareGroupOffsets(Map, 
ListShareGroupOffsetsOptions)}.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListShareGroupOffsetsOptions extends 
AbstractOptions<ListShareGroupOffsetsOptions> {
+}

Review Comment:
   New files generally have a blank last line. Can we please add one?



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -493,6 +496,56 @@ private List<PersisterStateBatch> mergeBatches(
             .combineStateBatches();
     }
 
+    /**
+     * This method finds the ShareSnapshotValue record corresponding to the 
requested topic partition from the
+     * in-memory state of coordinator shard, the shareStateMap.
+     * <p>
+     * This method as called by the ShareCoordinatorService will be provided 
with
+     * the request data which covers only key i.e. group1:topic1:partition1. 
The implementation
+     * below was done keeping this in mind.
+     *
+     * @param request - ReadShareGroupStateSummaryRequestData for a single key
+     * @param offset  - offset to read from the __share_group_state topic 
partition
+     * @return CoordinatorResult(records, response)
+     */
+    public ReadShareGroupStateSummaryResponseData 
readStateSummary(ReadShareGroupStateSummaryRequestData request, Long offset) {

Review Comment:
   nit: Can we please move this `public` method before the `private` ones?



##########
share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -730,6 +734,147 @@ protected RPCType rpcType() {
         }
     }
 
+    public class ReadStateSummaryHandler extends PersisterStateManagerHandler {
+        private final int leaderEpoch;
+        private final CompletableFuture<ReadShareGroupStateSummaryResponse> 
result;
+        private final BackoffManager readStateBackoff;
+
+        public ReadStateSummaryHandler(
+                String groupId,
+                Uuid topicId,
+                int partition,
+                int leaderEpoch,
+                CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+                long backoffMs,
+                long backoffMaxMs,
+                int maxRPCRetryAttempts,
+                Consumer<ClientResponse> onCompleteCallback
+        ) {
+            super(groupId, topicId, partition, backoffMs, backoffMaxMs, 
maxRPCRetryAttempts);
+            this.leaderEpoch = leaderEpoch;
+            this.result = result;
+            this.readStateBackoff = new BackoffManager(maxRPCRetryAttempts, 
backoffMs, backoffMaxMs);
+        }
+
+        public ReadStateSummaryHandler(
+                String groupId,
+                Uuid topicId,
+                int partition,
+                int leaderEpoch,
+                CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+                Consumer<ClientResponse> onCompleteCallback
+        ) {
+            this(
+                    groupId,
+                    topicId,
+                    partition,
+                    leaderEpoch,
+                    result,
+                    REQUEST_BACKOFF_MS,
+                    REQUEST_BACKOFF_MAX_MS,
+                    MAX_FIND_COORD_ATTEMPTS,
+                    onCompleteCallback
+            );
+        }
+
+        @Override
+        protected String name() {
+            return "ReadStateSummaryHandler";

Review Comment:
   Shouldn't it be `readStateSummaryHandler`, since camel case was used earlier
for `list...`?
   ```suggestion
               return "readStateSummaryHandler";
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##########
@@ -210,6 +212,18 @@ 
CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> shareGrou
         List<String> groupIds
     );
 
+    /**
+     * List share group Offsets.
+     *
+     * @param context           The coordinator request context.
+     * @param request           The ReadShareGroupStateSummaryRequest data.
+     *
+     * @return A future yielding the results or an exception.
+     */
+    
CompletableFuture<List<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult>>
 listShareGroupOffsets(

Review Comment:
   I meant that the method takes `ReadShareGroupStateSummaryRequestData` as a
param, hence I was expecting a method name something like
`readShareGroupStateSummary`, but the current method name is
`listShareGroupOffsets`. Any reason?



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -493,6 +496,56 @@ private List<PersisterStateBatch> mergeBatches(
             .combineStateBatches();
     }
 
+    /**
+     * This method finds the ShareSnapshotValue record corresponding to the 
requested topic partition from the
+     * in-memory state of coordinator shard, the shareStateMap.
+     * <p>
+     * This method as called by the ShareCoordinatorService will be provided 
with
+     * the request data which covers only key i.e. group1:topic1:partition1. 
The implementation
+     * below was done keeping this in mind.
+     *
+     * @param request - ReadShareGroupStateSummaryRequestData for a single key
+     * @param offset  - offset to read from the __share_group_state topic 
partition
+     * @return CoordinatorResult(records, response)
+     */
+    public ReadShareGroupStateSummaryResponseData 
readStateSummary(ReadShareGroupStateSummaryRequestData request, Long offset) {
+        // records to read (with the key of snapshot type), response to caller
+        // only one key will be there in the request by design
+        Optional<ReadShareGroupStateSummaryResponseData> error = 
maybeGetReadStateSummaryError(request, offset);
+        if (error.isPresent()) {
+            return error.get();
+        }
+
+        Uuid topicId = request.topics().get(0).topicId();
+        int partition = 
request.topics().get(0).partitions().get(0).partition();
+
+        SharePartitionKey coordinatorKey = 
SharePartitionKey.getInstance(request.groupId(), topicId, partition);
+
+        if (!shareStateMap.containsKey(coordinatorKey)) {
+            return ReadShareGroupStateSummaryResponse.toResponseData(
+                    topicId,
+                    partition,
+                    PartitionFactory.UNINITIALIZED_START_OFFSET,
+                    PartitionFactory.DEFAULT_STATE_EPOCH
+            );
+        }
+
+        ShareGroupOffset offsetValue = shareStateMap.get(coordinatorKey, 
offset);
+

Review Comment:
   nit: remove line break.



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Long>> {
+
+    private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
+    private final Logger log;
+    private final CoordinatorStrategy lookupStrategy;
+
+    public ListShareGroupOffsetsHandler(
+        Map<String, ListShareGroupOffsetsSpec> groupSpecs,
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(ListShareGroupOffsetsHandler.class);
+        this.lookupStrategy = new 
CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
+        this.groupSpecs = groupSpecs;
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, 
Map<TopicPartition, Long>> newFuture(Collection<String> coordinatorKeys) {
+        return AdminApiFuture.forKeys(coordinatorKeys(coordinatorKeys));
+    }
+
+    @Override
+    public String apiName() {
+        return "listShareGroupOffsets";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    private void validateKeys(Set<CoordinatorKey> coordinatorKeys) {
+        Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet());
+        if (!keys.containsAll(coordinatorKeys)) {
+            throw new IllegalArgumentException("Received unexpected group ids 
" + coordinatorKeys +
+                " (expected among " + keys + ")");
+        }
+    }
+
+    @Override
+    public OffsetFetchRequest.Builder buildBatchedRequest(int brokerId, 
Set<CoordinatorKey> coordinatorKeys) {
+        // Create a map that only contains the consumer groups owned by the 
coordinator.
+        Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions 
= new HashMap<>(coordinatorKeys.size());
+        coordinatorKeys.forEach(g -> {
+            ListShareGroupOffsetsSpec spec = groupSpecs.get(g.idValue);
+            List<TopicPartition> partitions = spec.topicPartitions() != null ? 
List.copyOf(spec.topicPartitions()) : null;
+            coordinatorGroupIdToTopicPartitions.put(g.idValue, partitions);
+        });
+
+        return new 
OffsetFetchRequest.Builder(coordinatorGroupIdToTopicPartitions, false, false);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, Map<TopicPartition, Long>> handleResponse(
+        Node coordinator,
+        Set<CoordinatorKey> coordinatorKeys,
+        AbstractResponse abstractResponse
+    ) {
+        validateKeys(coordinatorKeys);
+
+        final OffsetFetchResponse response = (OffsetFetchResponse) 
abstractResponse;
+
+        Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new 
HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+        for (CoordinatorKey coordinatorKey : coordinatorKeys) {
+            String group = coordinatorKey.idValue;
+            if (response.groupHasError(group)) {
+                handleGroupError(CoordinatorKey.byGroupId(group), 
response.groupLevelError(group), failed, unmapped);
+            } else {
+                final Map<TopicPartition, Long> groupOffsetsListing = new 
HashMap<>();
+                Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responseData = response.partitionDataMap(group);
+                for (Map.Entry<TopicPartition, 
OffsetFetchResponse.PartitionData> partitionEntry : responseData.entrySet()) {
+                    final TopicPartition topicPartition = 
partitionEntry.getKey();
+                    OffsetFetchResponse.PartitionData partitionData = 
partitionEntry.getValue();
+                    final Errors error = partitionData.error;
+
+                    if (error == Errors.NONE) {
+                        final long offset = partitionData.offset;
+                        groupOffsetsListing.put(topicPartition, offset);
+                    } else {
+                        log.warn("Skipping return offset for {} due to error 
{}.", topicPartition, error);
+                    }
+                }
+                completed.put(CoordinatorKey.byGroupId(group), 
groupOffsetsListing);
+            }
+        }
+        return new ApiResult<>(completed, failed, unmapped);
+    }
+
+    private static Set<CoordinatorKey> coordinatorKeys(Collection<String> 
coordinatorKeys) {
+        return coordinatorKeys.stream()
+            .map(CoordinatorKey::byGroupId)
+            .collect(Collectors.toSet());
+    }
+
+
+    private void handleGroupError(
+        CoordinatorKey groupId,
+        Errors error,
+        Map<CoordinatorKey, Throwable> failed,
+        List<CoordinatorKey> groupsToUnmap
+    ) {
+        switch (error) {
+            case GROUP_AUTHORIZATION_FAILED:
+            case GROUP_ID_NOT_FOUND:
+                log.debug("`OffsetFetch` request for group id {} failed due to 
error {}", groupId.idValue, error);
+                failed.put(groupId, error.exception());
+                break;
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we 
just need to retry
+                log.debug("`OffsetFetch` request for group id {} failed 
because the coordinator " +
+                    "is still in the process of loading state. Will retry", 
groupId.idValue);
+                break;
+

Review Comment:
   The case above does not have an empty line after `break`; can we please be
consistent?



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Long>> {
+
+    private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
+    private final Logger log;
+    private final CoordinatorStrategy lookupStrategy;
+
+    public ListShareGroupOffsetsHandler(
+        Map<String, ListShareGroupOffsetsSpec> groupSpecs,
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(ListShareGroupOffsetsHandler.class);
+        this.lookupStrategy = new 
CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
+        this.groupSpecs = groupSpecs;
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, 
Map<TopicPartition, Long>> newFuture(Collection<String> coordinatorKeys) {
+        return AdminApiFuture.forKeys(coordinatorKeys(coordinatorKeys));
+    }
+
+    @Override
+    public String apiName() {
+        return "listShareGroupOffsets";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    private void validateKeys(Set<CoordinatorKey> coordinatorKeys) {
+        Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet());
+        if (!keys.containsAll(coordinatorKeys)) {
+            throw new IllegalArgumentException("Received unexpected group ids 
" + coordinatorKeys +
+                " (expected among " + keys + ")");
+        }
+    }
+
+    @Override
+    public OffsetFetchRequest.Builder buildBatchedRequest(int brokerId, 
Set<CoordinatorKey> coordinatorKeys) {
+        // Create a map that only contains the consumer groups owned by the 
coordinator.
+        Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions 
= new HashMap<>(coordinatorKeys.size());
+        coordinatorKeys.forEach(g -> {

Review Comment:
   Instead of `g`, can we use `coordinatorKey` or `key` as the name? Such 
single-letter variable names are hard to read.



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Long>> {
+
+    private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
+    private final Logger log;
+    private final CoordinatorStrategy lookupStrategy;
+
+    public ListShareGroupOffsetsHandler(
+        Map<String, ListShareGroupOffsetsSpec> groupSpecs,
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(ListShareGroupOffsetsHandler.class);
+        this.lookupStrategy = new 
CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
+        this.groupSpecs = groupSpecs;
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, 
Map<TopicPartition, Long>> newFuture(Collection<String> coordinatorKeys) {
+        return AdminApiFuture.forKeys(coordinatorKeys(coordinatorKeys));
+    }
+
+    @Override
+    public String apiName() {
+        return "listShareGroupOffsets";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    private void validateKeys(Set<CoordinatorKey> coordinatorKeys) {
+        Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet());
+        if (!keys.containsAll(coordinatorKeys)) {
+            throw new IllegalArgumentException("Received unexpected group ids 
" + coordinatorKeys +
+                " (expected among " + keys + ")");
+        }
+    }
+
+    @Override
+    public OffsetFetchRequest.Builder buildBatchedRequest(int brokerId, 
Set<CoordinatorKey> coordinatorKeys) {
+        // Create a map that only contains the consumer groups owned by the 
coordinator.
+        Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions 
= new HashMap<>(coordinatorKeys.size());
+        coordinatorKeys.forEach(g -> {
+            ListShareGroupOffsetsSpec spec = groupSpecs.get(g.idValue);
+            List<TopicPartition> partitions = spec.topicPartitions() != null ? 
List.copyOf(spec.topicPartitions()) : null;
+            coordinatorGroupIdToTopicPartitions.put(g.idValue, partitions);
+        });
+
+        return new 
OffsetFetchRequest.Builder(coordinatorGroupIdToTopicPartitions, false, false);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, Map<TopicPartition, Long>> handleResponse(
+        Node coordinator,
+        Set<CoordinatorKey> coordinatorKeys,
+        AbstractResponse abstractResponse
+    ) {
+        validateKeys(coordinatorKeys);
+
+        final OffsetFetchResponse response = (OffsetFetchResponse) 
abstractResponse;
+
+        Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new 
HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+        for (CoordinatorKey coordinatorKey : coordinatorKeys) {
+            String group = coordinatorKey.idValue;
+            if (response.groupHasError(group)) {
+                handleGroupError(CoordinatorKey.byGroupId(group), 
response.groupLevelError(group), failed, unmapped);
+            } else {
+                final Map<TopicPartition, Long> groupOffsetsListing = new 
HashMap<>();
+                Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responseData = response.partitionDataMap(group);
+                for (Map.Entry<TopicPartition, 
OffsetFetchResponse.PartitionData> partitionEntry : responseData.entrySet()) {
+                    final TopicPartition topicPartition = 
partitionEntry.getKey();
+                    OffsetFetchResponse.PartitionData partitionData = 
partitionEntry.getValue();
+                    final Errors error = partitionData.error;
+
+                    if (error == Errors.NONE) {
+                        final long offset = partitionData.offset;
+                        groupOffsetsListing.put(topicPartition, offset);
+                    } else {
+                        log.warn("Skipping return offset for {} due to error 
{}.", topicPartition, error);
+                    }
+                }
+                completed.put(CoordinatorKey.byGroupId(group), 
groupOffsetsListing);
+            }
+        }
+        return new ApiResult<>(completed, failed, unmapped);
+    }
+
+    private static Set<CoordinatorKey> coordinatorKeys(Collection<String> 
coordinatorKeys) {
+        return coordinatorKeys.stream()
+            .map(CoordinatorKey::byGroupId)
+            .collect(Collectors.toSet());
+    }
+

Review Comment:
   Remove additional line break



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Long>> {

Review Comment:
   Why don't we have unit tests for the class? Am I missing something?



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java:
##########
@@ -76,6 +78,14 @@ public interface ShareCoordinator {
      */
     CompletableFuture<ReadShareGroupStateResponseData> 
readState(RequestContext context, ReadShareGroupStateRequestData request);
 
+    /**
+     * Handle read share state summary call
+     * @param context - represents the incoming read request context
+     * @param request - actual RPC request object
+     * @return completable future comprising read RPC response data

Review Comment:
   nit: maybe it's too generic and you want to correct it.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4318,9 +4318,22 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleReadShareGroupStateSummaryRequest(request: 
RequestChannel.Request): Unit = {
     val readShareGroupStateSummaryRequest = 
request.body[ReadShareGroupStateSummaryRequest]
-    // TODO: Implement the ReadShareGroupStateSummaryRequest handling
-    requestHelper.sendMaybeThrottle(request, 
readShareGroupStateSummaryRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+
+    shareCoordinator match {
+      case None => requestHelper.sendResponseMaybeThrottle(request, 
requestThrottleMs =>
+        readShareGroupStateSummaryRequest.getErrorResponse(requestThrottleMs,
+          new ApiException("Share coordinator is not enabled.")))
+        CompletableFuture.completedFuture[Unit](())
+      case Some(coordinator) => coordinator.readStateSummary(request.context, 
readShareGroupStateSummaryRequest.data)
+        .handle[Unit] { (response, exception) =>
+          if (exception != null) {
+            requestHelper.sendMaybeThrottle(request, 
readShareGroupStateSummaryRequest.getErrorResponse(exception))
+          } else {
+            requestHelper.sendMaybeThrottle(request, new 
ReadShareGroupStateSummaryResponse(response))
+          }
+        }
+    }

Review Comment:
   I can't see the KafkaApisTest for this change, am I missing something?



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -578,6 +581,113 @@ public CompletableFuture<ReadShareGroupStateResponseData> 
readState(RequestConte
         });
     }
 
+    @Override
+    public CompletableFuture<ReadShareGroupStateSummaryResponseData> 
readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData 
request) {
+        String groupId = request.groupId();
+        // A map to store the futures for each topicId and partition.
+        Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponseData>>> futureMap = new 
HashMap<>();
+
+        // Send an empty response if topic data is empty
+        if (isEmpty(request.topics())) {
+            log.error("Topic Data is empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if partition data is empty for any topic
+        for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData 
topicData : request.topics()) {
+            if (isEmpty(topicData.partitions())) {
+                log.error("Partition Data for topic {} is empty: {}", 
topicData.topicId(), request);
+                return CompletableFuture.completedFuture(
+                        new ReadShareGroupStateSummaryResponseData()
+                );
+            }
+        }
+
+        // Send an empty response if groupId is invalid
+        if (isGroupIdEmpty(groupId)) {
+            log.error("Group id must be specified and non-empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }

Review Comment:
   Why is this check not done prior to the topic list iteration? If this is 
true, then why iterate over the partitions?



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -578,6 +581,113 @@ public CompletableFuture<ReadShareGroupStateResponseData> 
readState(RequestConte
         });
     }
 
+    @Override
+    public CompletableFuture<ReadShareGroupStateSummaryResponseData> 
readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData 
request) {
+        String groupId = request.groupId();
+        // A map to store the futures for each topicId and partition.
+        Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponseData>>> futureMap = new 
HashMap<>();
+
+        // Send an empty response if topic data is empty
+        if (isEmpty(request.topics())) {
+            log.error("Topic Data is empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if partition data is empty for any topic
+        for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData 
topicData : request.topics()) {
+            if (isEmpty(topicData.partitions())) {
+                log.error("Partition Data for topic {} is empty: {}", 
topicData.topicId(), request);
+                return CompletableFuture.completedFuture(
+                        new ReadShareGroupStateSummaryResponseData()
+                );

Review Comment:
   Same as above.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -578,6 +581,113 @@ public CompletableFuture<ReadShareGroupStateResponseData> 
readState(RequestConte
         });
     }
 
+    @Override
+    public CompletableFuture<ReadShareGroupStateSummaryResponseData> 
readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData 
request) {
+        String groupId = request.groupId();
+        // A map to store the futures for each topicId and partition.
+        Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponseData>>> futureMap = new 
HashMap<>();
+
+        // Send an empty response if topic data is empty
+        if (isEmpty(request.topics())) {
+            log.error("Topic Data is empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );

Review Comment:
   The log is an error and the response is empty (without an exception); is 
that intended?



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -578,6 +581,113 @@ public CompletableFuture<ReadShareGroupStateResponseData> 
readState(RequestConte
         });
     }
 
+    @Override
+    public CompletableFuture<ReadShareGroupStateSummaryResponseData> 
readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData 
request) {
+        String groupId = request.groupId();
+        // A map to store the futures for each topicId and partition.
+        Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponseData>>> futureMap = new 
HashMap<>();
+
+        // Send an empty response if topic data is empty
+        if (isEmpty(request.topics())) {
+            log.error("Topic Data is empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if partition data is empty for any topic
+        for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData 
topicData : request.topics()) {
+            if (isEmpty(topicData.partitions())) {
+                log.error("Partition Data for topic {} is empty: {}", 
topicData.topicId(), request);
+                return CompletableFuture.completedFuture(
+                        new ReadShareGroupStateSummaryResponseData()
+                );
+            }
+        }
+
+        // Send an empty response if groupId is invalid
+        if (isGroupIdEmpty(groupId)) {
+            log.error("Group id must be specified and non-empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if the coordinator is not active
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(
+                    generateErrorReadStateSummaryResponse(
+                            request,
+                            Errors.COORDINATOR_NOT_AVAILABLE,
+                            "Share coordinator is not available."
+                    )
+            );
+        }
+
+        // The request received here could have multiple keys of structure 
group:topic:partition. However,
+        // the readStateSummary method in ShareCoordinatorShard expects a 
single key in the request. Hence, we will
+        // be looping over the keys below and constructing new 
ReadShareGroupStateSummaryRequestData objects to pass
+        // onto the shard method.
+
+        request.topics().forEach(topicData -> {
+            Uuid topicId = topicData.topicId();
+            topicData.partitions().forEach(partitionData -> {
+                // Request object containing information of a single topic 
partition
+                ReadShareGroupStateSummaryRequestData 
requestForCurrentPartition = new ReadShareGroupStateSummaryRequestData()
+                        .setGroupId(groupId)
+                        .setTopics(List.of(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                                .setTopicId(topicId)
+                                
.setPartitions(Collections.singletonList(partitionData))));
+                SharePartitionKey coordinatorKey = 
SharePartitionKey.getInstance(request.groupId(), topicId, 
partitionData.partition());
+                // Scheduling a runtime read operation to read share partition 
state from the coordinator in memory state
+                CompletableFuture<ReadShareGroupStateSummaryResponseData> 
future = runtime.scheduleReadOperation(
+                        "read-share-group-state-summary",
+                        topicPartitionFor(coordinatorKey),
+                        (coordinator, offset) -> 
coordinator.readStateSummary(requestForCurrentPartition, offset)
+                ).exceptionally(exception -> handleOperationException(
+                        "read-share-group-state-summary",
+                        request,
+                        exception,
+                        (error, message) -> 
ReadShareGroupStateSummaryResponse.toErrorResponseData(
+                                topicData.topicId(),
+                                partitionData.partition(),
+                                error,
+                                "Unable to read share group state: " + 
exception.getMessage()
+                        ),
+                        log
+                ));
+                futureMap.computeIfAbsent(topicId, k -> new HashMap<>())
+                        .put(partitionData.partition(), future);
+            });
+        });
+
+        // Combine all futures into a single CompletableFuture<Void>
+        CompletableFuture<Void> combinedFuture = 
CompletableFuture.allOf(futureMap.values().stream()
+                .flatMap(map -> 
map.values().stream()).toArray(CompletableFuture[]::new));
+
+        // Transform the combined CompletableFuture<Void> into 
CompletableFuture<ReadShareGroupStateResponseData>
+        return combinedFuture.thenApply(v -> {
+            
List<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> 
readStateSummaryResult = new ArrayList<>(futureMap.size());
+            futureMap.forEach(
+                    (topicId, topicEntry) -> {
+                        
List<ReadShareGroupStateSummaryResponseData.PartitionResult> partitionResults = 
new ArrayList<>(topicEntry.size());
+                        topicEntry.forEach(
+                                (partitionId, responseFut) -> {
+                                    // responseFut would already be completed 
by now since we have used
+                                    // CompletableFuture::allOf to create a 
combined future from the future map.
+                                    partitionResults.add(
+                                            
responseFut.getNow(null).results().get(0).partitions().get(0)

Review Comment:
   nit: you might want to correct the indentation.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -712,6 +737,54 @@ public CompletableFuture<List<DescribedGroup>> 
shareGroupDescribe(
         return FutureUtils.combineFutures(futures, ArrayList::new, 
List::addAll);
     }
 
+    /**
+     * See {@link GroupCoordinator#listShareGroupOffsets(RequestContext, 
ReadShareGroupStateSummaryRequestData)}.
+     */
+    @Override
+    public 
CompletableFuture<List<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult>>
 listShareGroupOffsets(
+        RequestContext context,
+        ReadShareGroupStateSummaryRequestData requestData
+    ) {

Review Comment:
   Do we have unit tests for the method? I can't find any though.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -578,6 +581,113 @@ public CompletableFuture<ReadShareGroupStateResponseData> 
readState(RequestConte
         });
     }
 
+    @Override
+    public CompletableFuture<ReadShareGroupStateSummaryResponseData> 
readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData 
request) {
+        String groupId = request.groupId();
+        // A map to store the futures for each topicId and partition.
+        Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponseData>>> futureMap = new 
HashMap<>();

Review Comment:
   Shouldn't this be created when used, i.e. at least after the `isEmpty` check? 
If the code returns from `if (isEmpty(request.topics())) {`, then why initialize 
the map?



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -578,6 +581,113 @@ public CompletableFuture<ReadShareGroupStateResponseData> 
readState(RequestConte
         });
     }
 
+    @Override
+    public CompletableFuture<ReadShareGroupStateSummaryResponseData> 
readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData 
request) {
+        String groupId = request.groupId();
+        // A map to store the futures for each topicId and partition.
+        Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponseData>>> futureMap = new 
HashMap<>();
+
+        // Send an empty response if topic data is empty
+        if (isEmpty(request.topics())) {
+            log.error("Topic Data is empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if partition data is empty for any topic
+        for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData 
topicData : request.topics()) {
+            if (isEmpty(topicData.partitions())) {
+                log.error("Partition Data for topic {} is empty: {}", 
topicData.topicId(), request);
+                return CompletableFuture.completedFuture(
+                        new ReadShareGroupStateSummaryResponseData()
+                );
+            }
+        }
+
+        // Send an empty response if groupId is invalid
+        if (isGroupIdEmpty(groupId)) {
+            log.error("Group id must be specified and non-empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if the coordinator is not active
+        if (!isActive.get()) {

Review Comment:
   This should be the first check in the method, even prior to looking at the request.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -493,6 +496,56 @@ private List<PersisterStateBatch> mergeBatches(
             .combineStateBatches();
     }
 
+    /**
+     * This method finds the ShareSnapshotValue record corresponding to the 
requested topic partition from the
+     * in-memory state of coordinator shard, the shareStateMap.
+     * <p>
+     * This method as called by the ShareCoordinatorService will be provided 
with
+     * the request data which covers only key i.e. group1:topic1:partition1. 
The implementation
+     * below was done keeping this in mind.
+     *
+     * @param request - ReadShareGroupStateSummaryRequestData for a single key
+     * @param offset  - offset to read from the __share_group_state topic 
partition
+     * @return CoordinatorResult(records, response)

Review Comment:
   Is it correct?



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -493,6 +496,56 @@ private List<PersisterStateBatch> mergeBatches(
             .combineStateBatches();
     }
 
+    /**
+     * This method finds the ShareSnapshotValue record corresponding to the 
requested topic partition from the
+     * in-memory state of coordinator shard, the shareStateMap.
+     * <p>
+     * This method as called by the ShareCoordinatorService will be provided 
with
+     * the request data which covers only key i.e. group1:topic1:partition1. 
The implementation
+     * below was done keeping this in mind.
+     *
+     * @param request - ReadShareGroupStateSummaryRequestData for a single key
+     * @param offset  - offset to read from the __share_group_state topic 
partition
+     * @return CoordinatorResult(records, response)
+     */
+    public ReadShareGroupStateSummaryResponseData 
readStateSummary(ReadShareGroupStateSummaryRequestData request, Long offset) {
+        // records to read (with the key of snapshot type), response to caller
+        // only one key will be there in the request by design
+        Optional<ReadShareGroupStateSummaryResponseData> error = 
maybeGetReadStateSummaryError(request, offset);
+        if (error.isPresent()) {
+            return error.get();
+        }
+
+        Uuid topicId = request.topics().get(0).topicId();
+        int partition = 
request.topics().get(0).partitions().get(0).partition();

Review Comment:
   Similar to the above.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -493,6 +496,56 @@ private List<PersisterStateBatch> mergeBatches(
             .combineStateBatches();
     }
 
+    /**
+     * This method finds the ShareSnapshotValue record corresponding to the 
requested topic partition from the
+     * in-memory state of coordinator shard, the shareStateMap.
+     * <p>
+     * This method as called by the ShareCoordinatorService will be provided 
with
+     * the request data which covers only key i.e. group1:topic1:partition1. 
The implementation
+     * below was done keeping this in mind.
+     *
+     * @param request - ReadShareGroupStateSummaryRequestData for a single key
+     * @param offset  - offset to read from the __share_group_state topic 
partition
+     * @return CoordinatorResult(records, response)
+     */
+    public ReadShareGroupStateSummaryResponseData 
readStateSummary(ReadShareGroupStateSummaryRequestData request, Long offset) {
+        // records to read (with the key of snapshot type), response to caller
+        // only one key will be there in the request by design
+        Optional<ReadShareGroupStateSummaryResponseData> error = 
maybeGetReadStateSummaryError(request, offset);
+        if (error.isPresent()) {
+            return error.get();
+        }
+
+        Uuid topicId = request.topics().get(0).topicId();

Review Comment:
   Is it guaranteed that the request will contain a topic, and exactly one? 
Or do we need to validate?



##########
share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -730,6 +734,147 @@ protected RPCType rpcType() {
         }
     }
 
+    public class ReadStateSummaryHandler extends PersisterStateManagerHandler {

Review Comment:
   Can we please have Javadoc for the class?



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -573,6 +626,46 @@ private Optional<ReadShareGroupStateResponseData> 
maybeGetReadStateError(ReadSha
         return Optional.empty();
     }
 
+    private Optional<ReadShareGroupStateSummaryResponseData> 
maybeGetReadStateSummaryError(ReadShareGroupStateSummaryRequestData request, 
Long offset) {
+        String groupId = request.groupId();
+        ReadShareGroupStateSummaryRequestData.ReadStateSummaryData topicData = 
request.topics().get(0);
+        ReadShareGroupStateSummaryRequestData.PartitionData partitionData = 
topicData.partitions().get(0);
+
+        Uuid topicId = topicData.topicId();
+        int partitionId = partitionData.partition();
+
+        if (topicId == null) {
+            log.error("Request topic id is null.");

Review Comment:
   Are these logs helpful, and should they be logged as `error`?



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -573,6 +626,46 @@ private Optional<ReadShareGroupStateResponseData> 
maybeGetReadStateError(ReadSha
         return Optional.empty();
     }
 
+    private Optional<ReadShareGroupStateSummaryResponseData> 
maybeGetReadStateSummaryError(ReadShareGroupStateSummaryRequestData request, 
Long offset) {
+        String groupId = request.groupId();
+        ReadShareGroupStateSummaryRequestData.ReadStateSummaryData topicData = 
request.topics().get(0);
+        ReadShareGroupStateSummaryRequestData.PartitionData partitionData = 
topicData.partitions().get(0);

Review Comment:
   Same as above here.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -493,6 +496,56 @@ private List<PersisterStateBatch> mergeBatches(
             .combineStateBatches();
     }
 
+    /**
+     * This method finds the ShareSnapshotValue record corresponding to the 
requested topic partition from the
+     * in-memory state of coordinator shard, the shareStateMap.
+     * <p>
+     * This method as called by the ShareCoordinatorService will be provided 
with
+     * the request data which covers only key i.e. group1:topic1:partition1. 
The implementation
+     * below was done keeping this in mind.
+     *
+     * @param request - ReadShareGroupStateSummaryRequestData for a single key
+     * @param offset  - offset to read from the __share_group_state topic 
partition
+     * @return CoordinatorResult(records, response)
+     */
+    public ReadShareGroupStateSummaryResponseData 
readStateSummary(ReadShareGroupStateSummaryRequestData request, Long offset) {
+        // records to read (with the key of snapshot type), response to caller
+        // only one key will be there in the request by design
+        Optional<ReadShareGroupStateSummaryResponseData> error = 
maybeGetReadStateSummaryError(request, offset);
+        if (error.isPresent()) {
+            return error.get();
+        }
+
+        Uuid topicId = request.topics().get(0).topicId();
+        int partition = 
request.topics().get(0).partitions().get(0).partition();
+
+        SharePartitionKey coordinatorKey = 
SharePartitionKey.getInstance(request.groupId(), topicId, partition);
+
+        if (!shareStateMap.containsKey(coordinatorKey)) {
+            return ReadShareGroupStateSummaryResponse.toResponseData(
+                    topicId,
+                    partition,
+                    PartitionFactory.UNINITIALIZED_START_OFFSET,
+                    PartitionFactory.DEFAULT_STATE_EPOCH
+            );
+        }
+
+        ShareGroupOffset offsetValue = shareStateMap.get(coordinatorKey, 
offset);
+
+        if (offsetValue == null) {
+            // Returning an error response as the snapshot value was not found
+            return ReadShareGroupStateSummaryResponse.toErrorResponseData(
+                    topicId,
+                    partition,
+                    Errors.UNKNOWN_SERVER_ERROR,

Review Comment:
   Is this the right error, or do we need to be more specific about the 
missing key? Also, shouldn't we log in such cases, since `unknown server error` 
is returned, which means that this scenario should not occur in the broker?



##########
share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -730,6 +734,147 @@ protected RPCType rpcType() {
         }
     }
 
+    public class ReadStateSummaryHandler extends PersisterStateManagerHandler {
+        private final int leaderEpoch;
+        private final CompletableFuture<ReadShareGroupStateSummaryResponse> 
result;
+        private final BackoffManager readStateBackoff;
+
+        public ReadStateSummaryHandler(
+                String groupId,
+                Uuid topicId,
+                int partition,
+                int leaderEpoch,
+                CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+                long backoffMs,
+                long backoffMaxMs,
+                int maxRPCRetryAttempts,
+                Consumer<ClientResponse> onCompleteCallback
+        ) {
+            super(groupId, topicId, partition, backoffMs, backoffMaxMs, 
maxRPCRetryAttempts);
+            this.leaderEpoch = leaderEpoch;
+            this.result = result;
+            this.readStateBackoff = new BackoffManager(maxRPCRetryAttempts, 
backoffMs, backoffMaxMs);
+        }
+
+        public ReadStateSummaryHandler(
+                String groupId,
+                Uuid topicId,
+                int partition,
+                int leaderEpoch,
+                CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+                Consumer<ClientResponse> onCompleteCallback
+        ) {
+            this(
+                    groupId,
+                    topicId,

Review Comment:
   Is the indentation correct?



##########
share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -730,6 +734,147 @@ protected RPCType rpcType() {
         }
     }
 
+    public class ReadStateSummaryHandler extends PersisterStateManagerHandler {
+        private final int leaderEpoch;
+        private final CompletableFuture<ReadShareGroupStateSummaryResponse> 
result;
+        private final BackoffManager readStateBackoff;
+
+        public ReadStateSummaryHandler(
+                String groupId,
+                Uuid topicId,
+                int partition,
+                int leaderEpoch,
+                CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+                long backoffMs,
+                long backoffMaxMs,
+                int maxRPCRetryAttempts,
+                Consumer<ClientResponse> onCompleteCallback
+        ) {
+            super(groupId, topicId, partition, backoffMs, backoffMaxMs, 
maxRPCRetryAttempts);
+            this.leaderEpoch = leaderEpoch;
+            this.result = result;
+            this.readStateBackoff = new BackoffManager(maxRPCRetryAttempts, 
backoffMs, backoffMaxMs);
+        }
+
+        public ReadStateSummaryHandler(
+                String groupId,
+                Uuid topicId,
+                int partition,
+                int leaderEpoch,
+                CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+                Consumer<ClientResponse> onCompleteCallback
+        ) {
+            this(
+                    groupId,
+                    topicId,
+                    partition,
+                    leaderEpoch,
+                    result,
+                    REQUEST_BACKOFF_MS,
+                    REQUEST_BACKOFF_MAX_MS,
+                    MAX_FIND_COORD_ATTEMPTS,
+                    onCompleteCallback
+            );
+        }
+
+        @Override
+        protected String name() {
+            return "ReadStateSummaryHandler";
+        }
+
+        @Override
+        protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> 
requestBuilder() {
+            throw new RuntimeException("Read Summary requests are batchable, 
hence individual requests not needed.");

Review Comment:
   Should it be RuntimeException or IllegalArgumentException? Though it depends 
on what the final response from the caller would be.



##########
share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -730,6 +734,147 @@ protected RPCType rpcType() {
         }
     }
 
+    public class ReadStateSummaryHandler extends PersisterStateManagerHandler {
+        private final int leaderEpoch;
+        private final CompletableFuture<ReadShareGroupStateSummaryResponse> 
result;
+        private final BackoffManager readStateBackoff;
+
+        public ReadStateSummaryHandler(
+                String groupId,
+                Uuid topicId,
+                int partition,
+                int leaderEpoch,
+                CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+                long backoffMs,
+                long backoffMaxMs,
+                int maxRPCRetryAttempts,
+                Consumer<ClientResponse> onCompleteCallback
+        ) {
+            super(groupId, topicId, partition, backoffMs, backoffMaxMs, 
maxRPCRetryAttempts);
+            this.leaderEpoch = leaderEpoch;
+            this.result = result;
+            this.readStateBackoff = new BackoffManager(maxRPCRetryAttempts, 
backoffMs, backoffMaxMs);
+        }
+
+        public ReadStateSummaryHandler(
+                String groupId,
+                Uuid topicId,
+                int partition,
+                int leaderEpoch,
+                CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+                Consumer<ClientResponse> onCompleteCallback
+        ) {
+            this(
+                    groupId,
+                    topicId,
+                    partition,
+                    leaderEpoch,
+                    result,
+                    REQUEST_BACKOFF_MS,
+                    REQUEST_BACKOFF_MAX_MS,
+                    MAX_FIND_COORD_ATTEMPTS,
+                    onCompleteCallback
+            );
+        }
+
+        @Override
+        protected String name() {
+            return "ReadStateSummaryHandler";
+        }
+
+        @Override
+        protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> 
requestBuilder() {
+            throw new RuntimeException("Read Summary requests are batchable, 
hence individual requests not needed.");
+        }
+
+        @Override
+        protected boolean isResponseForRequest(ClientResponse response) {
+            return response.requestHeader().apiKey() == 
ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY;
+        }
+
+        @Override
+        protected void handleRequestResponse(ClientResponse response) {

Review Comment:
   Do we have tests for these changes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to