dajac commented on a change in pull request #8311: URL: https://github.com/apache/kafka/pull/8311#discussion_r420615938
########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -477,6 +484,27 @@ public void testCreateTopics() throws Exception { } } + @Test + public void testCreateTopicsPartialResponse() throws Exception { + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest, + prepareCreateTopicsResponse("myTopic", Errors.NONE)); + CreateTopicsResult topicsResult = env.adminClient().createTopics( + asList(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2))), + new NewTopic("myTopic2", Collections.singletonMap(0, asList(0, 1, 2)))), Review comment: nit: Can we align it with the `new` above? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -2234,18 +2258,29 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, @Override public void handleResponse(AbstractResponse abstractResponse) { AlterReplicaLogDirsResponse response = (AlterReplicaLogDirsResponse) abstractResponse; - for (Map.Entry<TopicPartition, Errors> responseEntry: response.responses().entrySet()) { - TopicPartition tp = responseEntry.getKey(); - Errors error = responseEntry.getValue(); - TopicPartitionReplica replica = new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId); - KafkaFutureImpl<Void> future = futures.get(replica); - if (future == null) { - handleFailure(new IllegalStateException( - "The partition " + tp + " in the response from broker " + brokerId + " is not in the request")); - } else if (error == Errors.NONE) { - future.complete(null); - } else { - future.completeExceptionally(error.exception()); + for (AlterReplicaLogDirTopicResult topicResult: response.data().results()) { + for (AlterReplicaLogDirPartitionResult partitionResult: topicResult.partitions()) { + TopicPartitionReplica replica = new TopicPartitionReplica(topicResult.topicName(), partitionResult.partitionIndex(), brokerId); + KafkaFutureImpl<Void> future = futures.get(replica); + if (future == null) { + log.warn("The partition {} in the response from broker {}} is not in the request", + new TopicPartition(topicResult.topicName(), partitionResult.partitionIndex()), + brokerId); + } else if (partitionResult.errorCode() == Errors.NONE.code()) { + future.complete(null); + } else { + future.completeExceptionally(Errors.forCode(partitionResult.errorCode()).exception()); + } + } + } + // The server should send back a response for every replica. But do a sanity check anyway. + for (Map.Entry<TopicPartitionReplica, KafkaFutureImpl<Void>> entry : futures.entrySet()) { + TopicPartitionReplica replica = entry.getKey(); + KafkaFutureImpl<Void> future = entry.getValue(); + if (!future.isDone() + && replica.brokerId() == brokerId) { Review comment: nit: i would bring the second part on the first line. ########## File path: clients/src/test/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequestTest.java ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.LogDirNotFoundException; +import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData; +import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDir; +import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirCollection; +import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic; +import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopicCollection; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult; +import org.apache.kafka.common.protocol.Errors; +import org.junit.Test; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.junit.Assert.assertEquals; + +public class AlterReplicaLogDirsRequestTest { + + @Test + public void testErrorResponse() { + AlterReplicaLogDirsRequestData data = new AlterReplicaLogDirsRequestData() + .setDirs(new AlterReplicaLogDirCollection( + singletonList(new AlterReplicaLogDir() + .setPath("/data0") + .setTopics(new AlterReplicaLogDirTopicCollection( + singletonList(new AlterReplicaLogDirTopic() + .setName("topic") + .setPartitions(asList(0, 1, 2))).iterator()))).iterator())); + AlterReplicaLogDirsResponse errorResponse = new AlterReplicaLogDirsRequest.Builder(data).build() + .getErrorResponse(123, new LogDirNotFoundException("/data0")); + assertEquals(1, errorResponse.data().results().size()); + AlterReplicaLogDirTopicResult topicResponse = errorResponse.data().results().get(0); + assertEquals("topic", topicResponse.topicName()); + assertEquals(3, topicResponse.partitions().size()); + for (int i = 0; i < 3; i++) { + assertEquals(i, topicResponse.partitions().get(i).partitionIndex()); + assertEquals(Errors.LOG_DIR_NOT_FOUND.code(), topicResponse.partitions().get(i).errorCode()); + } + } + + @Test + public void testPartitionDir() { + AlterReplicaLogDirsRequestData data = new AlterReplicaLogDirsRequestData() + .setDirs(new AlterReplicaLogDirCollection( + asList(new AlterReplicaLogDir() + .setPath("/data0") + .setTopics(new AlterReplicaLogDirTopicCollection( + asList(new AlterReplicaLogDirTopic() + .setName("topic") + .setPartitions(asList(0, 1)), + new AlterReplicaLogDirTopic() + .setName("topic2") + .setPartitions(asList(7))).iterator())), Review comment: nit: indentation is a bit inconsistent here. ########## File path: clients/src/test/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequestTest.java ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.LogDirNotFoundException; +import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData; +import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDir; +import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirCollection; +import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic; +import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopicCollection; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult; +import org.apache.kafka.common.protocol.Errors; +import org.junit.Test; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.junit.Assert.assertEquals; + +public class AlterReplicaLogDirsRequestTest { + + @Test + public void testErrorResponse() { + AlterReplicaLogDirsRequestData data = new AlterReplicaLogDirsRequestData() + .setDirs(new AlterReplicaLogDirCollection( + singletonList(new AlterReplicaLogDir() + .setPath("/data0") + .setTopics(new AlterReplicaLogDirTopicCollection( + singletonList(new AlterReplicaLogDirTopic() + .setName("topic") + .setPartitions(asList(0, 1, 2))).iterator()))).iterator())); + AlterReplicaLogDirsResponse errorResponse = new AlterReplicaLogDirsRequest.Builder(data).build() + .getErrorResponse(123, new LogDirNotFoundException("/data0")); + assertEquals(1, errorResponse.data().results().size()); + AlterReplicaLogDirTopicResult topicResponse = errorResponse.data().results().get(0); + assertEquals("topic", topicResponse.topicName()); + assertEquals(3, topicResponse.partitions().size()); + for (int i = 0; i < 3; i++) { + assertEquals(i, topicResponse.partitions().get(i).partitionIndex()); + assertEquals(Errors.LOG_DIR_NOT_FOUND.code(), topicResponse.partitions().get(i).errorCode()); + } + } + + @Test + public void testPartitionDir() { + AlterReplicaLogDirsRequestData data = new AlterReplicaLogDirsRequestData() + .setDirs(new AlterReplicaLogDirCollection( + asList(new AlterReplicaLogDir() + .setPath("/data0") + .setTopics(new AlterReplicaLogDirTopicCollection( + asList(new AlterReplicaLogDirTopic() + .setName("topic") + .setPartitions(asList(0, 1)), + new AlterReplicaLogDirTopic() + .setName("topic2") + .setPartitions(asList(7))).iterator())), + new AlterReplicaLogDir() + .setPath("/data1") + .setTopics(new AlterReplicaLogDirTopicCollection( + asList(new AlterReplicaLogDirTopic() + .setName("topic3") + .setPartitions(asList(12))).iterator()))).iterator())); Review comment: nit: Indentation of these two lines look weird. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -1402,6 +1407,15 @@ int numPendingCalls() { return runnable.pendingCalls.size(); } + /** + * Fail the given future when a response handler expected a result for an entity but no result was present. + * @param future The future to fail. + * @param message The message to fail the future with + */ + private void partialResponse(KafkaFutureImpl<?> future, String message) { Review comment: I am not entirely convinced by the name. At the end, the method complete a future with the given message. There is nothing specific to handling partial responses. I would rename it to something more generic or simply remove it as we don't gain much with it. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -1402,6 +1407,15 @@ int numPendingCalls() { return runnable.pendingCalls.size(); } + /** + * Fail the given future when a response handler expected a result for an entity but no result was present. + * @param future The future to fail. + * @param message The message to fail the future with Review comment: nit: `.` at the end to stay consistent with previous line. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org