AndrewJSchofield commented on code in PR #22284:
URL: https://github.com/apache/kafka/pull/22284#discussion_r3242387579


##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -165,53 +236,104 @@ public Optional<Throwable> validateDlqTopic() {
             });
         }
 
-        public ShareGroupDLQRecordParameter recordParam() {
-            return param;
-        }
-
         public boolean dlqTopicExists() {
             Optional<String> shareGroupDlqTopic = 
cacheHelper.shareGroupDlqTopic(param.groupId());
             return 
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
         }
-    }
 
-    private class ProduceRequestHandler extends 
ShareGroupDLQStateManagerHandler {
-        private final CompletableFuture<Void> result;
-        private static final Logger LOG = 
LoggerFactory.getLogger(ShareGroupDLQStateManager.ProduceRequestHandler.class);
-
-        public ProduceRequestHandler(ShareGroupDLQRecordParameter param, 
CompletableFuture<Void> result) {
-            super(param);
-            this.result = result;
+        public String dlqTopicName() {

Review Comment:
   Might it be a good idea to return `Optional<String>` here so that you don't 
inadvertently end up auto-creating a topic called `"UNKNOWN"` due to some code 
quirk?



##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -165,53 +236,104 @@ public Optional<Throwable> validateDlqTopic() {
             });
         }
 
-        public ShareGroupDLQRecordParameter recordParam() {
-            return param;
-        }
-
         public boolean dlqTopicExists() {
             Optional<String> shareGroupDlqTopic = 
cacheHelper.shareGroupDlqTopic(param.groupId());
             return 
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
         }
-    }
 
