AndrewJSchofield commented on code in PR #22241:
URL: https://github.com/apache/kafka/pull/22241#discussion_r3217925882


##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -105,21 +108,73 @@ public void stop() throws Exception {
      * @return A future completing normally on successful DLQ, exceptionally otherwise.
      */
     public CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param) {
-        ProduceRequestHandler requestHandler = new ProduceRequestHandler(param);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ProduceRequestHandler requestHandler = new ProduceRequestHandler(param, future);
         sender.enqueue(requestHandler);
-        return requestHandler.result().thenAccept(response -> {
-        });
+        return future;
     }
 
     private abstract class ShareGroupDLQStateManagerHandler implements RequestCompletionHandler {
+        private final ShareGroupDLQRecordParameter param;
+
+        ShareGroupDLQStateManagerHandler(ShareGroupDLQRecordParameter param) {
+            this.param = param;
+        }
+
         protected abstract AbstractRequest.Builder<? extends AbstractRequest> requestBuilder();
 
         protected abstract CompletableFuture<? extends AbstractResponse> result();
+
+        protected abstract String name();
+
+        protected abstract void createTopicErrorResponse(Exception exception);
+
+        protected AbstractRequest.Builder<CreateTopicsRequest> createTopicBuilder() {
+            return new CreateTopicsRequest.Builder(new CreateTopicsRequestData());
+        }
+
+        public Optional<Throwable> validateDlqTopic() {
+            Optional<String> topicNameOpt = cacheHelper.shareGroupDlqTopic(param.groupId());
+            Optional<String> topicPrefix = cacheHelper.shareGroupDlqTopicPrefix();
+
+            if (topicNameOpt.isEmpty()) {
+                return Optional.of(new ConfigException("Configured DLQ topic name in share group " + param.groupId() + " is empty."));
+            } else if (topicNameOpt.get().indexOf("__") == 0) {
+                return Optional.of(new ConfigException("Configured DLQ topic name in share group " + param.groupId() + " is invalid: " + topicNameOpt.get()));
+            }
+
+            String topicName = topicNameOpt.get();
+
+            if (cacheHelper.containsTopic(topicName) && !cacheHelper.isDlqEnabledOnTopic(topicName)) {
+                return Optional.of(new ConfigException("DLQ is not enabled on configured DLQ topic for share group " + param.groupId() + " topic: ." + topicName));
+            }
+
+            return topicPrefix.map(prefix -> {
+                if (!prefix.isEmpty() && topicName.indexOf(prefix) != 0) {

Review Comment:
   Ditto: I suggest `String.startsWith(String)` instead of `String.indexOf(String)` here too.



##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -105,21 +108,73 @@ public void stop() throws Exception {
      * @return A future completing normally on successful DLQ, exceptionally otherwise.
      */
     public CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param) {
-        ProduceRequestHandler requestHandler = new ProduceRequestHandler(param);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ProduceRequestHandler requestHandler = new ProduceRequestHandler(param, future);
         sender.enqueue(requestHandler);
-        return requestHandler.result().thenAccept(response -> {
-        });
+        return future;
     }
 
     private abstract class ShareGroupDLQStateManagerHandler implements RequestCompletionHandler {
+        private final ShareGroupDLQRecordParameter param;
+
+        ShareGroupDLQStateManagerHandler(ShareGroupDLQRecordParameter param) {
+            this.param = param;
+        }
+
         protected abstract AbstractRequest.Builder<? extends AbstractRequest> requestBuilder();
 
         protected abstract CompletableFuture<? extends AbstractResponse> result();
+
+        protected abstract String name();
+
+        protected abstract void createTopicErrorResponse(Exception exception);
+
+        protected AbstractRequest.Builder<CreateTopicsRequest> createTopicBuilder() {
+            return new CreateTopicsRequest.Builder(new CreateTopicsRequestData());
+        }
+
+        public Optional<Throwable> validateDlqTopic() {
+            Optional<String> topicNameOpt = cacheHelper.shareGroupDlqTopic(param.groupId());
+            Optional<String> topicPrefix = cacheHelper.shareGroupDlqTopicPrefix();
+
+            if (topicNameOpt.isEmpty()) {
+                return Optional.of(new ConfigException("Configured DLQ topic name in share group " + param.groupId() + " is empty."));
+            } else if (topicNameOpt.get().indexOf("__") == 0) {

Review Comment:
   I suggest `String.startsWith(String)` instead of `String.indexOf(String)`.
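
   To illustrate the suggestion, here is a minimal standalone sketch, not code from the PR. The class `DlqTopicNameChecks` and its two helpers are hypothetical names introduced only for this example. They mirror the two `indexOf` comparisons in `validateDlqTopic()` and express them with `startsWith`, which states the intent directly and only inspects the start of the string instead of searching all of it.

```java
public class DlqTopicNameChecks {

    // Hypothetical helper: equivalent to topicName.indexOf("__") == 0.
    static boolean hasReservedPrefix(String topicName) {
        return topicName.startsWith("__");
    }

    // Hypothetical helper: equivalent to
    // !prefix.isEmpty() && topicName.indexOf(prefix) != 0.
    static boolean violatesPrefix(String topicName, String prefix) {
        return !prefix.isEmpty() && !topicName.startsWith(prefix);
    }

    public static void main(String[] args) {
        // Illustrative topic names only; none of these come from the PR.
        System.out.println(hasReservedPrefix("__dlq"));              // true
        System.out.println(hasReservedPrefix("dlq__orders"));        // false
        System.out.println(violatesPrefix("dlq.orders", "dlq."));    // false
        System.out.println(violatesPrefix("orders.dlq", "dlq."));    // true
    }
}
```

   For any non-null strings, `s.indexOf(p) == 0` and `s.startsWith(p)` give the same answer, so the change would be behaviour-preserving.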



##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -149,12 +214,44 @@ private static class SendThread extends InterBrokerSendThread {
 
         @Override
         public Collection<RequestAndCompletionHandler> generateRequests() {
+            if (!queue.isEmpty()) {
+                ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler handler = queue.poll();
+                if (!handler.dlqTopicExists()) {
+                    Node randomNode = randomNode();
+                    if (randomNode == Node.noNode()) {
+                        log.error("Unable to find node to use for coordinator lookup.");

Review Comment:
   This isn't really looking up a coordinator, as I understand it. It's trying
   to find a random node to auto-create the DLQ topic.


