sjhajharia commented on code in PR #18571:
URL: https://github.com/apache/kafka/pull/18571#discussion_r1919613483


##########
server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -730,6 +734,147 @@ protected RPCType rpcType() {
         }
     }
 
+    public class ReadStateSummaryHandler extends PersisterStateManagerHandler {
+        private final int leaderEpoch;
+        private final CompletableFuture<ReadShareGroupStateSummaryResponse> 
result;
+        private final BackoffManager readStateSummaryBackoff;
+
+        public ReadStateSummaryHandler(
+            String groupId,
+            Uuid topicId,
+            int partition,
+            int leaderEpoch,
+            CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+            long backoffMs,
+            long backoffMaxMs,
+            int maxRPCRetryAttempts,
+            Consumer<ClientResponse> onCompleteCallback
+        ) {
+            super(groupId, topicId, partition, backoffMs, backoffMaxMs, 
maxRPCRetryAttempts);
+            this.leaderEpoch = leaderEpoch;
+            this.result = result;
+            this.readStateSummaryBackoff = new 
BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
+        }
+
+        public ReadStateSummaryHandler(
+            String groupId,
+            Uuid topicId,
+            int partition,
+            int leaderEpoch,
+            CompletableFuture<ReadShareGroupStateSummaryResponse> result,
+            Consumer<ClientResponse> onCompleteCallback
+        ) {
+            this(
+                groupId,
+                topicId,
+                partition,
+                leaderEpoch,
+                result,
+                REQUEST_BACKOFF_MS,
+                REQUEST_BACKOFF_MAX_MS,
+                MAX_FIND_COORD_ATTEMPTS,
+                onCompleteCallback
+            );
+        }
+
+        @Override
+        protected String name() {
+            return "ReadStateSummaryHandler";
+        }
+
+        @Override
+        protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> 
requestBuilder() {
+            throw new RuntimeException("Read Summary requests are batchable, 
hence individual requests not needed.");
+        }
+
+        @Override
+        protected boolean isResponseForRequest(ClientResponse response) {
+            return response.requestHeader().apiKey() == 
ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY;
+        }
+
+        @Override
+        protected void handleRequestResponse(ClientResponse response) {
+            log.debug("Read state summary response received - {}", response);
+            readStateSummaryBackoff.incrementAttempt();
+
+            ReadShareGroupStateSummaryResponse combinedResponse = 
(ReadShareGroupStateSummaryResponse) response.responseBody();
+            for (ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult 
readStateSummaryResult : combinedResponse.data().results()) {
+                if 
(readStateSummaryResult.topicId().equals(partitionKey().topicId())) {
+                    
Optional<ReadShareGroupStateSummaryResponseData.PartitionResult> 
partitionStateData =
+                        
readStateSummaryResult.partitions().stream().filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
+                            .findFirst();
+
+                    if (partitionStateData.isPresent()) {
+                        Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                        switch (error) {
+                            case NONE:
+                                readStateSummaryBackoff.resetAttempts();
+                                
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = 
ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult(
+                                    partitionKey().topicId(),
+                                    
Collections.singletonList(partitionStateData.get())
+                                );
+                                this.result.complete(new 
ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData()
+                                    
.setResults(Collections.singletonList(result))));
+                                return;
+
+                            // check retriable errors
+                            case COORDINATOR_NOT_AVAILABLE:
+                            case COORDINATOR_LOAD_IN_PROGRESS:
+                            case NOT_COORDINATOR:
+                                log.warn("Received retriable error in read 
state summary RPC for key {}: {}", partitionKey(), error.message());
+                                if (!readStateSummaryBackoff.canAttempt()) {
+                                    log.error("Exhausted max retries for read 
state summary RPC for key {} without success.", partitionKey());
+                                    readStateSummaryErrorReponse(error, new 
Exception("Exhausted max retries to complete read state summary RPC without 
success."));
+                                    return;
+                                }
+                                super.resetCoordinatorNode();
+                                timer.add(new 
PersisterTimerTask(readStateSummaryBackoff.backOff(), this));
+                                return;
+
+                            default:
+                                log.error("Unable to perform read state 
summary RPC for key {}: {}", partitionKey(), error.message());
+                                readStateSummaryErrorReponse(error, null);
+                                return;
+                        }
+                    }
+                }
+            }
+
+            // no response found specific topic partition
+            IllegalStateException exception = new IllegalStateException(
+                "Failed to read state summary for share partition " + 
partitionKey()
+            );
+            readStateSummaryErrorReponse(Errors.forException(exception), 
exception);

Review Comment:
   We already have a `readStateSummaryErrorReponse()` method that accepts two 
parameters. I kept it like this to prevent the addition of another overloaded 
function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to