apoorvmittal10 commented on code in PR #18571:
URL: https://github.com/apache/kafka/pull/18571#discussion_r1918186242


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -3796,6 +3796,13 @@ public DescribeShareGroupsResult describeShareGroups(final Collection<String> gr
                 .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
     }
 
+    // To do in a follow-up PR
+    @Override
+    public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListShareGroupOffsetsSpec> groupSpecs,
+                                                             final ListShareGroupOffsetsOptions options) {
+        // To-do
+        return null;

Review Comment:
   nit: It is generally better to throw an exception stating that the method is
not yet implemented, but this is fine for now since the follow-up PRs will come
soon after.
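
   A minimal sketch of what that could look like, using a hypothetical stand-in
class rather than the real `KafkaAdminClient`. `UnsupportedOperationException`
is only one reasonable choice here, and the simplified signature is an
assumption made to keep the sketch self-contained:

   ```java
   import java.util.Map;

   // Hypothetical stand-in for the admin client; not the real KafkaAdminClient.
   class PlaceholderAdminClient {
       // Fails fast instead of returning null, so callers see a clear error
       // rather than a NullPointerException somewhere downstream.
       Object listShareGroupOffsets(Map<String, Object> groupSpecs) {
           throw new UnsupportedOperationException("listShareGroupOffsets is not yet implemented");
       }
   }
   ```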



##########
clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java:
##########
@@ -34,7 +35,7 @@ public static class Builder extends AbstractRequest.Builder<ReadShareGroupStateS
         private final ReadShareGroupStateSummaryRequestData data;
 
         public Builder(ReadShareGroupStateSummaryRequestData data) {
-            this(data, false);
+            this(data, true);

Review Comment:
   Should this be `true`? I see that ShareFetchRequest and ShareAcknowledgeRequest
still pass `false` for enabling unstable versions, and that gets handled at the
caller level through the enabled versions in server.properties.



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.

Review Comment:
   What should one look for in Admin.java to get details about `evolving`?



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsOptions.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * Options for {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListShareGroupOffsetsOptions extends AbstractOptions<ListShareGroupOffsetsOptions> {
+}

Review Comment:
   nit: new files should end with an empty line.



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListShareGroupOffsetsSpec {
+
+    private Collection<TopicPartition> topicPartitions;
+
+    public ListShareGroupOffsetsSpec() {
+        topicPartitions = new ArrayList<>();
+    }

Review Comment:
   nit: What's the benefit of initializing by default? I don't see any `add`
method to put a topic partition into the collection, only a method that
replaces it completely.



##########
server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java:
##########
@@ -288,7 +291,96 @@ public CompletableFuture<DeleteShareGroupStateResult> deleteState(DeleteShareGro
      * @return A completable future of  ReadShareGroupStateSummaryResult
      */
     public CompletableFuture<ReadShareGroupStateSummaryResult> readSummary(ReadShareGroupStateSummaryParameters request) {
-        throw new RuntimeException("not implemented");
+        try {
+            validate(request);
+        } catch (Exception e) {
+            log.error("Unable to validate read state summary request", e);
+            return CompletableFuture.failedFuture(e);
+        }
+        GroupTopicPartitionData<PartitionIdLeaderEpochData> gtp = request.groupTopicPartitionData();
+        String groupId = gtp.groupId();
+        Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateSummaryResponse>>> futureMap = new HashMap<>();
+        List<PersisterStateManager.ReadStateSummaryHandler> handlers = new ArrayList<>();
+
+        gtp.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                CompletableFuture<ReadShareGroupStateSummaryResponse> future = futureMap
+                    .computeIfAbsent(topicData.topicId(), k -> new HashMap<>())
+                    .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>());
+
+                handlers.add(
+                    stateManager.new ReadStateSummaryHandler(
+                        groupId,
+                        topicData.topicId(),
+                        partitionData.partition(),
+                        partitionData.leaderEpoch(),
+                        future,
+                        null
+                    )
+                );
+            });
+        });
+
+        for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) {
+            stateManager.enqueue(handler);
+        }
+
+        // Combine all futures into a single CompletableFuture<Void>
+        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+            handlers.stream()
+                .map(PersisterStateManager.ReadStateSummaryHandler::result)
+                .toArray(CompletableFuture[]::new));
+
+        // Transform the combined CompletableFuture<Void> into CompletableFuture<ReadShareGroupStateResult>
+        return combinedFuture.thenApply(v -> readSummaryResponsesToResult(futureMap));
+    }
+
+    /**
+     * Takes in a list of COMPLETED futures and combines the results,
+     * taking care of errors if any, into a single ReadShareGroupStateSummaryResult
+     *
+     * @param futureMap - HashMap of {topic -> {part -> future}}

Review Comment:
   Please do not use such abbreviations in javadoc.
   ```suggestion
        * @param futureMap - HashMap of {topic -> {partition -> future}}
   ```
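
   For readers following the hunk above, the combining step uses the usual
`CompletableFuture.allOf` pattern. A simplified, self-contained sketch of that
pattern follows; `CombineSketch`, the `String` payload, and the single-level
map are assumptions for illustration, not the PR's actual types:

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;

   // Hypothetical helper; names and types are illustrative only.
   class CombineSketch {
       // Waits for every per-partition future, then collects the completed values.
       static CompletableFuture<Map<Integer, String>> combine(Map<Integer, CompletableFuture<String>> futures) {
           return CompletableFuture
               .allOf(futures.values().toArray(new CompletableFuture[0]))
               .thenApply(v -> {
                   Map<Integer, String> results = new HashMap<>();
                   // join() does not block here because allOf has already completed.
                   futures.forEach((partition, future) -> results.put(partition, future.join()));
                   return results;
               });
       }
   }
   ```

   Note that if any input future completes exceptionally, the combined future
does too and the `thenApply` step is skipped.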



##########
server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -730,6 +734,147 @@ protected RPCType rpcType() {
         }
     }
 
+    public class ReadStateSummaryHandler extends PersisterStateManagerHandler {
+        private final int leaderEpoch;
+        private final CompletableFuture<ReadShareGroupStateSummaryResponse> result;
+        private final BackoffManager readStateSummaryBackoff;
+
+        public ReadStateSummaryHandler(
+            String groupId,
+            Uuid topicId,
+            int partition,
+            int leaderEpoch,
+            CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+            long backoffMs,
+            long backoffMaxMs,
+            int maxRPCRetryAttempts,
+            Consumer<ClientResponse> onCompleteCallback
+        ) {
+            super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts);
+            this.leaderEpoch = leaderEpoch;
+            this.result = result;
+            this.readStateSummaryBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
+        }
+
+        public ReadStateSummaryHandler(
+            String groupId,
+            Uuid topicId,
+            int partition,
+            int leaderEpoch,
+            CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+            Consumer<ClientResponse> onCompleteCallback
+        ) {
+            this(
+                groupId,
+                topicId,
+                partition,
+                leaderEpoch,
+                result,
+                REQUEST_BACKOFF_MS,
+                REQUEST_BACKOFF_MAX_MS,
+                MAX_FIND_COORD_ATTEMPTS,
+                onCompleteCallback
+            );
+        }
+
+        @Override
+        protected String name() {
+            return "ReadStateSummaryHandler";
+        }
+
+        @Override
+        protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> requestBuilder() {
+            throw new RuntimeException("Read Summary requests are batchable, hence individual requests not needed.");
+        }
+
+        @Override
+        protected boolean isResponseForRequest(ClientResponse response) {
+            return response.requestHeader().apiKey() == ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY;
+        }
+
+        @Override
+        protected void handleRequestResponse(ClientResponse response) {

Review Comment:
   Not for this PR, but `handleResponse` seems like a better method name.



##########
server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -730,6 +734,147 @@ protected RPCType rpcType() {
         }
     }
 
+    public class ReadStateSummaryHandler extends PersisterStateManagerHandler {
+        private final int leaderEpoch;
+        private final CompletableFuture<ReadShareGroupStateSummaryResponse> result;
+        private final BackoffManager readStateSummaryBackoff;
+
+        public ReadStateSummaryHandler(
+            String groupId,
+            Uuid topicId,
+            int partition,
+            int leaderEpoch,
+            CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+            long backoffMs,
+            long backoffMaxMs,
+            int maxRPCRetryAttempts,
+            Consumer<ClientResponse> onCompleteCallback
+        ) {
+            super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts);
+            this.leaderEpoch = leaderEpoch;
+            this.result = result;
+            this.readStateSummaryBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
+        }
+
+        public ReadStateSummaryHandler(
+            String groupId,
+            Uuid topicId,
+            int partition,
+            int leaderEpoch,
+            CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+            Consumer<ClientResponse> onCompleteCallback
+        ) {
+            this(
+                groupId,
+                topicId,
+                partition,
+                leaderEpoch,
+                result,
+                REQUEST_BACKOFF_MS,
+                REQUEST_BACKOFF_MAX_MS,
+                MAX_FIND_COORD_ATTEMPTS,
+                onCompleteCallback
+            );
+        }
+
+        @Override
+        protected String name() {
+            return "ReadStateSummaryHandler";
+        }
+
+        @Override
+        protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> requestBuilder() {
+            throw new RuntimeException("Read Summary requests are batchable, hence individual requests not needed.");

Review Comment:
   Should it be RuntimeException or something more specific?



##########
server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java:
##########
@@ -288,7 +291,96 @@ public CompletableFuture<DeleteShareGroupStateResult> deleteState(DeleteShareGro
      * @return A completable future of  ReadShareGroupStateSummaryResult
      */
     public CompletableFuture<ReadShareGroupStateSummaryResult> readSummary(ReadShareGroupStateSummaryParameters request) {
-        throw new RuntimeException("not implemented");
+        try {
+            validate(request);
+        } catch (Exception e) {
+            log.error("Unable to validate read state summary request", e);

Review Comment:
   Should we log this failure at error level? Is this error concerning?



##########
clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java:
##########
@@ -83,8 +84,24 @@ public ReadShareGroupStateSummaryRequestData data() {
 
     public static ReadShareGroupStateSummaryRequest parse(ByteBuffer buffer, short version) {
         return new ReadShareGroupStateSummaryRequest(
-                new ReadShareGroupStateSummaryRequestData(new ByteBufferAccessor(buffer), version),
-                version
+            new ReadShareGroupStateSummaryRequestData(new ByteBufferAccessor(buffer), version),
+            version
         );
     }
+
+    public static List<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> getErrorList(
+        List<Uuid> topicIds,
+        Errors error
+    ) {
+        return topicIds.stream()
+            .map(topicId ->
+                new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                    .setTopicId(topicId)
+                    .setPartitions(List.of(
+                            new ReadShareGroupStateSummaryResponseData.PartitionResult()
+                                .setErrorCode(error.code())
+                                .setErrorMessage(error.exceptionName())

Review Comment:
   Why do we want to return the class name of the exception? Also shouldn't the 
actual exception be passed here to get the correct error message?
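
   A rough sketch of the idea, assuming a hypothetical `SketchError` enum as a
simplified stand-in (it is not Kafka's `Errors` class, and `messageFor` is an
invented helper): prefer the message of the exception that actually occurred,
and fall back to a generic message for the error code otherwise.

   ```java
   // Hypothetical, simplified stand-in for an error-code enum; not Kafka's Errors class.
   enum SketchError {
       NONE((short) 0, null),
       UNKNOWN_SERVER_ERROR((short) -1, "The server experienced an unexpected error.");

       final short code;
       final String defaultMessage;

       SketchError(short code, String defaultMessage) {
           this.code = code;
           this.defaultMessage = defaultMessage;
       }

       // Prefers the message of the exception that actually occurred and falls
       // back to the generic message for the error code when none is available.
       String messageFor(Throwable cause) {
           if (cause != null && cause.getMessage() != null) {
               return cause.getMessage();
           }
           return defaultMessage;
       }
   }
   ```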



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListShareGroupOffsetsSpec {
+
+    private Collection<TopicPartition> topicPartitions;
+
+    public ListShareGroupOffsetsSpec() {
+        topicPartitions = new ArrayList<>();
+    }
+
+    /**
+     * Set the topic partitions whose offsets are to be listed for a share group.
+     */
+    public ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
+        this.topicPartitions = topicPartitions;
+        return this;
+    }
+
+    /**
+     * Returns the topic partitions whose offsets are to be listed for a share group.
+     */
+    public Collection<TopicPartition> topicPartitions() {
+        return topicPartitions;

Review Comment:
   If you remove the default initialization, then use `return topicPartitions
== null ? Collections.emptyList()/List.of() : topicPartitions;`
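
   A minimal sketch of that variant, assuming a simplified stand-in class
(`SpecSketch` is hypothetical, and `String` replaces `TopicPartition` so the
sketch stays self-contained):

   ```java
   import java.util.Collection;
   import java.util.List;

   // Simplified stand-in for ListShareGroupOffsetsSpec; illustrative only.
   class SpecSketch {
       // No default initialization: the field stays null until the setter is called.
       private Collection<String> topicPartitions;

       SpecSketch topicPartitions(Collection<String> topicPartitions) {
           this.topicPartitions = topicPartitions;
           return this;
       }

       // Never returns null: falls back to an immutable empty list when unset.
       Collection<String> topicPartitions() {
           return topicPartitions == null ? List.of() : topicPartitions;
       }
   }
   ```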



##########
server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java:
##########
@@ -288,7 +291,96 @@ public CompletableFuture<DeleteShareGroupStateResult> deleteState(DeleteShareGro
      * @return A completable future of  ReadShareGroupStateSummaryResult
      */
     public CompletableFuture<ReadShareGroupStateSummaryResult> readSummary(ReadShareGroupStateSummaryParameters request) {
-        throw new RuntimeException("not implemented");
+        try {
+            validate(request);
+        } catch (Exception e) {
+            log.error("Unable to validate read state summary request", e);
+            return CompletableFuture.failedFuture(e);
+        }
+        GroupTopicPartitionData<PartitionIdLeaderEpochData> gtp = request.groupTopicPartitionData();

Review Comment:
   nit: a line break above would be good.



##########
clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java:
##########
@@ -59,9 +61,52 @@ public int throttleTimeMs() {
     public void maybeSetThrottleTimeMs(int throttleTimeMs) {
         // No op
     }
+
     public static ReadShareGroupStateSummaryResponse parse(ByteBuffer buffer, short version) {
         return new ReadShareGroupStateSummaryResponse(
-                new ReadShareGroupStateSummaryResponseData(new ByteBufferAccessor(buffer), version)
+            new ReadShareGroupStateSummaryResponseData(new ByteBufferAccessor(buffer), version)
         );
     }
+
+    public static ReadShareGroupStateSummaryResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) {
+        return new ReadShareGroupStateSummaryResponseData().setResults(
+            List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                .setTopicId(topicId)
+                .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
+                    .setPartition(partitionId)
+                    .setErrorCode(error.code())
+                    .setErrorMessage(errorMessage)))));
+    }
+
+    public static ReadShareGroupStateSummaryResponseData.PartitionResult toErrorResponsePartitionResult(int partitionId, Errors error, String errorMessage) {
+        return new ReadShareGroupStateSummaryResponseData.PartitionResult()
+            .setPartition(partitionId)
+            .setErrorCode(error.code())
+            .setErrorMessage(errorMessage);
+    }
+
+    public static ReadShareGroupStateSummaryResponseData toResponseData(
+        Uuid topicId,
+        int partition,
+        long startOffset,
+        int stateEpoch
+    ) {
+        return new ReadShareGroupStateSummaryResponseData()
+            .setResults(List.of(
+                new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                    .setTopicId(topicId)
+                    .setPartitions(List.of(
+                        new ReadShareGroupStateSummaryResponseData.PartitionResult()
+                            .setPartition(partition)
+                            .setStartOffset(startOffset)
+                            .setStateEpoch(stateEpoch)
+                    ))
+            ));
+    }
+
+    public static ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult toResponseReadStateSummaryResult(Uuid topicId, List<ReadShareGroupStateSummaryResponseData.PartitionResult> partitionResults) {

Review Comment:
   Might want to break this into 2 lines; it seems too long for a single line.