-    private class ProduceRequestHandler extends 
ShareGroupDLQStateManagerHandler {
-        private final CompletableFuture<Void> result;
-        private static final Logger LOG = 
LoggerFactory.getLogger(ShareGroupDLQStateManager.ProduceRequestHandler.class);
-
-        public ProduceRequestHandler(ShareGroupDLQRecordParameter param, 
CompletableFuture<Void> result) {
-            super(param);
-            this.result = result;
+        public String dlqTopicName() {
+            return 
cacheHelper.shareGroupDlqTopic(param.groupId()).orElse("UNKNOWN");
         }
 
-        @Override
-        protected AbstractRequest.Builder<? extends AbstractRequest> 
requestBuilder() {
-            return null;
-        }
-
-        @Override
-        protected CompletableFuture<? extends AbstractResponse> result() {
-            return CompletableFuture.completedFuture(null);
-        }
+        // Visibility for testing
+        Optional<Errors> checkResponseError(ClientResponse response) {
+            if (response.hasResponse()) {
+                return Optional.empty();
+            }
 
-        @Override
-        protected String name() {
-            return "ProduceRequestHandler";
+            LOG.debug("Response for RPC {} with DLQ topic {} is invalid - {}", 
name(), dlqTopicName(), response);
+
+            if (response.authenticationException() != null) {
+                LOG.error("Authentication exception", 
response.authenticationException());
+                Errors error = 
Errors.forException(response.authenticationException());
+                return Optional.of(error);
+            } else if (response.versionMismatch() != null) {
+                LOG.error("Version mismatch exception", 
response.versionMismatch());
+                Errors error = Errors.forException(response.versionMismatch());
+                return Optional.of(error);
+            } else if (response.wasDisconnected()) {    // Retriable
+                return Optional.of(Errors.NETWORK_EXCEPTION);
+            } else if (response.wasTimedOut()) {    // Retriable
+                LOG.debug("Response for RPC {} with DLQ topic {} timed out - 
{}.", name(), dlqTopicName(), response);
+                return Optional.of(Errors.REQUEST_TIMED_OUT);
+            } else {
+                return Optional.of(Errors.UNKNOWN_SERVER_ERROR);
+            }
         }
 
-        @Override
-        protected void createTopicErrorResponse(Exception exception) {
-            this.result.completeExceptionally(exception);
-        }
+        private void handleCreateTopicsResponse(ClientResponse response) {
+            LOG.debug("Received CreateTopicsResponse {}", response);
+            createTopicsBackoff.incrementAttempt();
+            Errors clientResponseError = 
checkResponseError(response).orElse(Errors.NONE);
+            String clientResponseErrorMessage = clientResponseError.message();
+            String dlqTopicName = dlqTopicName();
+
+            switch (clientResponseError) {
+                case NONE:
+                    // Topic has been created
+                    CreateTopicsResponse createTopicsResponse = 
((CreateTopicsResponse) response.responseBody());
+                    Optional<CreateTopicsResponseData.CreatableTopicResult> 
topicResultOpt = createTopicsResponse.data().topics().stream().findFirst();
+                    if (topicResultOpt.isEmpty() || 
!topicResultOpt.get().name().equals(dlqTopicName)) {
+                        LOG.error("DLQ topic not found in create topic 
response {}.", dlqTopicName);
+                        
requestErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception());

Review Comment:
   This would be very strange. It would mean that you said to create one topic 
with a particular name and the response was not empty but the topic name did 
not match. `INTERNAL_SERVER_ERROR`?



##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -165,53 +236,104 @@ public Optional<Throwable> validateDlqTopic() {
             });
         }
 
-        public ShareGroupDLQRecordParameter recordParam() {
-            return param;
-        }
-
         public boolean dlqTopicExists() {
             Optional<String> shareGroupDlqTopic = 
cacheHelper.shareGroupDlqTopic(param.groupId());
             return 
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
         }
-    }
 
-    private class ProduceRequestHandler extends 
ShareGroupDLQStateManagerHandler {
-        private final CompletableFuture<Void> result;
-        private static final Logger LOG = 
LoggerFactory.getLogger(ShareGroupDLQStateManager.ProduceRequestHandler.class);
-
-        public ProduceRequestHandler(ShareGroupDLQRecordParameter param, 
CompletableFuture<Void> result) {
-            super(param);
-            this.result = result;
+        public String dlqTopicName() {
+            return 
cacheHelper.shareGroupDlqTopic(param.groupId()).orElse("UNKNOWN");
         }
 
-        @Override
-        protected AbstractRequest.Builder<? extends AbstractRequest> 
requestBuilder() {
-            return null;
-        }
-
-        @Override
-        protected CompletableFuture<? extends AbstractResponse> result() {
-            return CompletableFuture.completedFuture(null);
-        }
+        // Visibility for testing
+        Optional<Errors> checkResponseError(ClientResponse response) {
+            if (response.hasResponse()) {
+                return Optional.empty();
+            }
 
-        @Override
-        protected String name() {
-            return "ProduceRequestHandler";
+            LOG.debug("Response for RPC {} with DLQ topic {} is invalid - {}", 
name(), dlqTopicName(), response);
+
+            if (response.authenticationException() != null) {
+                LOG.error("Authentication exception", 
response.authenticationException());
+                Errors error = 
Errors.forException(response.authenticationException());
+                return Optional.of(error);
+            } else if (response.versionMismatch() != null) {
+                LOG.error("Version mismatch exception", 
response.versionMismatch());
+                Errors error = Errors.forException(response.versionMismatch());
+                return Optional.of(error);
+            } else if (response.wasDisconnected()) {    // Retriable
+                return Optional.of(Errors.NETWORK_EXCEPTION);
+            } else if (response.wasTimedOut()) {    // Retriable
+                LOG.debug("Response for RPC {} with DLQ topic {} timed out - 
{}.", name(), dlqTopicName(), response);
+                return Optional.of(Errors.REQUEST_TIMED_OUT);
+            } else {
+                return Optional.of(Errors.UNKNOWN_SERVER_ERROR);
+            }
         }
 
-        @Override
-        protected void createTopicErrorResponse(Exception exception) {
-            this.result.completeExceptionally(exception);
-        }
+        private void handleCreateTopicsResponse(ClientResponse response) {
+            LOG.debug("Received CreateTopicsResponse {}", response);
+            createTopicsBackoff.incrementAttempt();
+            Errors clientResponseError = 
checkResponseError(response).orElse(Errors.NONE);
+            String clientResponseErrorMessage = clientResponseError.message();
+            String dlqTopicName = dlqTopicName();
+
+            switch (clientResponseError) {
+                case NONE:
+                    // Topic has been created
+                    CreateTopicsResponse createTopicsResponse = 
((CreateTopicsResponse) response.responseBody());
+                    Optional<CreateTopicsResponseData.CreatableTopicResult> 
topicResultOpt = createTopicsResponse.data().topics().stream().findFirst();
+                    if (topicResultOpt.isEmpty() || 
!topicResultOpt.get().name().equals(dlqTopicName)) {
+                        LOG.error("DLQ topic not found in create topic 
response {}.", dlqTopicName);
+                        
requestErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception());
+                        break;
+                    }
 
-        @Override
-        public void onComplete(ClientResponse response) {
+                    CreateTopicsResponseData.CreatableTopicResult topicResult 
= topicResultOpt.get();
+                    Errors error = Errors.forCode(topicResult.errorCode());
+                    String errorMessage = topicResult.errorMessage();
+                    switch (error) {
+                        case NONE:
+                            // Replace with enqueue post PRODUCE implementation
+                            this.result.complete(null);
+                            break;
+
+                        case THROTTLING_QUOTA_EXCEEDED:
+                            LOG.debug("Received retriable error in create DLQ 
topic response for {} using dlq topic {}: {}", name(), dlqTopicName, 
errorMessage);

Review Comment:
   such a nit: "DLQ" and "dlq" in the same line??? 😆 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to