dajac commented on a change in pull request #8311:
URL: https://github.com/apache/kafka/pull/8311#discussion_r413546760



##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponseTest.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.util.Map;
+
+import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirPartitionResult;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+public class AlterReplicaLogDirsResponseTest {
+
+    @Test
+    public void testErrorResponse() {

Review comment:
       I suppose that this should be named `testErrorCounts`.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2234,18 +2249,18 @@ public AlterReplicaLogDirsResult 
alterReplicaLogDirs(Map<TopicPartitionReplica,
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     AlterReplicaLogDirsResponse response = 
(AlterReplicaLogDirsResponse) abstractResponse;
-                    for (Map.Entry<TopicPartition, Errors> responseEntry: 
response.responses().entrySet()) {
-                        TopicPartition tp = responseEntry.getKey();
-                        Errors error = responseEntry.getValue();
-                        TopicPartitionReplica replica = new 
TopicPartitionReplica(tp.topic(), tp.partition(), brokerId);
-                        KafkaFutureImpl<Void> future = futures.get(replica);
-                        if (future == null) {
-                            handleFailure(new IllegalStateException(
-                                "The partition " + tp + " in the response from 
broker " + brokerId + " is not in the request"));
-                        } else if (error == Errors.NONE) {
-                            future.complete(null);
-                        } else {
-                            future.completeExceptionally(error.exception());
+                    for (AlterReplicaLogDirTopicResult topicResult: 
response.data().results()) {
+                        for (AlterReplicaLogDirPartitionResult 
partitionResult: topicResult.partitions()) {
+                            TopicPartitionReplica replica = new 
TopicPartitionReplica(topicResult.topicName(), 
partitionResult.partitionIndex(), brokerId);
+                            KafkaFutureImpl<Void> future = 
futures.get(replica);
+                            if (future == null) {
+                                handleFailure(new IllegalStateException(
+                                        "The partition " + new 
TopicPartition(topicResult.topicName(), partitionResult.partitionIndex()) + " 
in the response from broker " + brokerId + " is not in the request"));

Review comment:
       @tombentley This one bugs me. I know that it was already like this in 
the previous implementation but I wonder if we could improve this while we are 
here. I am also fine keeping it as it is and improving this latter on.
   
   I find odd that we fail all the non-realised yet futures when we discover an 
unknown replica in the response. From a client perspective, the behaviour is 
unpredictable and we leave it with potentially a partial view of the results 
event though the brokers may have sent back all the requested ones. Wouldn't it 
be better to just ignore unexpected replica and log a warning message? If a 
broker would ever do this, the client would continue to work as expected which 
is probably better. I think that it is a better behavior.
   
   Similarly, we may want to ensure that all futures are done in case a replica 
would be missing in the response from the broker.
   
   What do you think?

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -1707,4 +1707,47 @@ class KafkaApisTest {
       0, 0, partitionStates.asJava, Seq(broker).asJava).build()
     metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest)
   }
+
+  @Test
+  def testAlterReplicaLogDirs(): Unit = {
+    val data = new AlterReplicaLogDirsRequestData()
+    val dir = new AlterReplicaLogDirsRequestData.AlterReplicaLogDir()
+      .setPath("/foo")
+    dir.topics().add(new 
AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic().setName("t0").setPartitions(asList(0,
 1, 2)))
+    data.dirs().add(dir)
+    val alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(
+      data
+    ).build()
+    val request = buildRequest(alterReplicaLogDirsRequest)
+
+    EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
+
+    val capturedResponse = expectNoThrottling()
+    val t0p0 = new TopicPartition("t0", 0)
+    val t0p1 = new TopicPartition("t0", 1)
+    val t0p2 = new TopicPartition("t0", 2)
+    val partitionResults = Map(
+      t0p0 -> Errors.NONE,
+      t0p1 -> Errors.LOG_DIR_NOT_FOUND,
+      t0p2 -> Errors.INVALID_TOPIC_EXCEPTION)
+    EasyMock.expect(replicaManager.alterReplicaLogDirs(EasyMock.eq(Map(
+      t0p0 -> "/foo",
+      t0p1 -> "/foo",
+      t0p2 -> "/foo"))))
+    .andReturn(partitionResults)
+    EasyMock.replay(replicaManager, clientQuotaManager, 
clientRequestQuotaManager, requestChannel)
+
+    createKafkaApis().handleAlterReplicaLogDirsRequest(request)
+
+    val response = readResponse(ApiKeys.ALTER_REPLICA_LOG_DIRS, 
alterReplicaLogDirsRequest, capturedResponse)
+      .asInstanceOf[AlterReplicaLogDirsResponse]
+    assertEquals(partitionResults, response.data.results.asScala.flatMap { tr 
=>
+      tr.partitions().asScala.map { pr =>
+        new TopicPartition(tr.topicName(), pr.partitionIndex()) -> 
Errors.forCode(pr.errorCode())

Review comment:
       nit: `()` can be removed for all the accessors here.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
##########
@@ -17,141 +17,79 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
-
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData;
+import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
 
 public class AlterReplicaLogDirsRequest extends AbstractRequest {
 
-    // request level key names
-    private static final String LOG_DIRS_KEY_NAME = "log_dirs";
-
-    // log dir level key names
-    private static final String LOG_DIR_KEY_NAME = "log_dir";
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    // topic level key names
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    private static final Schema ALTER_REPLICA_LOG_DIRS_REQUEST_V0 = new Schema(
-            new Field("log_dirs", new ArrayOf(new Schema(
-                    new Field("log_dir", STRING, "The absolute log directory 
path."),
-                    new Field("topics", new ArrayOf(new Schema(
-                            TOPIC_NAME,
-                            new Field("partitions", new ArrayOf(INT32), "List 
of partition ids of the topic."))))))));
-
-    /**
-     * The version number is bumped to indicate that on quota violation 
brokers send out responses before throttling.
-     */
-    private static final Schema ALTER_REPLICA_LOG_DIRS_REQUEST_V1 = 
ALTER_REPLICA_LOG_DIRS_REQUEST_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{ALTER_REPLICA_LOG_DIRS_REQUEST_V0, 
ALTER_REPLICA_LOG_DIRS_REQUEST_V1};
-    }
-
-    private final Map<TopicPartition, String> partitionDirs;
+    private final AlterReplicaLogDirsRequestData data;
 
     public static class Builder extends 
AbstractRequest.Builder<AlterReplicaLogDirsRequest> {
-        private final Map<TopicPartition, String> partitionDirs;
+        private final AlterReplicaLogDirsRequestData data;
 
-        public Builder(Map<TopicPartition, String> partitionDirs) {
+        public Builder(AlterReplicaLogDirsRequestData data) {
             super(ApiKeys.ALTER_REPLICA_LOG_DIRS);
-            this.partitionDirs = partitionDirs;
+            this.data = data;
         }
 
         @Override
         public AlterReplicaLogDirsRequest build(short version) {
-            return new AlterReplicaLogDirsRequest(partitionDirs, version);
+            return new AlterReplicaLogDirsRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder builder = new StringBuilder();
-            builder.append("(type=AlterReplicaLogDirsRequest")
-                .append(", partitionDirs=")
-                .append(partitionDirs)
-                .append(")");
-            return builder.toString();
+            return data.toString();
         }
     }
 
     public AlterReplicaLogDirsRequest(Struct struct, short version) {
         super(ApiKeys.ALTER_REPLICA_LOG_DIRS, version);
-        partitionDirs = new HashMap<>();
-        for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) {
-            Struct logDirStruct = (Struct) logDirStructObj;
-            String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME);
-            for (Object topicStructObj : 
logDirStruct.getArray(TOPICS_KEY_NAME)) {
-                Struct topicStruct = (Struct) topicStructObj;
-                String topic = topicStruct.get(TOPIC_NAME);
-                for (Object partitionObj : 
topicStruct.getArray(PARTITIONS_KEY_NAME)) {
-                    int partition = (Integer) partitionObj;
-                    partitionDirs.put(new TopicPartition(topic, partition), 
logDir);
-                }
-            }
-        }
+        this.data = new AlterReplicaLogDirsRequestData(struct, version);
     }
 
-    public AlterReplicaLogDirsRequest(Map<TopicPartition, String> 
partitionDirs, short version) {
+    public AlterReplicaLogDirsRequest(AlterReplicaLogDirsRequestData data, 
short version) {
         super(ApiKeys.ALTER_REPLICA_LOG_DIRS, version);
-        this.partitionDirs = partitionDirs;
+        this.data = data;
     }
 
     @Override
     protected Struct toStruct() {
-        Map<String, List<TopicPartition>> dirPartitions = new HashMap<>();
-        for (Map.Entry<TopicPartition, String> entry: 
partitionDirs.entrySet()) {
-            if (!dirPartitions.containsKey(entry.getValue()))
-                dirPartitions.put(entry.getValue(), new ArrayList<>());
-            dirPartitions.get(entry.getValue()).add(entry.getKey());
-        }
-
-        Struct struct = new 
Struct(ApiKeys.ALTER_REPLICA_LOG_DIRS.requestSchema(version()));
-        List<Struct> logDirStructArray = new ArrayList<>();
-        for (Map.Entry<String, List<TopicPartition>> logDirEntry: 
dirPartitions.entrySet()) {
-            Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME);
-            logDirStruct.set(LOG_DIR_KEY_NAME, logDirEntry.getKey());
-
-            List<Struct> topicStructArray = new ArrayList<>();
-            for (Map.Entry<String, List<Integer>> topicEntry: 
CollectionUtils.groupPartitionsByTopic(logDirEntry.getValue()).entrySet()) {
-                Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME);
-                topicStruct.set(TOPIC_NAME, topicEntry.getKey());
-                topicStruct.set(PARTITIONS_KEY_NAME, 
topicEntry.getValue().toArray());
-                topicStructArray.add(topicStruct);
-            }
-            logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
-            logDirStructArray.add(logDirStruct);
-        }
-        struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray());
-        return struct;
+        return data.toStruct(version());
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        Map<TopicPartition, Errors> responseMap = new HashMap<>();
-        for (Map.Entry<TopicPartition, String> entry : 
partitionDirs.entrySet()) {
-            responseMap.put(entry.getKey(), Errors.forException(e));
-        }
-        return new AlterReplicaLogDirsResponse(throttleTimeMs, responseMap);
+    public AlterReplicaLogDirsResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
+        AlterReplicaLogDirsResponseData data = new 
AlterReplicaLogDirsResponseData();
+        data.setResults(this.data.dirs().stream().flatMap(alterDir ->
+            alterDir.topics().stream().map(topic ->
+                new AlterReplicaLogDirTopicResult()
+                    .setTopicName(topic.name())
+                    .setPartitions(topic.partitions().stream().map(partitionId 
->
+                        new 
AlterReplicaLogDirsResponseData.AlterReplicaLogDirPartitionResult()
+                            .setErrorCode(Errors.forException(e).code())
+                            
.setPartitionIndex(partitionId)).collect(Collectors.toList())))).collect(Collectors.toList()));
+        return new 
AlterReplicaLogDirsResponse(data.setThrottleTimeMs(throttleTimeMs));
     }
 
     public Map<TopicPartition, String> partitionDirs() {
-        return partitionDirs;
+        Map<TopicPartition, String> result = new HashMap<>();

Review comment:
       Do you mind adding a unit test for this one in 
`AlterReplicaLogDirsRequestTest` for the sake of completeness?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3382,6 +3388,70 @@ public void testAlterClientQuotas() throws Exception {
         }
     }
 
+    @Test
+    public void testAlterReplicaLogDirsSuccess() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            createAlterLogDirsResponse(env, env.cluster().nodeById(0), 
Errors.NONE, 0);
+            createAlterLogDirsResponse(env, env.cluster().nodeById(1), 
Errors.NONE, 0);
+
+            TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 
0);
+            TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 
1);
+
+            Map<TopicPartitionReplica, String> logDirs = new HashMap<>();
+            assertNull(logDirs.put(tpr0, "/data0"));
+            assertNull(logDirs.put(tpr1, "/data1"));

Review comment:
       `assertNull`s shouldn't be here but few lines bellow.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to