dajac commented on a change in pull request #8311:
URL: https://github.com/apache/kafka/pull/8311#discussion_r420615938



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -477,6 +484,27 @@ public void testCreateTopics() throws Exception {
         }
     }
 
+    @Test
+    public void testCreateTopicsPartialResponse() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(body -> body instanceof 
CreateTopicsRequest,
+                    prepareCreateTopicsResponse("myTopic", Errors.NONE));
+            CreateTopicsResult topicsResult = env.adminClient().createTopics(
+                    asList(new NewTopic("myTopic", Collections.singletonMap(0, 
asList(0, 1, 2))),
+                            new NewTopic("myTopic2", 
Collections.singletonMap(0, asList(0, 1, 2)))),

Review comment:
       nit: Can we align it with the `new` above?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2234,18 +2258,29 @@ public AlterReplicaLogDirsResult 
alterReplicaLogDirs(Map<TopicPartitionReplica,
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     AlterReplicaLogDirsResponse response = 
(AlterReplicaLogDirsResponse) abstractResponse;
-                    for (Map.Entry<TopicPartition, Errors> responseEntry: 
response.responses().entrySet()) {
-                        TopicPartition tp = responseEntry.getKey();
-                        Errors error = responseEntry.getValue();
-                        TopicPartitionReplica replica = new 
TopicPartitionReplica(tp.topic(), tp.partition(), brokerId);
-                        KafkaFutureImpl<Void> future = futures.get(replica);
-                        if (future == null) {
-                            handleFailure(new IllegalStateException(
-                                "The partition " + tp + " in the response from 
broker " + brokerId + " is not in the request"));
-                        } else if (error == Errors.NONE) {
-                            future.complete(null);
-                        } else {
-                            future.completeExceptionally(error.exception());
+                    for (AlterReplicaLogDirTopicResult topicResult: 
response.data().results()) {
+                        for (AlterReplicaLogDirPartitionResult 
partitionResult: topicResult.partitions()) {
+                            TopicPartitionReplica replica = new 
TopicPartitionReplica(topicResult.topicName(), 
partitionResult.partitionIndex(), brokerId);
+                            KafkaFutureImpl<Void> future = 
futures.get(replica);
+                            if (future == null) {
+                                log.warn("The partition {} in the response 
from broker {}} is not in the request",
+                                        new 
TopicPartition(topicResult.topicName(), partitionResult.partitionIndex()),
+                                        brokerId);
+                            } else if (partitionResult.errorCode() == 
Errors.NONE.code()) {
+                                future.complete(null);
+                            } else {
+                                
future.completeExceptionally(Errors.forCode(partitionResult.errorCode()).exception());
+                            }
+                        }
+                    }
+                    // The server should send back a response for every 
replica. But do a sanity check anyway.
+                    for (Map.Entry<TopicPartitionReplica, 
KafkaFutureImpl<Void>> entry : futures.entrySet()) {
+                        TopicPartitionReplica replica = entry.getKey();
+                        KafkaFutureImpl<Void> future = entry.getValue();
+                        if (!future.isDone()
+                                && replica.brokerId() == brokerId) {

Review comment:
       nit: I would bring the second part onto the first line.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequestTest.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.LogDirNotFoundException;
+import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDir;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirCollection;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopicCollection;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+
+public class AlterReplicaLogDirsRequestTest {
+
+    @Test
+    public void testErrorResponse() {
+        AlterReplicaLogDirsRequestData data = new 
AlterReplicaLogDirsRequestData()
+                .setDirs(new AlterReplicaLogDirCollection(
+                        singletonList(new AlterReplicaLogDir()
+                                .setPath("/data0")
+                                .setTopics(new 
AlterReplicaLogDirTopicCollection(
+                                        singletonList(new 
AlterReplicaLogDirTopic()
+                                                .setName("topic")
+                                                .setPartitions(asList(0, 1, 
2))).iterator()))).iterator()));
+        AlterReplicaLogDirsResponse errorResponse = new 
AlterReplicaLogDirsRequest.Builder(data).build()
+                .getErrorResponse(123, new LogDirNotFoundException("/data0"));
+        assertEquals(1, errorResponse.data().results().size());
+        AlterReplicaLogDirTopicResult topicResponse = 
errorResponse.data().results().get(0);
+        assertEquals("topic", topicResponse.topicName());
+        assertEquals(3, topicResponse.partitions().size());
+        for (int i = 0; i < 3; i++) {
+            assertEquals(i, 
topicResponse.partitions().get(i).partitionIndex());
+            assertEquals(Errors.LOG_DIR_NOT_FOUND.code(), 
topicResponse.partitions().get(i).errorCode());
+        }
+    }
+
+    @Test
+    public void testPartitionDir() {
+        AlterReplicaLogDirsRequestData data = new 
AlterReplicaLogDirsRequestData()
+                .setDirs(new AlterReplicaLogDirCollection(
+                        asList(new AlterReplicaLogDir()
+                                .setPath("/data0")
+                                .setTopics(new 
AlterReplicaLogDirTopicCollection(
+                                        asList(new AlterReplicaLogDirTopic()
+                                                .setName("topic")
+                                                .setPartitions(asList(0, 1)),
+                                                new AlterReplicaLogDirTopic()
+                                                        .setName("topic2")
+                                                        
.setPartitions(asList(7))).iterator())),

Review comment:
       nit: indentation is a bit inconsistent here.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequestTest.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.LogDirNotFoundException;
+import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDir;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirCollection;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopicCollection;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+
+public class AlterReplicaLogDirsRequestTest {
+
+    @Test
+    public void testErrorResponse() {
+        AlterReplicaLogDirsRequestData data = new 
AlterReplicaLogDirsRequestData()
+                .setDirs(new AlterReplicaLogDirCollection(
+                        singletonList(new AlterReplicaLogDir()
+                                .setPath("/data0")
+                                .setTopics(new 
AlterReplicaLogDirTopicCollection(
+                                        singletonList(new 
AlterReplicaLogDirTopic()
+                                                .setName("topic")
+                                                .setPartitions(asList(0, 1, 
2))).iterator()))).iterator()));
+        AlterReplicaLogDirsResponse errorResponse = new 
AlterReplicaLogDirsRequest.Builder(data).build()
+                .getErrorResponse(123, new LogDirNotFoundException("/data0"));
+        assertEquals(1, errorResponse.data().results().size());
+        AlterReplicaLogDirTopicResult topicResponse = 
errorResponse.data().results().get(0);
+        assertEquals("topic", topicResponse.topicName());
+        assertEquals(3, topicResponse.partitions().size());
+        for (int i = 0; i < 3; i++) {
+            assertEquals(i, 
topicResponse.partitions().get(i).partitionIndex());
+            assertEquals(Errors.LOG_DIR_NOT_FOUND.code(), 
topicResponse.partitions().get(i).errorCode());
+        }
+    }
+
+    @Test
+    public void testPartitionDir() {
+        AlterReplicaLogDirsRequestData data = new 
AlterReplicaLogDirsRequestData()
+                .setDirs(new AlterReplicaLogDirCollection(
+                        asList(new AlterReplicaLogDir()
+                                .setPath("/data0")
+                                .setTopics(new 
AlterReplicaLogDirTopicCollection(
+                                        asList(new AlterReplicaLogDirTopic()
+                                                .setName("topic")
+                                                .setPartitions(asList(0, 1)),
+                                                new AlterReplicaLogDirTopic()
+                                                        .setName("topic2")
+                                                        
.setPartitions(asList(7))).iterator())),
+                                new AlterReplicaLogDir()
+                                        .setPath("/data1")
+                                        .setTopics(new 
AlterReplicaLogDirTopicCollection(
+                                                asList(new 
AlterReplicaLogDirTopic()
+                                                                
.setName("topic3")
+                                                                
.setPartitions(asList(12))).iterator()))).iterator()));

Review comment:
       nit: The indentation of these two lines looks weird.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1402,6 +1407,15 @@ int numPendingCalls() {
         return runnable.pendingCalls.size();
     }
 
+    /**
+     * Fail the given future when a response handler expected a result for an 
entity but no result was present.
+     * @param future The future to fail.
+     * @param message The message to fail the future with
+     */
+    private void partialResponse(KafkaFutureImpl<?> future, String message) {

Review comment:
       I am not entirely convinced by the name. In the end, the method completes 
a future with the given message. There is nothing specific to handling partial 
responses. I would rename it to something more generic or simply remove it, as 
we don't gain much from it.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1402,6 +1407,15 @@ int numPendingCalls() {
         return runnable.pendingCalls.size();
     }
 
+    /**
+     * Fail the given future when a response handler expected a result for an 
entity but no result was present.
+     * @param future The future to fail.
+     * @param message The message to fail the future with

Review comment:
       nit: Add a `.` at the end to stay consistent with the previous line.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to