chia7712 commented on code in PR #20998:
URL: https://github.com/apache/kafka/pull/20998#discussion_r2576300692


##########
server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -411,22 +402,17 @@ Optional<Errors> checkResponseError(ClientResponse 
response, BiConsumer<Errors,
             if (response.authenticationException() != null) {
                 log.error("Authentication exception", 
response.authenticationException());
                 Errors error = 
Errors.forException(response.authenticationException());
-                errorConsumer.accept(error, new 
AuthenticationException(String.format("Server response for %s indicates 
authentication exception.", this.partitionKey)));
                 return Optional.of(error);
             } else if (response.versionMismatch() != null) {
                 log.error("Version mismatch exception", 
response.versionMismatch());
                 Errors error = Errors.forException(response.versionMismatch());
-                errorConsumer.accept(error, new 
UnsupportedVersionException(String.format("Server response for %s indicates 
version mismatch.", this.partitionKey)));
                 return Optional.of(error);
-            } else if (response.wasDisconnected()) {
-                errorConsumer.accept(Errors.NETWORK_EXCEPTION, new 
NetworkException(String.format("Server response for %s indicates disconnect.", 
this.partitionKey)));
+            } else if (response.wasDisconnected()) {    // Retriable
                 return Optional.of(Errors.NETWORK_EXCEPTION);
-            } else if (response.wasTimedOut()) {
-                log.error("Response for RPC {} with key {} timed out - {}.", 
name(), this.partitionKey, response);
-                errorConsumer.accept(Errors.REQUEST_TIMED_OUT, new 
NetworkException(String.format("Server response for %s indicates timeout.", 
this.partitionKey)));
+            } else if (response.wasTimedOut()) {    // Retriable
+                log.debug("Response for RPC {} with key {} timed out - {}.", 
name(), this.partitionKey, response);

Review Comment:
   This debug message appears to be redundant, as the information is logged 
later in the downstream handler.



##########
server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -1184,65 +1279,87 @@ protected boolean isResponseForRequest(ClientResponse 
response) {
         protected void handleRequestResponse(ClientResponse response) {
             log.debug("Delete state response received - {}", response);
             deleteStateBackoff.incrementAttempt();
+            Errors clientResponseError = 
checkResponseError(response).orElse(Errors.NONE);
+            String clientResponseErrorMessage = clientResponseError.message();
 
-            // response can be a combined one for large number of requests
-            // we need to deconstruct it
-            DeleteShareGroupStateResponse combinedResponse = 
(DeleteShareGroupStateResponse) response.responseBody();
-
-            for (DeleteShareGroupStateResponseData.DeleteStateResult 
deleteStateResult : combinedResponse.data().results()) {
-                if 
(deleteStateResult.topicId().equals(partitionKey().topicId())) {
-                    
Optional<DeleteShareGroupStateResponseData.PartitionResult> partitionStateData =
-                        deleteStateResult.partitions().stream()
-                            .filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
-                            .findFirst();
-
-                    if (partitionStateData.isPresent()) {
-                        Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
-                        String errorMessage = 
partitionStateData.get().errorMessage();
-                        if (errorMessage == null || errorMessage.isEmpty()) {
-                            errorMessage = error.message();
-                        }
+            switch (clientResponseError) {
+                case NONE:
+                    // response can be a combined one for large number of 
requests
+                    // we need to deconstruct it
+                    DeleteShareGroupStateResponse combinedResponse = 
(DeleteShareGroupStateResponse) response.responseBody();
+
+                    for (DeleteShareGroupStateResponseData.DeleteStateResult 
deleteStateResult : combinedResponse.data().results()) {
+                        if 
(deleteStateResult.topicId().equals(partitionKey().topicId())) {
+                            
Optional<DeleteShareGroupStateResponseData.PartitionResult> partitionStateData =
+                                deleteStateResult.partitions().stream()
+                                    .filter(partitionResult -> 
partitionResult.partition() == partitionKey().partition())
+                                    .findFirst();
+
+                            if (partitionStateData.isPresent()) {
+                                Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                                String errorMessage = 
partitionStateData.get().errorMessage();
+                                if (errorMessage == null || 
errorMessage.isEmpty()) {
+                                    errorMessage = error.message();
+                                }
 
-                        switch (error) {
-                            case NONE:
-                                deleteStateBackoff.resetAttempts();
-                                
DeleteShareGroupStateResponseData.DeleteStateResult result = 
DeleteShareGroupStateResponse.toResponseDeleteStateResult(
-                                    partitionKey().topicId(),
-                                    List.of(partitionStateData.get())
-                                );
-                                this.result.complete(new 
DeleteShareGroupStateResponse(
-                                    new 
DeleteShareGroupStateResponseData().setResults(List.of(result))));
-                                return;
-
-                            // check retriable errors
-                            case COORDINATOR_NOT_AVAILABLE:
-                            case COORDINATOR_LOAD_IN_PROGRESS:
-                            case NOT_COORDINATOR:
-                            case UNKNOWN_TOPIC_OR_PARTITION:
-                                log.debug("Received retriable error in delete 
state RPC for key {}: {}", partitionKey(), errorMessage);
-                                if (!deleteStateBackoff.canAttempt()) {
-                                    log.error("Exhausted max retries for 
delete state RPC for key {} without success.", partitionKey());
-                                    requestErrorResponse(error, new 
Exception("Exhausted max retries to complete delete state RPC without 
success."));
-                                    return;
+                                switch (error) {
+                                    case NONE:
+                                        deleteStateBackoff.resetAttempts();
+                                        
DeleteShareGroupStateResponseData.DeleteStateResult result = 
DeleteShareGroupStateResponse.toResponseDeleteStateResult(
+                                            partitionKey().topicId(),
+                                            List.of(partitionStateData.get())
+                                        );
+                                        this.result.complete(new 
DeleteShareGroupStateResponse(
+                                            new 
DeleteShareGroupStateResponseData().setResults(List.of(result))));
+                                        return;
+
+                                    // check retriable errors
+                                    case COORDINATOR_NOT_AVAILABLE:
+                                    case COORDINATOR_LOAD_IN_PROGRESS:
+                                    case NOT_COORDINATOR:
+                                    case UNKNOWN_TOPIC_OR_PARTITION:
+                                        log.debug("Received retriable error in 
delete state RPC for key {}: {}", partitionKey(), errorMessage);
+                                        if (!deleteStateBackoff.canAttempt()) {
+                                            log.error("Exhausted max retries 
for delete state RPC for key {} without success.", partitionKey());
+                                            requestErrorResponse(error, new 
Exception("Exhausted max retries to complete delete state RPC without 
success."));
+                                            return;
+                                        }
+                                        super.resetCoordinatorNode();
+                                        timer.add(new 
PersisterTimerTask(deleteStateBackoff.backOff(), this));
+                                        return;
+
+                                    default:
+                                        log.error("Unable to perform delete 
state RPC for key {}: {}", partitionKey(), errorMessage);
+                                        requestErrorResponse(error, new 
Exception(errorMessage));
+                                        return;
                                 }
-                                super.resetCoordinatorNode();
-                                timer.add(new 
PersisterTimerTask(deleteStateBackoff.backOff(), this));
-                                return;
-
-                            default:
-                                log.error("Unable to perform delete state RPC 
for key {}: {}", partitionKey(), errorMessage);
-                                requestErrorResponse(error, new 
Exception(errorMessage));
-                                return;
+                            }
                         }
                     }
-                }
-            }
 
-            // no response found specific topic partition
-            IllegalStateException exception = new IllegalStateException(
-                "Failed to delete state for share partition: " + partitionKey()
-            );
-            requestErrorResponse(Errors.forException(exception), exception);
+                    // no response found specific topic partition
+                    IllegalStateException exception = new 
IllegalStateException(
+                        "Failed to delete state for share partition: " + 
partitionKey()
+                    );
+                    requestErrorResponse(Errors.forException(exception), 
exception);
+                    return;
+
+                case NETWORK_EXCEPTION: // Retriable client response error 
codes.
+                case REQUEST_TIMED_OUT:
+                    log.debug("Received retriable error in delete state RPC 
client response for key {}: {}", partitionKey(), clientResponseErrorMessage);

Review Comment:
   Have you considered using the 
`NetworkPartitionMetadataClient#maybeHandleErrorResponse` pattern to avoid 
generating the 'Received retriable error ... Exhausted max retries' log?
   
   
https://github.com/apache/kafka/blob/7cea0595f7ce8cdd7458ede053557cedd1f7ec95/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java#L286



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to