dajac commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r454572008
########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ##########

@@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(

Review comment:
   nit: What about extracting the construction into a small helper method `prepareDescribeLogDirsResponse` that creates a response for one LogDir and TopicPartition? It seems that the same block of code is used in many tests.

########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ##########

@@ -2354,32 +2374,31 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicParti
             @Override
             public void handleResponse(AbstractResponse abstractResponse) {
                 DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
-                for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> responseEntry: response.logDirInfos().entrySet()) {
+                for (Map.Entry<String, LogDirDescription> responseEntry: logDirDescriptions(response).entrySet()) {
                     String logDir = responseEntry.getKey();
-                    DescribeLogDirsResponse.LogDirInfo logDirInfo = responseEntry.getValue();
+                    LogDirDescription logDirInfo = responseEntry.getValue();
 
                     // No replica info will be provided if the log directory is offline
-                    if (logDirInfo.error == Errors.KAFKA_STORAGE_ERROR)
+                    if (logDirInfo.error() instanceof KafkaStorageException)
                         continue;
-                    if (logDirInfo.error != Errors.NONE)
+                    if (logDirInfo.error() != null)
                         handleFailure(new IllegalStateException(
-                            "The error " + logDirInfo.error + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal"));
+                            "The error " + logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal"));
 
-                    for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfoEntry: logDirInfo.replicaInfos.entrySet()) {
+                    for (Map.Entry<TopicPartition, ReplicaInfo> replicaInfoEntry: logDirInfo.replicaInfos().entrySet()) {
                         TopicPartition tp = replicaInfoEntry.getKey();
-                        DescribeLogDirsResponse.ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
+                        ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
                         ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp);
                         if (replicaLogDirInfo == null) {
-                            handleFailure(new IllegalStateException(
-                                "The partition " + tp + " in the response from broker " + brokerId + " is not in the request"));
-                        } else if (replicaInfo.isFuture) {
+                            log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp);

Review comment:
   I suggest adding a unit test to cover this change. I think that the previous behaviour was a bug, so it would be great not to reintroduce it in the future.
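   For illustration, a test covering the new warning path could look roughly like the sketch below. It is only a sketch: the test name is invented and the details are not part of this PR; it follows the mock patterns used elsewhere in `KafkaAdminClientTest`. The point is that a response mentioning a partition that was never requested should be skipped (and logged) instead of failing the futures:

   ```java
   // Illustrative sketch only; the test name and details are not part of this PR.
   @Test
   public void testDescribeReplicaLogDirsUnexpectedPartition() throws ExecutionException, InterruptedException {
       TopicPartitionReplica requested = new TopicPartitionReplica("topic", 0, 0);

       try (AdminClientUnitTestEnv env = mockClientEnv()) {
           env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
           // Broker 0 reports partition 0 (requested) and partition 1 (never requested).
           env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
               new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
                   .setErrorCode(Errors.NONE.code())
                   .setLogDir("/var/data/kafka")
                   .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
                       .setName("topic")
                       .setPartitions(asList(
                           new DescribeLogDirsResponseData.DescribeLogDirsPartition()
                               .setPartitionIndex(0).setPartitionSize(100).setIsFutureKey(false).setOffsetLag(0),
                           new DescribeLogDirsResponseData.DescribeLogDirsPartition()
                               .setPartitionIndex(1).setPartitionSize(200).setIsFutureKey(false).setOffsetLag(0)))))
               ))), env.cluster().nodeById(0));

           DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(singletonList(requested));

           // The unknown partition should only be logged; the requested replica still completes.
           assertEquals("/var/data/kafka", result.values().get(requested).get().getCurrentReplicaLogDir());
       }
   }
   ```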
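   And for the helper method suggested in the first comment above, one possible shape (the name `prepareDescribeLogDirsResponse` and its parameters follow the suggestion and are illustrative, not part of the diff):

   ```java
   // Illustrative shape for the suggested helper. It builds a response
   // describing a single log dir that holds a single partition.
   private static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir,
                                                                         TopicPartition tp, long partitionSize,
                                                                         long offsetLag) {
       return new DescribeLogDirsResponse(new DescribeLogDirsResponseData()
           .setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
               .setErrorCode(error.code())
               .setLogDir(logDir)
               .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
                   .setName(tp.topic())
                   .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
                       .setPartitionIndex(tp.partition())
                       .setPartitionSize(partitionSize)
                       .setIsFutureKey(false)
                       .setOffsetLag(offsetLag))))))));
   }
   ```

   A test could then shrink its setup to `env.kafkaClient().prepareResponseFrom(prepareDescribeLogDirsResponse(Errors.NONE, "/var/data/kafka", tp, 1234567890, 0), env.cluster().nodeById(0));`.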
########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ##########

@@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+            ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);

Review comment:
   nit: In some of the other tests, you have an empty line after calling the method of the admin client. Shall we add one everywhere in order to be consistent? I personally like to have one before and after to separate blocks of code. I leave this up to you.

########## File path: clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java ##########

@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+
+/**
+ * A description of a log directory on a particular broker.
+ */
+public class LogDirDescription {
+    private final Map<TopicPartition, ReplicaInfo> replicaInfos;
+    private final ApiException error;
+
+    public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {

Review comment:
   That's a great point. At the moment, I think that we are not consistent about this: some are package-private and some are not. The advantage of keeping it public is that it allows using the class in unit tests that reside in other packages.
########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ##########

@@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+            ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet());
+            assertNull(descriptionsMap.get("/var/data/kafka").error());
+            Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get("/var/data/kafka").replicaInfos();
+            assertEquals(Collections.singleton(tp), descriptionsReplicaInfos.keySet());
+            assertEquals(1234567890, descriptionsReplicaInfos.get(tp).size());
+            assertEquals(0, descriptionsReplicaInfos.get(tp).offsetLag());
+            assertFalse(descriptionsReplicaInfos.get(tp).isFuture());
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
+            assertEquals(Collections.singleton(0), allDescriptions.keySet());
+            assertNotNull(allDescriptions.get(0));
+            assertEquals(Collections.singleton("/var/data/kafka"), allDescriptions.get(0).keySet());
+            assertNull(allDescriptions.get(0).get("/var/data/kafka").error());
+            Map<TopicPartition, ReplicaInfo> allDescriptionsReplicInfos = allDescriptions.get(0).get("/var/data/kafka").replicaInfos();
+            assertEquals(Collections.singleton(tp), allDescriptionsReplicInfos.keySet());
+            assertEquals(1234567890, allDescriptionsReplicInfos.get(tp).size());
+            assertEquals(0, allDescriptionsReplicInfos.get(tp).offsetLag());
+            assertFalse(allDescriptionsReplicInfos.get(tp).isFuture());
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDescribeLogDirsDeprecated() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+            ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
+            assertEquals(Collections.singleton(0), deprecatedValues.keySet());
+            assertNotNull(deprecatedValues.get(0));
+            Map<String, DescribeLogDirsResponse.LogDirInfo> valuesMap = deprecatedValues.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), valuesMap.keySet());
+            assertEquals(Errors.NONE, valuesMap.get("/var/data/kafka").error);
+            Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> valuesReplicaInfos =
+                valuesMap.get("/var/data/kafka").replicaInfos;
+            assertEquals(Collections.singleton(tp), valuesReplicaInfos.keySet());
+            assertEquals(1234567890, valuesReplicaInfos.get(tp).size);
+            assertEquals(0, valuesReplicaInfos.get(tp).offsetLag);
+            assertFalse(valuesReplicaInfos.get(tp).isFuture);
+
+            Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get();
+            assertEquals(Collections.singleton(0), deprecatedAll.keySet());
+            assertNotNull(deprecatedAll.get(0));
+            assertEquals(Collections.singleton("/var/data/kafka"), deprecatedAll.get(0).keySet());
+            assertEquals(Errors.NONE, deprecatedAll.get(0).get("/var/data/kafka").error);
+            Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> allReplicaInfos =
+                deprecatedAll.get(0).get("/var/data/kafka").replicaInfos;
+            assertEquals(Collections.singleton(tp), allReplicaInfos.keySet());
+            assertEquals(1234567890, allReplicaInfos.get(tp).size);
+            assertEquals(0, allReplicaInfos.get(tp).offsetLag);
+            assertFalse(allReplicaInfos.get(tp).isFuture);
+        }
+    }
+
+    @Test
+    public void testDescribeLogDirsOfflineDir() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(asList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(emptyList())
+            ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet());
+            assertEquals(KafkaStorageException.class, descriptionsMap.get("/var/data/kafka").error().getClass());
+            assertEquals(emptySet(), descriptionsMap.get("/var/data/kafka").replicaInfos().keySet());
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
+            assertEquals(Collections.singleton(0), allDescriptions.keySet());
+            Map<String, LogDirDescription> allMap = allDescriptions.get(0);
+            assertNotNull(allMap);
+            assertEquals(Collections.singleton("/var/data/kafka"), allMap.keySet());
+            assertEquals(KafkaStorageException.class, allMap.get("/var/data/kafka").error().getClass());
+            assertEquals(emptySet(), allMap.get("/var/data/kafka").replicaInfos().keySet());
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDescribeLogDirsOfflineDirDeprecated() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(asList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(emptyList())
+            ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
+            assertEquals(Collections.singleton(0), deprecatedValues.keySet());
+            assertNotNull(deprecatedValues.get(0));
+            Map<String, DescribeLogDirsResponse.LogDirInfo> valuesMap = deprecatedValues.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), valuesMap.keySet());
+            assertEquals(Errors.KAFKA_STORAGE_ERROR, valuesMap.get("/var/data/kafka").error);
+            assertEquals(emptySet(), valuesMap.get("/var/data/kafka").replicaInfos.keySet());
+
+            Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get();
+            assertEquals(Collections.singleton(0), deprecatedAll.keySet());
+            Map<String, DescribeLogDirsResponse.LogDirInfo> allMap = deprecatedAll.get(0);
+            assertNotNull(allMap);
+            assertEquals(Collections.singleton("/var/data/kafka"), allMap.keySet());
+            assertEquals(Errors.KAFKA_STORAGE_ERROR, allMap.get("/var/data/kafka").error);
+            assertEquals(emptySet(), allMap.get("/var/data/kafka").replicaInfos.keySet());
+        }
+    }
+
+    @Test
+    public void testDescribeReplicaLogDirs() throws ExecutionException, InterruptedException {

Review comment:
   This is not due to your PR, but shall we add a unit test that uses multiple brokers?
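   For illustration, such a multi-broker test might look roughly like the sketch below (not part of the PR). It assumes a two-node mock cluster can be built (the `mockCluster(2, 0)` helper here is hypothetical) and reuses the hypothetical `prepareDescribeLogDirsResponse` helper sketched earlier:

   ```java
   // Illustrative sketch only: mockCluster(2, 0) and prepareDescribeLogDirsResponse
   // are assumed helpers, not part of this diff.
   @Test
   public void testDescribeLogDirsWithMultipleBrokers() throws ExecutionException, InterruptedException {
       TopicPartition tp = new TopicPartition("topic", 12);

       try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(2, 0))) {
           env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
           // Each broker answers with its own log dir and partition size.
           env.kafkaClient().prepareResponseFrom(
               prepareDescribeLogDirsResponse(Errors.NONE, "/var/data/kafka0", tp, 1234, 0),
               env.cluster().nodeById(0));
           env.kafkaClient().prepareResponseFrom(
               prepareDescribeLogDirsResponse(Errors.NONE, "/var/data/kafka1", tp, 5678, 0),
               env.cluster().nodeById(1));

           DescribeLogDirsResult result = env.adminClient().describeLogDirs(asList(0, 1));

           // Both per-broker futures should complete independently.
           Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
           assertEquals(new HashSet<>(asList(0, 1)), allDescriptions.keySet());
           assertEquals(Collections.singleton("/var/data/kafka0"), allDescriptions.get(0).keySet());
           assertEquals(Collections.singleton("/var/data/kafka1"), allDescriptions.get(1).keySet());
       }
   }
   ```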
########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ##########

@@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+            ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());

Review comment:
   nit: You can reuse `brokers` here. Would it make sense to extract the other constants into local variables as well?

########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ##########

@@ -2286,13 +2288,15 @@ public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, Descri
                 return new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(null));
             }
 
+            @SuppressWarnings("deprecation")
             @Override
             public void handleResponse(AbstractResponse abstractResponse) {
                 DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
-                if (response.logDirInfos().size() > 0) {
-                    future.complete(response.logDirInfos());
+                Map<String, LogDirDescription> descriptions = logDirDescriptions(response);
+                if (descriptions.size() > 0) {
+                    future.complete(descriptions);
                 } else {
-                    // response.logDirInfos() will be empty if and only if the user is not authorized to describe clsuter resource.
+                    // response/descriptions will be empty if and only if the user is not authorized to describe cluster resource.

Review comment:
   nit: shall we remove `response/`?

########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ##########

@@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+            ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet());
+            assertNull(descriptionsMap.get("/var/data/kafka").error());
+            Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get("/var/data/kafka").replicaInfos();
+            assertEquals(Collections.singleton(tp), descriptionsReplicaInfos.keySet());
+            assertEquals(1234567890, descriptionsReplicaInfos.get(tp).size());
+            assertEquals(0, descriptionsReplicaInfos.get(tp).offsetLag());
+            assertFalse(descriptionsReplicaInfos.get(tp).isFuture());

Review comment:
   This block of assertions is used multiple times. Would it make sense to extract it into a helper method, say `assertDescriptions`, that verifies that a descriptions map contains the information about a single log dir/topic partition? Something like `assertDescriptionContains(descriptionsMap, logDir, tp, size, offsetLag, isFuture)`.
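   For illustration, that helper might look roughly like this (the name and signature follow the suggestion above and are illustrative only; it simply bundles the assertions repeated in each test):

   ```java
   // Illustrative sketch of the suggested helper.
   private static void assertDescriptionContains(Map<String, LogDirDescription> descriptionsMap, String logDir,
                                                 TopicPartition tp, long size, long offsetLag, boolean isFuture) {
       assertNotNull(descriptionsMap);
       assertEquals(Collections.singleton(logDir), descriptionsMap.keySet());
       assertNull(descriptionsMap.get(logDir).error());
       Map<TopicPartition, ReplicaInfo> replicaInfos = descriptionsMap.get(logDir).replicaInfos();
       assertEquals(Collections.singleton(tp), replicaInfos.keySet());
       assertEquals(size, replicaInfos.get(tp).size());
       assertEquals(offsetLag, replicaInfos.get(tp).offsetLag());
       assertEquals(isFuture, replicaInfos.get(tp).isFuture());
   }
   ```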
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org