dajac commented on a change in pull request #8311: URL: https://github.com/apache/kafka/pull/8311#discussion_r413546760
########## File path: clients/src/test/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponseTest.java ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import java.util.Map; + +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirPartitionResult; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult; +import org.apache.kafka.common.protocol.Errors; +import org.junit.Test; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; + +public class AlterReplicaLogDirsResponseTest { + + @Test + public void testErrorResponse() { Review comment: I suppose that this should be named `testErrorCounts`. 
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -2234,18 +2249,18 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, @Override public void handleResponse(AbstractResponse abstractResponse) { AlterReplicaLogDirsResponse response = (AlterReplicaLogDirsResponse) abstractResponse; - for (Map.Entry<TopicPartition, Errors> responseEntry: response.responses().entrySet()) { - TopicPartition tp = responseEntry.getKey(); - Errors error = responseEntry.getValue(); - TopicPartitionReplica replica = new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId); - KafkaFutureImpl<Void> future = futures.get(replica); - if (future == null) { - handleFailure(new IllegalStateException( - "The partition " + tp + " in the response from broker " + brokerId + " is not in the request")); - } else if (error == Errors.NONE) { - future.complete(null); - } else { - future.completeExceptionally(error.exception()); + for (AlterReplicaLogDirTopicResult topicResult: response.data().results()) { + for (AlterReplicaLogDirPartitionResult partitionResult: topicResult.partitions()) { + TopicPartitionReplica replica = new TopicPartitionReplica(topicResult.topicName(), partitionResult.partitionIndex(), brokerId); + KafkaFutureImpl<Void> future = futures.get(replica); + if (future == null) { + handleFailure(new IllegalStateException( + "The partition " + new TopicPartition(topicResult.topicName(), partitionResult.partitionIndex()) + " in the response from broker " + brokerId + " is not in the request")); Review comment: @tombentley This one bugs me. I know that it was already like this in the previous implementation but I wonder if we could improve this while we are here. I am also fine keeping it as it is and improving this later on. I find it odd that we fail all the not-yet-completed futures when we discover an unknown replica in the response. 
From a client perspective, the behaviour is unpredictable and we leave it with potentially a partial view of the results even though the brokers may have sent back all the requested ones. Wouldn't it be better to just ignore the unexpected replica and log a warning message? If a broker would ever do this, the client would continue to work as expected which is probably better. I think that it is a better behavior. Similarly, we may want to ensure that all futures are done in case a replica is missing in the response from the broker. What do you think? ########## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ########## @@ -1707,4 +1707,47 @@ class KafkaApisTest { 0, 0, partitionStates.asJava, Seq(broker).asJava).build() metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest) } + + @Test + def testAlterReplicaLogDirs(): Unit = { + val data = new AlterReplicaLogDirsRequestData() + val dir = new AlterReplicaLogDirsRequestData.AlterReplicaLogDir() + .setPath("/foo") + dir.topics().add(new AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic().setName("t0").setPartitions(asList(0, 1, 2))) + data.dirs().add(dir) + val alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder( + data + ).build() + val request = buildRequest(alterReplicaLogDirsRequest) + + EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel) + + val capturedResponse = expectNoThrottling() + val t0p0 = new TopicPartition("t0", 0) + val t0p1 = new TopicPartition("t0", 1) + val t0p2 = new TopicPartition("t0", 2) + val partitionResults = Map( + t0p0 -> Errors.NONE, + t0p1 -> Errors.LOG_DIR_NOT_FOUND, + t0p2 -> Errors.INVALID_TOPIC_EXCEPTION) + EasyMock.expect(replicaManager.alterReplicaLogDirs(EasyMock.eq(Map( + t0p0 -> "/foo", + t0p1 -> "/foo", + t0p2 -> "/foo")))) + .andReturn(partitionResults) + EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel) + + 
createKafkaApis().handleAlterReplicaLogDirsRequest(request) + + val response = readResponse(ApiKeys.ALTER_REPLICA_LOG_DIRS, alterReplicaLogDirsRequest, capturedResponse) + .asInstanceOf[AlterReplicaLogDirsResponse] + assertEquals(partitionResults, response.data.results.asScala.flatMap { tr => + tr.partitions().asScala.map { pr => + new TopicPartition(tr.topicName(), pr.partitionIndex()) -> Errors.forCode(pr.errorCode()) Review comment: nit: `()` can be removed for all the accessors here. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java ########## @@ -17,141 +17,79 @@ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.CollectionUtils; - import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.stream.Collectors; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.types.Type.INT32; -import static org.apache.kafka.common.protocol.types.Type.STRING; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; public class AlterReplicaLogDirsRequest extends AbstractRequest { - // 
request level key names - private static final String LOG_DIRS_KEY_NAME = "log_dirs"; - - // log dir level key names - private static final String LOG_DIR_KEY_NAME = "log_dir"; - private static final String TOPICS_KEY_NAME = "topics"; - - // topic level key names - private static final String PARTITIONS_KEY_NAME = "partitions"; - - private static final Schema ALTER_REPLICA_LOG_DIRS_REQUEST_V0 = new Schema( - new Field("log_dirs", new ArrayOf(new Schema( - new Field("log_dir", STRING, "The absolute log directory path."), - new Field("topics", new ArrayOf(new Schema( - TOPIC_NAME, - new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic.")))))))); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema ALTER_REPLICA_LOG_DIRS_REQUEST_V1 = ALTER_REPLICA_LOG_DIRS_REQUEST_V0; - - public static Schema[] schemaVersions() { - return new Schema[]{ALTER_REPLICA_LOG_DIRS_REQUEST_V0, ALTER_REPLICA_LOG_DIRS_REQUEST_V1}; - } - - private final Map<TopicPartition, String> partitionDirs; + private final AlterReplicaLogDirsRequestData data; public static class Builder extends AbstractRequest.Builder<AlterReplicaLogDirsRequest> { - private final Map<TopicPartition, String> partitionDirs; + private final AlterReplicaLogDirsRequestData data; - public Builder(Map<TopicPartition, String> partitionDirs) { + public Builder(AlterReplicaLogDirsRequestData data) { super(ApiKeys.ALTER_REPLICA_LOG_DIRS); - this.partitionDirs = partitionDirs; + this.data = data; } @Override public AlterReplicaLogDirsRequest build(short version) { - return new AlterReplicaLogDirsRequest(partitionDirs, version); + return new AlterReplicaLogDirsRequest(data, version); } @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("(type=AlterReplicaLogDirsRequest") - .append(", partitionDirs=") - .append(partitionDirs) - .append(")"); - return 
builder.toString(); + return data.toString(); } } public AlterReplicaLogDirsRequest(Struct struct, short version) { super(ApiKeys.ALTER_REPLICA_LOG_DIRS, version); - partitionDirs = new HashMap<>(); - for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) { - Struct logDirStruct = (Struct) logDirStructObj; - String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME); - for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) { - Struct topicStruct = (Struct) topicStructObj; - String topic = topicStruct.get(TOPIC_NAME); - for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) { - int partition = (Integer) partitionObj; - partitionDirs.put(new TopicPartition(topic, partition), logDir); - } - } - } + this.data = new AlterReplicaLogDirsRequestData(struct, version); } - public AlterReplicaLogDirsRequest(Map<TopicPartition, String> partitionDirs, short version) { + public AlterReplicaLogDirsRequest(AlterReplicaLogDirsRequestData data, short version) { super(ApiKeys.ALTER_REPLICA_LOG_DIRS, version); - this.partitionDirs = partitionDirs; + this.data = data; } @Override protected Struct toStruct() { - Map<String, List<TopicPartition>> dirPartitions = new HashMap<>(); - for (Map.Entry<TopicPartition, String> entry: partitionDirs.entrySet()) { - if (!dirPartitions.containsKey(entry.getValue())) - dirPartitions.put(entry.getValue(), new ArrayList<>()); - dirPartitions.get(entry.getValue()).add(entry.getKey()); - } - - Struct struct = new Struct(ApiKeys.ALTER_REPLICA_LOG_DIRS.requestSchema(version())); - List<Struct> logDirStructArray = new ArrayList<>(); - for (Map.Entry<String, List<TopicPartition>> logDirEntry: dirPartitions.entrySet()) { - Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME); - logDirStruct.set(LOG_DIR_KEY_NAME, logDirEntry.getKey()); - - List<Struct> topicStructArray = new ArrayList<>(); - for (Map.Entry<String, List<Integer>> topicEntry: CollectionUtils.groupPartitionsByTopic(logDirEntry.getValue()).entrySet()) { - 
Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME); - topicStruct.set(TOPIC_NAME, topicEntry.getKey()); - topicStruct.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray()); - topicStructArray.add(topicStruct); - } - logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray()); - logDirStructArray.add(logDirStruct); - } - struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray()); - return struct; + return data.toStruct(version()); } @Override - public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - Map<TopicPartition, Errors> responseMap = new HashMap<>(); - for (Map.Entry<TopicPartition, String> entry : partitionDirs.entrySet()) { - responseMap.put(entry.getKey(), Errors.forException(e)); - } - return new AlterReplicaLogDirsResponse(throttleTimeMs, responseMap); + public AlterReplicaLogDirsResponse getErrorResponse(int throttleTimeMs, Throwable e) { + AlterReplicaLogDirsResponseData data = new AlterReplicaLogDirsResponseData(); + data.setResults(this.data.dirs().stream().flatMap(alterDir -> + alterDir.topics().stream().map(topic -> + new AlterReplicaLogDirTopicResult() + .setTopicName(topic.name()) + .setPartitions(topic.partitions().stream().map(partitionId -> + new AlterReplicaLogDirsResponseData.AlterReplicaLogDirPartitionResult() + .setErrorCode(Errors.forException(e).code()) + .setPartitionIndex(partitionId)).collect(Collectors.toList())))).collect(Collectors.toList())); + return new AlterReplicaLogDirsResponse(data.setThrottleTimeMs(throttleTimeMs)); } public Map<TopicPartition, String> partitionDirs() { - return partitionDirs; + Map<TopicPartition, String> result = new HashMap<>(); Review comment: Do you mind adding a unit test for this one in `AlterReplicaLogDirsRequestTest` for the sake of completeness? 
########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -3382,6 +3388,70 @@ public void testAlterClientQuotas() throws Exception { } } + @Test + public void testAlterReplicaLogDirsSuccess() throws Exception { + try (AdminClientUnitTestEnv env = mockClientEnv()) { + createAlterLogDirsResponse(env, env.cluster().nodeById(0), Errors.NONE, 0); + createAlterLogDirsResponse(env, env.cluster().nodeById(1), Errors.NONE, 0); + + TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 0); + TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 1); + + Map<TopicPartitionReplica, String> logDirs = new HashMap<>(); + assertNull(logDirs.put(tpr0, "/data0")); + assertNull(logDirs.put(tpr1, "/data1")); Review comment: `assertNull`s shouldn't be here but a few lines below. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org