apoorvmittal10 commented on code in PR #18571:
URL: https://github.com/apache/kafka/pull/18571#discussion_r1918991457


##########
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListShareGroupOffsetsResult {
+
+    private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures;
+
+    public ListShareGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, Long>>> futures) {
+        this.futures = futures.entrySet().stream()
+            .collect(Collectors.toMap(e -> e.getKey().idValue, Map.Entry::getValue));
+    }
+
+    /**
+     * Return a future which yields all Map<String, Map<TopicPartition, Long> objects, if requests for all the groups succeed.
+     */

Review Comment:
   nit: Would it be better to write these comments as javadoc, i.e. with `@return` and `@param`? Same for the other methods and classes.
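   For example (just a sketch; the accessor name below is hypothetical and not taken from this PR), a javadoc-style comment on a per-group accessor in this class could look like:
   ```java
   /**
    * Returns the future yielding the map from topic partition to offset for the given group.
    *
    * @param groupId the share group whose offsets were requested
    * @return a future that completes with the topic-partition-to-offset map for that group
    */
   public KafkaFuture<Map<TopicPartition, Long>> partitionsToOffset(String groupId) {
       return futures.get(groupId);
   }
   ```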



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListShareGroupOffsetsSpec {
+
+    private Collection<TopicPartition> topicPartitions;
+
+    public ListShareGroupOffsetsSpec() {
+    }
+
+    /**
+     * Set the topic partitions whose offsets are to be listed for a share group.
+     */
+    public ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
+        this.topicPartitions = topicPartitions;
+        return this;
+    }
+
+    /**
+     * Returns the topic partitions whose offsets are to be listed for a share group.
+     */
+    public Collection<TopicPartition> topicPartitions() {
+        return topicPartitions == null ? Collections.emptyList() : topicPartitions;

Review Comment:
   Just check whether there are any Xlint warnings, as `Collections.emptyList()` has an unchecked suppression, so using `List.of()` might be better. If there are no warnings, then either is fine.
   ```suggestion
           return topicPartitions == null ? List.of() : topicPartitions;
   ```



##########
server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -730,6 +734,147 @@ protected RPCType rpcType() {
         }
     }
 
+    public class ReadStateSummaryHandler extends PersisterStateManagerHandler {
+        private final int leaderEpoch;
+        private final CompletableFuture<ReadShareGroupStateSummaryResponse> result;
+        private final BackoffManager readStateSummaryBackoff;
+
+        public ReadStateSummaryHandler(
+            String groupId,
+            Uuid topicId,
+            int partition,
+            int leaderEpoch,
+            CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+            long backoffMs,
+            long backoffMaxMs,
+            int maxRPCRetryAttempts,
+            Consumer<ClientResponse> onCompleteCallback
+        ) {
+            super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts);
+            this.leaderEpoch = leaderEpoch;
+            this.result = result;
+            this.readStateSummaryBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
+        }
+
+        public ReadStateSummaryHandler(
+            String groupId,
+            Uuid topicId,
+            int partition,
+            int leaderEpoch,
+            CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+            Consumer<ClientResponse> onCompleteCallback
+        ) {
+            this(
+                groupId,
+                topicId,
+                partition,
+                leaderEpoch,
+                result,
+                REQUEST_BACKOFF_MS,
+                REQUEST_BACKOFF_MAX_MS,
+                MAX_FIND_COORD_ATTEMPTS,
+                onCompleteCallback
+            );
+        }
+
+        @Override
+        protected String name() {
+            return "ReadStateSummaryHandler";
+        }
+
+        @Override
+        protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> requestBuilder() {
+            throw new RuntimeException("Read Summary requests are batchable, hence individual requests not needed.");
+        }
+
+        @Override
+        protected boolean isResponseForRequest(ClientResponse response) {
+            return response.requestHeader().apiKey() == ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY;
+        }
+
+        @Override
+        protected void handleRequestResponse(ClientResponse response) {
+            log.debug("Read state summary response received - {}", response);
+            readStateSummaryBackoff.incrementAttempt();
+
+            ReadShareGroupStateSummaryResponse combinedResponse = (ReadShareGroupStateSummaryResponse) response.responseBody();
+            for (ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult readStateSummaryResult : combinedResponse.data().results()) {
+                if (readStateSummaryResult.topicId().equals(partitionKey().topicId())) {
+                    Optional<ReadShareGroupStateSummaryResponseData.PartitionResult> partitionStateData =
+                        readStateSummaryResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partitionKey().partition())
+                            .findFirst();
+
+                    if (partitionStateData.isPresent()) {
+                        Errors error = Errors.forCode(partitionStateData.get().errorCode());
+                        switch (error) {
+                            case NONE:
+                                readStateSummaryBackoff.resetAttempts();
+                                ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult(
+                                    partitionKey().topicId(),
+                                    Collections.singletonList(partitionStateData.get())
+                                );
+                                this.result.complete(new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData()
+                                    .setResults(Collections.singletonList(result))));
+                                return;
+
+                            // check retriable errors
+                            case COORDINATOR_NOT_AVAILABLE:
+                            case COORDINATOR_LOAD_IN_PROGRESS:
+                            case NOT_COORDINATOR:
+                                log.warn("Received retriable error in read state summary RPC for key {}: {}", partitionKey(), error.message());
+                                if (!readStateSummaryBackoff.canAttempt()) {
+                                    log.error("Exhausted max retries for read state summary RPC for key {} without success.", partitionKey());
+                                    readStateSummaryErrorReponse(error, new Exception("Exhausted max retries to complete read state summary RPC without success."));
+                                    return;
+                                }
+                                super.resetCoordinatorNode();
+                                timer.add(new PersisterTimerTask(readStateSummaryBackoff.backOff(), this));
+                                return;
+
+                            default:
+                                log.error("Unable to perform read state summary RPC for key {}: {}", partitionKey(), error.message());
+                                readStateSummaryErrorReponse(error, null);
+                                return;
+                        }
+                    }
+                }
+            }
+
+            // no response found specific topic partition
+            IllegalStateException exception = new IllegalStateException(
+                "Failed to read state summary for share partition " + partitionKey()
+            );
+            readStateSummaryErrorReponse(Errors.forException(exception), exception);
+        }
+
+        protected void readStateSummaryErrorReponse(Errors error, Exception exception) {
+            this.result.complete(new ReadShareGroupStateSummaryResponse(
+                ReadShareGroupStateSummaryResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in read state summary RPC. " +
+                    (exception == null ? error.message() : exception.getMessage()))));
+        }
+
+        @Override
+        protected void findCoordinatorErrorResponse(Errors error, Exception exception) {
+            this.result.complete(new ReadShareGroupStateSummaryResponse(
+                ReadShareGroupStateSummaryResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in find coordinator. " +
+                    (exception == null ? error.message() : exception.getMessage()))));
+        }

Review Comment:
   Why do we need the `findCoordinatorErrorResponse` method in `ReadStateSummaryHandler`?



##########
clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java:
##########
@@ -34,7 +35,7 @@ public static class Builder extends AbstractRequest.Builder<ReadShareGroupStateS
         private final ReadShareGroupStateSummaryRequestData data;
 
         public Builder(ReadShareGroupStateSummaryRequestData data) {
-            this(data, false);
+            this(data, true);

Review Comment:
   OK, if it's happening in other places, but you can also use the other constructor and explicitly pass `true` for `enableUnstableLastVersion` when creating the request builder. Anyway, `enableUnstableLastVersion` for the Persister won't be around for long, so I'll leave it to you. But if we default to true, is there even any point in marking these RPCs unstable in the JSON?
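   In case it helps, a rough sketch of that call-site alternative (the `data` construction below is illustrative, not the actual Persister code; it only relies on the two-argument `Builder` constructor this class already has):
   ```java
   ReadShareGroupStateSummaryRequestData data = new ReadShareGroupStateSummaryRequestData()
       .setGroupId("share-group-1");
   // Opt in to the unstable latest version explicitly at this call site,
   // instead of flipping the default in the single-argument constructor.
   ReadShareGroupStateSummaryRequest.Builder builder =
       new ReadShareGroupStateSummaryRequest.Builder(data, true);
   ```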



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListShareGroupOffsetsSpec {
+
+    private Collection<TopicPartition> topicPartitions;
+
+    public ListShareGroupOffsetsSpec() {
+    }

Review Comment:
   Should we just remove this default constructor? It's adding no value.



##########
server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -730,6 +734,147 @@ protected RPCType rpcType() {
         }
     }
 
+    public class ReadStateSummaryHandler extends PersisterStateManagerHandler {
+        private final int leaderEpoch;
+        private final CompletableFuture<ReadShareGroupStateSummaryResponse> result;
+        private final BackoffManager readStateSummaryBackoff;
+
+        public ReadStateSummaryHandler(
+            String groupId,
+            Uuid topicId,
+            int partition,
+            int leaderEpoch,
+            CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+            long backoffMs,
+            long backoffMaxMs,
+            int maxRPCRetryAttempts,
+            Consumer<ClientResponse> onCompleteCallback
+        ) {
+            super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts);
+            this.leaderEpoch = leaderEpoch;
+            this.result = result;
+            this.readStateSummaryBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
+        }
+
+        public ReadStateSummaryHandler(
+            String groupId,
+            Uuid topicId,
+            int partition,
+            int leaderEpoch,
+            CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+            Consumer<ClientResponse> onCompleteCallback
+        ) {
+            this(
+                groupId,
+                topicId,
+                partition,
+                leaderEpoch,
+                result,
+                REQUEST_BACKOFF_MS,
+                REQUEST_BACKOFF_MAX_MS,
+                MAX_FIND_COORD_ATTEMPTS,
+                onCompleteCallback
+            );
+        }
+
+        @Override
+        protected String name() {
+            return "ReadStateSummaryHandler";
+        }
+
+        @Override
+        protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> requestBuilder() {
+            throw new RuntimeException("Read Summary requests are batchable, hence individual requests not needed.");
+        }
+
+        @Override
+        protected boolean isResponseForRequest(ClientResponse response) {
+            return response.requestHeader().apiKey() == ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY;
+        }
+
+        @Override
+        protected void handleRequestResponse(ClientResponse response) {
+            log.debug("Read state summary response received - {}", response);
+            readStateSummaryBackoff.incrementAttempt();
+
+            ReadShareGroupStateSummaryResponse combinedResponse = (ReadShareGroupStateSummaryResponse) response.responseBody();
+            for (ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult readStateSummaryResult : combinedResponse.data().results()) {
+                if (readStateSummaryResult.topicId().equals(partitionKey().topicId())) {
+                    Optional<ReadShareGroupStateSummaryResponseData.PartitionResult> partitionStateData =
+                        readStateSummaryResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partitionKey().partition())
+                            .findFirst();
+
+                    if (partitionStateData.isPresent()) {
+                        Errors error = Errors.forCode(partitionStateData.get().errorCode());
+                        switch (error) {
+                            case NONE:
+                                readStateSummaryBackoff.resetAttempts();
+                                ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult(
+                                    partitionKey().topicId(),
+                                    Collections.singletonList(partitionStateData.get())
+                                );
+                                this.result.complete(new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData()
+                                    .setResults(Collections.singletonList(result))));
+                                return;
+
+                            // check retriable errors
+                            case COORDINATOR_NOT_AVAILABLE:
+                            case COORDINATOR_LOAD_IN_PROGRESS:
+                            case NOT_COORDINATOR:
+                                log.warn("Received retriable error in read state summary RPC for key {}: {}", partitionKey(), error.message());
+                                if (!readStateSummaryBackoff.canAttempt()) {
+                                    log.error("Exhausted max retries for read state summary RPC for key {} without success.", partitionKey());
+                                    readStateSummaryErrorReponse(error, new Exception("Exhausted max retries to complete read state summary RPC without success."));
+                                    return;
+                                }
+                                super.resetCoordinatorNode();
+                                timer.add(new PersisterTimerTask(readStateSummaryBackoff.backOff(), this));
+                                return;
+
+                            default:
+                                log.error("Unable to perform read state summary RPC for key {}: {}", partitionKey(), error.message());
+                                readStateSummaryErrorReponse(error, null);
+                                return;
+                        }
+                    }
+                }
+            }
+
+            // no response found specific topic partition
+            IllegalStateException exception = new IllegalStateException(
+                "Failed to read state summary for share partition " + partitionKey()
+            );
+            readStateSummaryErrorReponse(Errors.forException(exception), exception);

Review Comment:
   You can just pass `exception` and convert it to an `Errors` value inside the method itself. I don't see any value in passing both explicitly.
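   A minimal sketch of that shape (keeping the method name exactly as it appears in the PR, and assuming `Errors.forException` is the intended mapping):
   ```java
   // Overload that derives the Errors value from the exception itself,
   // so the call site above only needs to pass the exception.
   protected void readStateSummaryErrorReponse(Exception exception) {
       readStateSummaryErrorReponse(Errors.forException(exception), exception);
   }
   ```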



##########
server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -969,5 +1116,27 @@ private static AbstractRequest.Builder<? extends AbstractRequest> coalesceReads(
                         .setPartitions(entry.getValue()))
                     .collect(Collectors.toList())));
         }
+
+        private static AbstractRequest.Builder<? extends AbstractRequest> coalesceReadSummarys(String groupId, List<? extends PersisterStateManagerHandler> handlers) {
+            Map<Uuid, List<ReadShareGroupStateSummaryRequestData.PartitionData>> partitionData = new HashMap<>();
+            handlers.forEach(persHandler -> {

Review Comment:
   What do we mean by `pers` here?



##########
clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java:
##########
@@ -83,8 +84,24 @@ public ReadShareGroupStateSummaryRequestData data() {
 
     public static ReadShareGroupStateSummaryRequest parse(ByteBuffer buffer, short version) {
         return new ReadShareGroupStateSummaryRequest(
-                new ReadShareGroupStateSummaryRequestData(new ByteBufferAccessor(buffer), version),
-                version
+            new ReadShareGroupStateSummaryRequestData(new ByteBufferAccessor(buffer), version),
+            version
         );
     }
+
+    public static List<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> getErrorList(
+        List<Uuid> topicIds,
+        Errors error
+    ) {
+        return topicIds.stream()
+            .map(topicId ->
+                new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                    .setTopicId(topicId)
+                    .setPartitions(List.of(
+                            new ReadShareGroupStateSummaryResponseData.PartitionResult()
+                                .setErrorCode(error.code())
+                                .setErrorMessage(error.message())

Review Comment:
   This will give the default error message and the original exception message will be lost. Do you want that behaviour? If yes, then please add a comment here explaining it; otherwise, please fix it.
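   One possible shape, if the intent is to preserve the original message (the extra `errorMessage` parameter and the terminal collect are assumptions, not part of the PR; `Collectors` would need importing):
   ```java
   public static List<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> getErrorList(
       List<Uuid> topicIds,
       Errors error,
       String errorMessage
   ) {
       return topicIds.stream()
           .map(topicId ->
               new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
                   .setTopicId(topicId)
                   .setPartitions(List.of(
                       new ReadShareGroupStateSummaryResponseData.PartitionResult()
                           .setErrorCode(error.code())
                           // Fall back to the generic message only when no specific one was given.
                           .setErrorMessage(errorMessage != null ? errorMessage : error.message()))))
           .collect(Collectors.toList());
   }
   ```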


