AndrewJSchofield commented on code in PR #22368:
URL: https://github.com/apache/kafka/pull/22368#discussion_r3303648734


##########
core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java:
##########
@@ -93,8 +95,8 @@ public boolean isDlqEnabledOnTopic(String topic) {
             return false;
         }
         Object isEnabled = 
props.get(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG);
-        if (isEnabled instanceof Boolean) {
-            return (boolean) isEnabled;
+        if (isEnabled instanceof String) {

Review Comment:
   I'm surprised that this is necessary. This config has a boolean type 
already, so I wonder whether that's being lost somehow on its way to the 
metadata cache. For other similar configs, such as 
`TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG`, the code just does 
`Config.getBoolean`.



##########
core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java:
##########
@@ -680,14 +682,14 @@ public void 
testIsDlqEnabledOnTopicReturnsFalseWhenConfigValueNotBoolean() {
             mock(GroupConfigManager.class)
         );
 
-        assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
+        assertTrue(cache.isDlqEnabledOnTopic("test-topic"));
     }
 
     @Test
-    public void testIsDlqEnabledOnTopicReturnsTrue() {
+    public void testIsDlqEnabledOnTopicReturnsFalseValue() {

Review Comment:
   nit: why `FalseValue` instead of `False`?



##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -126,15 +149,38 @@ public CompletableFuture<Void> 
dlq(ShareGroupDLQRecordParameter param) {
         return future;
     }
 
+    // Visibility for tests
+    CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param, long 
requestBackoffMs, long requestBackoffMaxMs, int maxRequestAttempts) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ProduceRequestHandler requestHandler = new 
ProduceRequestHandler(param, future, requestBackoffMs, requestBackoffMaxMs, 
maxRequestAttempts);
+        enqueue(requestHandler);
+        return future;
+    }
+
     private void enqueue(ProduceRequestHandler requestHandler) {
         sender.enqueue(requestHandler);
     }
 
+    private void addRequestToNodeMap(Node node, ProduceRequestHandler handler) 
{
+        if (!handler.isBatchable()) {

Review Comment:
   This could do with some explanation I feel.



##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -204,6 +261,66 @@ public AbstractRequest.Builder<CreateTopicsRequest> 
createTopicBuilder() throws
                 .setTopics(topicCollection));
         }
 
+        public AbstractRequest.Builder<? extends AbstractRequest> 
requestBuilder() {
+            throw new RuntimeException("Produce requests are batchable, hence 
individual requests not needed.");
+        }
+
+        public void populateDLQTopicData() throws ConfigException {
+            Optional<String> dlqTopic = 
cacheHelper.shareGroupDlqTopic(param.groupId());
+            if (dlqTopic.isEmpty()) {
+                throw new ConfigException(String.format("DLQ topic is not 
configured for share group %s.", param.groupId()));
+            }
+
+            ShareGroupDLQMetadataCacheHelper.TopicPartitionData tpData = 
cacheHelper.topicPartitionData(dlqTopic.get());
+
+            if (tpData.topicId().isEmpty()) {
+                throw new ConfigException(String.format("DLQ topic id could 
not be found for share group %s with DLQ topic %s.", param.groupId(), 
dlqTopic.get()));
+            }
+
+            if (tpData.numPartitions().isEmpty()) {
+                throw new ConfigException(String.format("DLQ topic partition 
count not be found for share group %s with DLQ topic %s.", param.groupId(), 
dlqTopic.get()));

Review Comment:
   nit: Missing "could".



##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -332,6 +508,93 @@ private void handleCreateTopicsResponse(ClientResponse 
response) {
                     requestErrorResponse(clientResponseError.exception());
             }
         }
+
+        private void handleProduceResponse(ClientResponse response) {
+            LOG.debug("Received ProduceRequestResponse {}", response);
+            produceRequestBackoff.incrementAttempt();
+            Errors clientResponseError = 
checkResponseError(response).orElse(Errors.NONE);
+            String clientResponseErrorMessage = clientResponseError.message();
+
+            switch (clientResponseError) {
+                case NONE:
+                    // Produce response received
+                    ProduceResponse produceResponse = ((ProduceResponse) 
response.responseBody());
+                    ProduceResponseData.TopicProduceResponseCollection 
produceResponseCollection = produceResponse.data().responses();
+                    if (produceResponseCollection.isEmpty()) {
+                        LOG.error("Received empty produce response for {} to 
dlq topic node {}.", this, dlqPartitionLeaderNode());

Review Comment:
   nit: Let's be consistent with use of `.` in the log messages. I would tend 
not to include in log messages, but I would definitely go for consistency in 
this source file.



##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########


Review Comment:
   nit: This method could call the overload beneath will fully specified 
arguments.



##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -238,7 +355,51 @@ public Optional<Throwable> validateDlqTopic() {
 
         public boolean dlqTopicExists() {
             Optional<String> shareGroupDlqTopic = 
cacheHelper.shareGroupDlqTopic(param.groupId());
-            return 
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
+            boolean isDlqTopicPresent = 
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
+            if (isDlqTopicPresent) {
+                try {
+                    populateDLQTopicData();
+                } catch (ConfigException e) {
+                    return false;
+                }
+                addRequestToNodeMap(dlqPartitionLeaderNode, this);
+            }
+            return isDlqTopicPresent;
+        }
+
+        @Override
+        public String toString() {
+            return "ProduceRequestHandler(" +
+                "param: " + param + "\n" +
+                "dlqTopicData: " + dlqTopicPartitionData + "\n" +
+                ")";
+        }
+
+        private Header[] headers(long offset) {
+            List<Header> headers = new ArrayList<>();
+            headers.add(new RecordHeader("__dlq.errors.topic", 
recordTopic().getBytes(StandardCharsets.UTF_8)));

Review Comment:
   I would define some static strings for the header keys.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to