AndrewJSchofield commented on code in PR #17775:
URL: https://github.com/apache/kafka/pull/17775#discussion_r1844139747


##########
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The result of the {@link Admin#listShareGroupOffsets(Map, 
ListShareGroupOffsetsOptions)} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class ListShareGroupOffsetsResult {
+
+    private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures;
+
+    public ListShareGroupOffsetsResult(Map<String, 
KafkaFuture<Map<TopicPartition, Long>>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a future which yields all Map<String, Map<TopicPartition, Long> 
objects, if requests for all the groups succeed.
+     */
+    public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new 
KafkaFuture[0])).thenApply(
+                nil -> {
+                    Map<String, Map<TopicPartition, Long>> offsets = new 
HashMap<>(futures.size());
+                    futures.forEach((key, future) -> {
+                        try {
+                            offsets.put(key, future.get());
+                        } catch (InterruptedException | ExecutionException e) {
+                            // This should be unreachable, since the 
KafkaFuture#allOf already ensured
+                            // that all the futures completed successfully.
+                            throw new RuntimeException(e);
+                        }
+                    });
+                    return offsets;
+                });
+    }
+
+    /**
+     * Return a future which yields a map of topic partitions to offsets for 
the specified group.
+     */
+    public KafkaFuture<Map<TopicPartition, Long>> partitionsToOffset(String 
groupId) {
+        KafkaFutureImpl<Map<TopicPartition, Long>> future = new 
KafkaFutureImpl<>();
+        if (futures.containsKey(groupId))
+            return futures.get(groupId);
+        else
+            future.completeExceptionally(new IllegalArgumentException("Group 
ID not found: " + groupId));

Review Comment:
   I would make this throw `IllegalArgumentException` rather than complete the 
future exceptionally. See 
`ListConsumerGroupOffsetsResult`.partitionsToOffsetAndMetadata`.



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The result of the {@link Admin#listShareGroupOffsets(Map, 
ListShareGroupOffsetsOptions)} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class ListShareGroupOffsetsResult {
+
+    private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures;
+
+    public ListShareGroupOffsetsResult(Map<String, 
KafkaFuture<Map<TopicPartition, Long>>> futures) {

Review Comment:
   This method should be package-private, not public. Otherwise it becomes part 
of the public interface and will also end up in the javadoc.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -478,6 +481,113 @@ public CompletableFuture<ReadShareGroupStateResponseData> 
readState(RequestConte
         });
     }
 
+    @Override
+    public CompletableFuture<ReadShareGroupStateSummaryResponseData> 
readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData 
request) {
+        String groupId = request.groupId();
+        // A map to store the futures for each topicId and partition.
+        Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponseData>>> futureMap = new 
HashMap<>();
+
+        // Send an empty response if topic data is empty
+        if (isEmpty(request.topics())) {
+            log.error("Topic Data is empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if partition data is empty for any topic
+        for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData 
topicData : request.topics()) {
+            if (isEmpty(topicData.partitions())) {
+                log.error("Partition Data for topic {} is empty: {}", 
topicData.topicId(), request);
+                return CompletableFuture.completedFuture(
+                        new ReadShareGroupStateSummaryResponseData()
+                );
+            }
+        }
+
+        // Send an empty response if groupId is invalid
+        if (isGroupIdEmpty(groupId)) {
+            log.error("Group id must be specified and non-empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if the coordinator is not active
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(
+                    generateErrorReadStateSummaryResponse(
+                            request,
+                            Errors.COORDINATOR_NOT_AVAILABLE,
+                            "Share coordinator is not available."
+                    )
+            );
+        }
+
+        // The request received here could have multiple keys of structure 
group:topic:partition. However,
+        // the readState method in ShareCoordinatorShard expects a single key 
in the request. Hence, we will
+        // be looping over the keys below and constructing new 
ReadShareGroupStateRequestData objects to pass
+        // onto the shard method.
+
+        request.topics().forEach(topicData -> {
+            Uuid topicId = topicData.topicId();
+            topicData.partitions().forEach(partitionData -> {
+                // Request object containing information of a single topic 
partition
+                ReadShareGroupStateSummaryRequestData 
requestForCurrentPartition = new ReadShareGroupStateSummaryRequestData()
+                        .setGroupId(groupId)
+                        .setTopics(Collections.singletonList(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()

Review Comment:
   List.of



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -478,6 +481,113 @@ public CompletableFuture<ReadShareGroupStateResponseData> 
readState(RequestConte
         });
     }
 
+    @Override
+    public CompletableFuture<ReadShareGroupStateSummaryResponseData> 
readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData 
request) {
+        String groupId = request.groupId();
+        // A map to store the futures for each topicId and partition.
+        Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponseData>>> futureMap = new 
HashMap<>();
+
+        // Send an empty response if topic data is empty
+        if (isEmpty(request.topics())) {
+            log.error("Topic Data is empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if partition data is empty for any topic
+        for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData 
topicData : request.topics()) {
+            if (isEmpty(topicData.partitions())) {
+                log.error("Partition Data for topic {} is empty: {}", 
topicData.topicId(), request);
+                return CompletableFuture.completedFuture(
+                        new ReadShareGroupStateSummaryResponseData()
+                );
+            }
+        }
+
+        // Send an empty response if groupId is invalid
+        if (isGroupIdEmpty(groupId)) {
+            log.error("Group id must be specified and non-empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if the coordinator is not active
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(
+                    generateErrorReadStateSummaryResponse(
+                            request,
+                            Errors.COORDINATOR_NOT_AVAILABLE,
+                            "Share coordinator is not available."
+                    )
+            );
+        }
+
+        // The request received here could have multiple keys of structure 
group:topic:partition. However,
+        // the readState method in ShareCoordinatorShard expects a single key 
in the request. Hence, we will

Review Comment:
   "summary"



##########
share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -730,6 +733,147 @@ protected RPCType rpcType() {
         }
     }
 
+    public class ReadStateSummaryHandler extends PersisterStateManagerHandler {
+        private final int leaderEpoch;
+        private final CompletableFuture<ReadShareGroupStateSummaryResponse> 
result;
+        private final BackoffManager readStateBackoff;
+
+        public ReadStateSummaryHandler(
+                String groupId,
+                Uuid topicId,
+                int partition,
+                int leaderEpoch,
+                CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+                long backoffMs,
+                long backoffMaxMs,
+                int maxRPCRetryAttempts,
+                Consumer<ClientResponse> onCompleteCallback
+        ) {
+            super(groupId, topicId, partition, backoffMs, backoffMaxMs, 
maxRPCRetryAttempts);
+            this.leaderEpoch = leaderEpoch;
+            this.result = result;
+            this.readStateBackoff = new BackoffManager(maxRPCRetryAttempts, 
backoffMs, backoffMaxMs);
+        }
+
+        public ReadStateSummaryHandler(
+                String groupId,
+                Uuid topicId,
+                int partition,
+                int leaderEpoch,
+                CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+                Consumer<ClientResponse> onCompleteCallback
+        ) {
+            this(
+                    groupId,
+                    topicId,
+                    partition,
+                    leaderEpoch,
+                    result,
+                    REQUEST_BACKOFF_MS,
+                    REQUEST_BACKOFF_MAX_MS,
+                    MAX_FIND_COORD_ATTEMPTS,
+                    onCompleteCallback
+            );
+        }
+
+        @Override
+        protected String name() {
+            return "ReadStateSummaryHandler";
+        }
+
+        @Override
+        protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> 
requestBuilder() {
+            throw new RuntimeException("Read Summary requests are batchable, 
hence individual requests not needed.");
+        }
+
+        @Override
+        protected boolean isResponseForRequest(ClientResponse response) {
+            return response.requestHeader().apiKey() == 
ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY;
+        }
+
+        @Override
+        protected void handleRequestResponse(ClientResponse response) {
+            log.debug("Read state summary response received - {}", response);
+            readStateBackoff.incrementAttempt();
+
+            ReadShareGroupStateSummaryResponse combinedResponse = 
(ReadShareGroupStateSummaryResponse) response.responseBody();
+            for (ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult 
readStateResult : combinedResponse.data().results()) {
+                if 
(readStateResult.topicId().equals(partitionKey().topicId())) {
+                    
Optional<ReadShareGroupStateSummaryResponseData.PartitionResult> 
partitionStateData =
+                            
readStateResult.partitions().stream().filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
+                                    .findFirst();
+
+                    if (partitionStateData.isPresent()) {
+                        Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                        switch (error) {
+                            case NONE:
+                                readStateBackoff.resetAttempts();
+                                
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = 
ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult(
+                                        partitionKey().topicId(),
+                                        
Collections.singletonList(partitionStateData.get())
+                                );
+                                this.result.complete(new 
ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData()
+                                        
.setResults(Collections.singletonList(result))));
+                                return;
+
+                            // check retriable errors
+                            case COORDINATOR_NOT_AVAILABLE:
+                            case COORDINATOR_LOAD_IN_PROGRESS:
+                            case NOT_COORDINATOR:
+                                log.warn("Received retriable error in read 
state RPC for key {}: {}", partitionKey(), error.message());
+                                if (!readStateBackoff.canAttempt()) {
+                                    log.error("Exhausted max retries for read 
state RPC for key {} without success.", partitionKey());
+                                    readStateSummaryErrorReponse(error, new 
Exception("Exhausted max retries to complete read state RPC without success."));
+                                    return;
+                                }
+                                super.resetCoordinatorNode();
+                                timer.add(new 
PersisterTimerTask(readStateBackoff.backOff(), this));
+                                return;
+
+                            default:
+                                log.error("Unable to perform read state RPC 
for key {}: {}", partitionKey(), error.message());
+                                readStateSummaryErrorReponse(error, null);
+                                return;
+                        }
+                    }
+                }
+            }
+
+            // no response found specific topic partition
+            IllegalStateException exception = new IllegalStateException(
+                    "Failed to read state for share partition " + 
partitionKey()
+            );
+            readStateSummaryErrorReponse(Errors.forException(exception), 
exception);
+        }
+
+        protected void readStateSummaryErrorReponse(Errors error, Exception 
exception) {
+            this.result.complete(new ReadShareGroupStateSummaryResponse(
+                    
ReadShareGroupStateSummaryResponse.toErrorResponseData(partitionKey().topicId(),
 partitionKey().partition(), error, "Error in find coordinator. " +
+                            (exception == null ? error.message() : 
exception.getMessage()))));
+        }
+
+        @Override
+        protected void findCoordinatorErrorResponse(Errors error, Exception 
exception) {
+            this.result.complete(new ReadShareGroupStateSummaryResponse(
+                    
ReadShareGroupStateSummaryResponse.toErrorResponseData(partitionKey().topicId(),
 partitionKey().partition(), error, "Error in read state RPC. " +

Review Comment:
   "summary"



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The result of the {@link Admin#listShareGroupOffsets(Map, 
ListShareGroupOffsetsOptions)} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class ListShareGroupOffsetsResult {
+
+    private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures;
+
+    public ListShareGroupOffsetsResult(Map<String, 
KafkaFuture<Map<TopicPartition, Long>>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a future which yields all Map<String, Map<TopicPartition, Long> 
objects, if requests for all the groups succeed.
+     */
+    public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new 
KafkaFuture[0])).thenApply(
+                nil -> {
+                    Map<String, Map<TopicPartition, Long>> offsets = new 
HashMap<>(futures.size());
+                    futures.forEach((key, future) -> {

Review Comment:
   It would be a bit easier to see what's going on if `key` was renamed to 
`groupId` since that's what the key is here.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java:
##########
@@ -81,6 +83,14 @@ public interface ShareCoordinator {
      */
     CompletableFuture<ReadShareGroupStateResponseData> 
readState(RequestContext context, ReadShareGroupStateRequestData request);
 
+    /**
+     * Handle read share state call

Review Comment:
   "summary"



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Specification of share group offsets to list using {@link 
Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class ListShareGroupOffsetsSpec {
+
+    private Collection<TopicPartition> topicPartitions;
+
+    public ListShareGroupOffsetsSpec() {
+        topicPartitions = new ArrayList<>();
+    }
+
+    /**
+     * Set the topic partitions whose offsets are to be listed for a share 
group.
+     */
+    ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> 
topicPartitions) {
+        this.topicPartitions = topicPartitions;
+        return this;
+    }
+
+    /**
+     * Returns the topic partitions whose offsets are to be listed for a share 
group.
+     */
+    Collection<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+}

Review Comment:
   Please implement equals, hashCode and toString.



##########
clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java:
##########
@@ -64,4 +67,46 @@ public static ReadShareGroupStateSummaryResponse 
parse(ByteBuffer buffer, short
                 new ReadShareGroupStateSummaryResponseData(new 
ByteBufferAccessor(buffer), version)
         );
     }
+
+    public static ReadShareGroupStateSummaryResponseData 
toErrorResponseData(Uuid topicId, int partitionId, Errors error, String 
errorMessage) {
+        return new ReadShareGroupStateSummaryResponseData().setResults(
+                Collections.singletonList(new 
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                        .setTopicId(topicId)
+                        .setPartitions(Collections.singletonList(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
+                                .setPartition(partitionId)
+                                .setErrorCode(error.code())
+                                .setErrorMessage(errorMessage)))));
+    }
+
+    public static ReadShareGroupStateSummaryResponseData.PartitionResult 
toErrorResponsePartitionResult(int partitionId, Errors error, String 
errorMessage) {
+        return new ReadShareGroupStateSummaryResponseData.PartitionResult()
+                .setPartition(partitionId)
+                .setErrorCode(error.code())
+                .setErrorMessage(errorMessage);
+    }
+
+    public static ReadShareGroupStateSummaryResponseData toResponseData(
+            Uuid topicId,
+            int partition,
+            long startOffset,
+            int stateEpoch
+    ) {
+        return new ReadShareGroupStateSummaryResponseData()
+                .setResults(Collections.singletonList(
+                        new 
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                                .setTopicId(topicId)
+                                .setPartitions(Collections.singletonList(

Review Comment:
   Because we've recently dropped Java 8, you can now use `List.of` instead of 
`Collections.singletonList`.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -478,6 +481,113 @@ public CompletableFuture<ReadShareGroupStateResponseData> 
readState(RequestConte
         });
     }
 
+    @Override
+    public CompletableFuture<ReadShareGroupStateSummaryResponseData> 
readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData 
request) {
+        String groupId = request.groupId();
+        // A map to store the futures for each topicId and partition.
+        Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponseData>>> futureMap = new 
HashMap<>();
+
+        // Send an empty response if topic data is empty
+        if (isEmpty(request.topics())) {
+            log.error("Topic Data is empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if partition data is empty for any topic
+        for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData 
topicData : request.topics()) {
+            if (isEmpty(topicData.partitions())) {
+                log.error("Partition Data for topic {} is empty: {}", 
topicData.topicId(), request);
+                return CompletableFuture.completedFuture(
+                        new ReadShareGroupStateSummaryResponseData()
+                );
+            }
+        }
+
+        // Send an empty response if groupId is invalid
+        if (isGroupIdEmpty(groupId)) {
+            log.error("Group id must be specified and non-empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if the coordinator is not active
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(
+                    generateErrorReadStateSummaryResponse(
+                            request,
+                            Errors.COORDINATOR_NOT_AVAILABLE,
+                            "Share coordinator is not available."
+                    )
+            );
+        }
+
+        // The request received here could have multiple keys of structure 
group:topic:partition. However,
+        // the readState method in ShareCoordinatorShard expects a single key 
in the request. Hence, we will
+        // be looping over the keys below and constructing new 
ReadShareGroupStateRequestData objects to pass

Review Comment:
   "Summary"



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -435,6 +438,60 @@ public ReadShareGroupStateResponseData 
readState(ReadShareGroupStateRequestData
         return ReadShareGroupStateResponse.toResponseData(topicId, partition, 
offsetValue.startOffset(), offsetValue.stateEpoch(), stateBatches);
     }
 
+    /**
+     * This method finds the ShareSnapshotValue record corresponding to the 
requested topic partition from the
+     * in-memory state of coordinator shard, the shareStateMap.
+     * <p>
+     * This method as called by the ShareCoordinatorService will be provided 
with
+     * the request data which covers only key i.e. group1:topic1:partition1. 
The implementation
+     * below was done keeping this in mind.
+     *
+     * @param request - ReadShareGroupStateSummaryRequestData for a single key
+     * @param offset  - offset to read from the __share_group_state topic 
partition
+     * @return CoordinatorResult(records, response)
+     */
+    public ReadShareGroupStateSummaryResponseData 
readStateSummary(ReadShareGroupStateSummaryRequestData request, Long offset) {
+        // records to read (with the key of snapshot type), response to caller
+        // only one key will be there in the request by design
+        Optional<ReadShareGroupStateSummaryResponseData> error = 
maybeGetReadStateSummaryError(request, offset);
+        if (error.isPresent()) {
+            return error.get();
+        }
+
+        Uuid topicId = request.topics().get(0).topicId();
+        int partition = 
request.topics().get(0).partitions().get(0).partition();
+        int leaderEpoch = 
request.topics().get(0).partitions().get(0).leaderEpoch();
+
+        SharePartitionKey coordinatorKey = 
SharePartitionKey.getInstance(request.groupId(), topicId, partition);
+
+        if (!shareStateMap.containsKey(coordinatorKey)) {
+            return ReadShareGroupStateSummaryResponse.toResponseData(
+                    topicId,
+                    partition,
+                    PartitionFactory.UNINITIALIZED_START_OFFSET,
+                    PartitionFactory.DEFAULT_STATE_EPOCH
+            );
+        }
+
+        ShareGroupOffset offsetValue = shareStateMap.get(coordinatorKey, 
offset);
+
+        if (offsetValue == null) {
+            // Returning an error response as the snapshot value was not found
+            return ReadShareGroupStateSummaryResponse.toErrorResponseData(
+                    topicId,
+                    partition,
+                    Errors.UNKNOWN_SERVER_ERROR,
+                    "Data not found for topic {}, partition {} for group {}, 
in the in-memory state of share coordinator"
+            );
+        }
+
+        // Updating the leader map with the new leader epoch
+        leaderEpochMap.put(coordinatorKey, leaderEpoch);

Review Comment:
   I would expect that the leader epoch is not known in this case. Since the 
call comes from the group coordinator and it is the not the partition leader, I 
think it's going to specify -1 as the epoch and the update of the leader epoch 
map should not occur.



##########
share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -730,6 +733,147 @@ protected RPCType rpcType() {
         }
     }
 
+    public class ReadStateSummaryHandler extends PersisterStateManagerHandler {
+        private final int leaderEpoch;
+        private final CompletableFuture<ReadShareGroupStateSummaryResponse> 
result;
+        private final BackoffManager readStateBackoff;
+
+        public ReadStateSummaryHandler(
+                String groupId,
+                Uuid topicId,
+                int partition,
+                int leaderEpoch,
+                CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+                long backoffMs,
+                long backoffMaxMs,
+                int maxRPCRetryAttempts,
+                Consumer<ClientResponse> onCompleteCallback
+        ) {
+            super(groupId, topicId, partition, backoffMs, backoffMaxMs, 
maxRPCRetryAttempts);
+            this.leaderEpoch = leaderEpoch;
+            this.result = result;
+            this.readStateBackoff = new BackoffManager(maxRPCRetryAttempts, 
backoffMs, backoffMaxMs);
+        }
+
+        public ReadStateSummaryHandler(
+                String groupId,
+                Uuid topicId,
+                int partition,
+                int leaderEpoch,
+                CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+                Consumer<ClientResponse> onCompleteCallback
+        ) {
+            this(
+                    groupId,
+                    topicId,
+                    partition,
+                    leaderEpoch,
+                    result,
+                    REQUEST_BACKOFF_MS,
+                    REQUEST_BACKOFF_MAX_MS,
+                    MAX_FIND_COORD_ATTEMPTS,
+                    onCompleteCallback
+            );
+        }
+
+        @Override
+        protected String name() {
+            return "ReadStateSummaryHandler";
+        }
+
+        @Override
+        protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> 
requestBuilder() {
+            throw new RuntimeException("Read Summary requests are batchable, 
hence individual requests not needed.");
+        }
+
+        @Override
+        protected boolean isResponseForRequest(ClientResponse response) {
+            return response.requestHeader().apiKey() == 
ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY;
+        }
+
+        @Override
+        protected void handleRequestResponse(ClientResponse response) {
+            log.debug("Read state summary response received - {}", response);
+            readStateBackoff.incrementAttempt();
+
+            ReadShareGroupStateSummaryResponse combinedResponse = 
(ReadShareGroupStateSummaryResponse) response.responseBody();
+            for (ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult 
readStateResult : combinedResponse.data().results()) {
+                if 
(readStateResult.topicId().equals(partitionKey().topicId())) {
+                    
Optional<ReadShareGroupStateSummaryResponseData.PartitionResult> 
partitionStateData =
+                            
readStateResult.partitions().stream().filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
+                                    .findFirst();
+
+                    if (partitionStateData.isPresent()) {
+                        Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                        switch (error) {
+                            case NONE:
+                                readStateBackoff.resetAttempts();
+                                
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = 
ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult(
+                                        partitionKey().topicId(),
+                                        
Collections.singletonList(partitionStateData.get())

Review Comment:
   List.of



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -478,6 +481,113 @@ public CompletableFuture<ReadShareGroupStateResponseData> 
readState(RequestConte
         });
     }
 
+    @Override
+    public CompletableFuture<ReadShareGroupStateSummaryResponseData> 
readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData 
request) {
+        String groupId = request.groupId();
+        // A map to store the futures for each topicId and partition.
+        Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponseData>>> futureMap = new 
HashMap<>();
+
+        // Send an empty response if topic data is empty
+        if (isEmpty(request.topics())) {
+            log.error("Topic Data is empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if partition data is empty for any topic
+        for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData 
topicData : request.topics()) {
+            if (isEmpty(topicData.partitions())) {
+                log.error("Partition Data for topic {} is empty: {}", 
topicData.topicId(), request);
+                return CompletableFuture.completedFuture(
+                        new ReadShareGroupStateSummaryResponseData()
+                );
+            }
+        }
+
+        // Send an empty response if groupId is invalid
+        if (isGroupIdEmpty(groupId)) {
+            log.error("Group id must be specified and non-empty: {}", request);
+            return CompletableFuture.completedFuture(
+                    new ReadShareGroupStateSummaryResponseData()
+            );
+        }
+
+        // Send an empty response if the coordinator is not active
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(
+                    generateErrorReadStateSummaryResponse(
+                            request,
+                            Errors.COORDINATOR_NOT_AVAILABLE,
+                            "Share coordinator is not available."
+                    )
+            );
+        }
+
+        // The request received here could have multiple keys of structure 
group:topic:partition. However,
+        // the readState method in ShareCoordinatorShard expects a single key 
in the request. Hence, we will
+        // be looping over the keys below and constructing new 
ReadShareGroupStateRequestData objects to pass
+        // onto the shard method.
+
+        request.topics().forEach(topicData -> {
+            Uuid topicId = topicData.topicId();
+            topicData.partitions().forEach(partitionData -> {
+                // Request object containing information of a single topic 
partition
+                ReadShareGroupStateSummaryRequestData 
requestForCurrentPartition = new ReadShareGroupStateSummaryRequestData()
+                        .setGroupId(groupId)
+                        .setTopics(Collections.singletonList(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                                .setTopicId(topicId)
+                                
.setPartitions(Collections.singletonList(partitionData))));
+                SharePartitionKey coordinatorKey = 
SharePartitionKey.getInstance(request.groupId(), topicId, 
partitionData.partition());
+                // Scheduling a runtime read operation to read share partition 
state from the coordinator in memory state
+                CompletableFuture<ReadShareGroupStateSummaryResponseData> 
future = runtime.scheduleReadOperation(
+                        "read-share-group-state-summary",
+                        topicPartitionFor(coordinatorKey),
+                        (coordinator, offset) -> 
coordinator.readStateSummary(requestForCurrentPartition, offset)
+                ).exceptionally(exception -> handleOperationException(
+                        "read-share-group-state",

Review Comment:
   "summary"



##########
share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -730,6 +733,147 @@ protected RPCType rpcType() {
         }
     }
 
+    public class ReadStateSummaryHandler extends PersisterStateManagerHandler {
+        private final int leaderEpoch;
+        private final CompletableFuture<ReadShareGroupStateSummaryResponse> 
result;
+        private final BackoffManager readStateBackoff;
+
+        public ReadStateSummaryHandler(
+                String groupId,
+                Uuid topicId,
+                int partition,
+                int leaderEpoch,
+                CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+                long backoffMs,
+                long backoffMaxMs,
+                int maxRPCRetryAttempts,
+                Consumer<ClientResponse> onCompleteCallback
+        ) {
+            super(groupId, topicId, partition, backoffMs, backoffMaxMs, 
maxRPCRetryAttempts);
+            this.leaderEpoch = leaderEpoch;
+            this.result = result;
+            this.readStateBackoff = new BackoffManager(maxRPCRetryAttempts, 
backoffMs, backoffMaxMs);
+        }
+
+        public ReadStateSummaryHandler(
+                String groupId,
+                Uuid topicId,
+                int partition,
+                int leaderEpoch,
+                CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+                Consumer<ClientResponse> onCompleteCallback
+        ) {
+            this(
+                    groupId,
+                    topicId,
+                    partition,
+                    leaderEpoch,
+                    result,
+                    REQUEST_BACKOFF_MS,
+                    REQUEST_BACKOFF_MAX_MS,
+                    MAX_FIND_COORD_ATTEMPTS,
+                    onCompleteCallback
+            );
+        }
+
+        @Override
+        protected String name() {
+            return "ReadStateSummaryHandler";
+        }
+
+        @Override
+        protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> 
requestBuilder() {
+            throw new RuntimeException("Read Summary requests are batchable, 
hence individual requests not needed.");
+        }
+
+        @Override
+        protected boolean isResponseForRequest(ClientResponse response) {
+            return response.requestHeader().apiKey() == 
ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY;
+        }
+
+        @Override
+        protected void handleRequestResponse(ClientResponse response) {
+            log.debug("Read state summary response received - {}", response);
+            readStateBackoff.incrementAttempt();
+
+            ReadShareGroupStateSummaryResponse combinedResponse = 
(ReadShareGroupStateSummaryResponse) response.responseBody();
+            for (ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult 
readStateResult : combinedResponse.data().results()) {
+                if 
(readStateResult.topicId().equals(partitionKey().topicId())) {
+                    
Optional<ReadShareGroupStateSummaryResponseData.PartitionResult> 
partitionStateData =
+                            
readStateResult.partitions().stream().filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
+                                    .findFirst();
+
+                    if (partitionStateData.isPresent()) {
+                        Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                        switch (error) {
+                            case NONE:
+                                readStateBackoff.resetAttempts();
+                                
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = 
ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult(
+                                        partitionKey().topicId(),
+                                        
Collections.singletonList(partitionStateData.get())
+                                );
+                                this.result.complete(new 
ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData()
+                                        
.setResults(Collections.singletonList(result))));
+                                return;
+
+                            // check retriable errors
+                            case COORDINATOR_NOT_AVAILABLE:
+                            case COORDINATOR_LOAD_IN_PROGRESS:
+                            case NOT_COORDINATOR:
+                                log.warn("Received retriable error in read 
state RPC for key {}: {}", partitionKey(), error.message());

Review Comment:
   "summary"



##########
share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java:
##########
@@ -278,9 +279,92 @@ public CompletableFuture<DeleteShareGroupStateResult> 
deleteState(DeleteShareGro
      * @return A completable future of  ReadShareGroupStateSummaryResult
      */
     public CompletableFuture<ReadShareGroupStateSummaryResult> 
readSummary(ReadShareGroupStateSummaryParameters request) throws 
IllegalArgumentException {
-        throw new RuntimeException("not implemented");
+        validate(request);
+        GroupTopicPartitionData<PartitionIdLeaderEpochData> gtp = 
request.groupTopicPartitionData();
+        String groupId = gtp.groupId();
+        Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponse>>> futureMap = new 
HashMap<>();
+        List<PersisterStateManager.ReadStateSummaryHandler> handlers = new 
ArrayList<>();
+
+        gtp.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                CompletableFuture<ReadShareGroupStateSummaryResponse> future = 
futureMap
+                        .computeIfAbsent(topicData.topicId(), k -> new 
HashMap<>())
+                        .computeIfAbsent(partitionData.partition(), k -> new 
CompletableFuture<>());
+
+                handlers.add(
+                        stateManager.new ReadStateSummaryHandler(
+                                groupId,
+                                topicData.topicId(),
+                                partitionData.partition(),
+                                partitionData.leaderEpoch(),
+                                future,
+                                null)
+                );
+            });
+        });
+
+        for (PersisterStateManager.PersisterStateManagerHandler handler : 
handlers) {
+            stateManager.enqueue(handler);
+        }
+
+        // Combine all futures into a single CompletableFuture<Void>
+        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+                handlers.stream()
+                        
.map(PersisterStateManager.ReadStateSummaryHandler::result)
+                        .toArray(CompletableFuture[]::new));
+
+        // Transform the combined CompletableFuture<Void> into 
CompletableFuture<ReadShareGroupStateResult>
+        return combinedFuture.thenApply(v -> 
readSummaryResponsesToResult(futureMap));
     }
 
+    /**
+     * Takes in a list of COMPLETED futures and combines the results,
+     * taking care of errors if any, into a single 
ReadShareGroupStateSummaryResult
+     * @param futureMap - HashMap of {topic -> {part -> future}}
+     * @return Object representing combined result of type 
ReadShareGroupStateSummaryResult
+     */
+    // visible for testing
+    ReadShareGroupStateSummaryResult readSummaryResponsesToResult(
+            Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponse>>> futureMap
+    ) {
+        List<TopicData<PartitionStateErrorData>> topicsData = 
futureMap.keySet().stream()
+                .map(topicId -> {

Review Comment:
   nit: I think your line continuation indentation is 8 here, but the source 
file uses 4.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to