tombentley commented on a change in pull request #9007: URL: https://github.com/apache/kafka/pull/9007#discussion_r454941348
########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception { } } + @Test + public void testDescribeLogDirs() throws ExecutionException, InterruptedException { + List<Integer> brokers = singletonList(0); + TopicPartition tp = new TopicPartition("topic", 12); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse( + new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setErrorCode(Errors.NONE.code()) + .setLogDir("/var/data/kafka") + .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic() + .setName(tp.topic()) + .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition() + .setPartitionIndex(tp.partition()) + .setPartitionSize(1234567890) + .setIsFutureKey(false) + .setOffsetLag(0))))) + ))), env.cluster().nodeById(0)); + DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers); + Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions(); + assertEquals(Collections.singleton(0), descriptions.keySet()); + assertNotNull(descriptions.get(0)); + Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get(); + assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet()); + assertNull(descriptionsMap.get("/var/data/kafka").error()); + Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get("/var/data/kafka").replicaInfos(); + assertEquals(Collections.singleton(tp), descriptionsReplicaInfos.keySet()); + assertEquals(1234567890, descriptionsReplicaInfos.get(tp).size()); + assertEquals(0, descriptionsReplicaInfos.get(tp).offsetLag()); + assertFalse(descriptionsReplicaInfos.get(tp).isFuture()); Review comment: This might have less mileage than you expected because the different types mean we need two methods each with two call sites, rather than 4 call sites for a single method, but I've done it anyway. ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception { } } + @Test + public void testDescribeLogDirs() throws ExecutionException, InterruptedException { + List<Integer> brokers = singletonList(0); + TopicPartition tp = new TopicPartition("topic", 12); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse( + new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setErrorCode(Errors.NONE.code()) + .setLogDir("/var/data/kafka") + .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic() + .setName(tp.topic()) + .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition() + .setPartitionIndex(tp.partition()) + .setPartitionSize(1234567890) + .setIsFutureKey(false) + .setOffsetLag(0))))) + ))), env.cluster().nodeById(0)); + DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers); + Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions(); + assertEquals(Collections.singleton(0), descriptions.keySet()); + assertNotNull(descriptions.get(0)); + Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get(); + assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet()); + assertNull(descriptionsMap.get("/var/data/kafka").error()); + Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get("/var/data/kafka").replicaInfos(); + assertEquals(Collections.singleton(tp), descriptionsReplicaInfos.keySet()); + assertEquals(1234567890, descriptionsReplicaInfos.get(tp).size()); + assertEquals(0, descriptionsReplicaInfos.get(tp).offsetLag()); + assertFalse(descriptionsReplicaInfos.get(tp).isFuture()); + + Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get(); + assertEquals(Collections.singleton(0), allDescriptions.keySet()); + assertNotNull(allDescriptions.get(0)); + assertEquals(Collections.singleton("/var/data/kafka"), allDescriptions.get(0).keySet()); + assertNull(allDescriptions.get(0).get("/var/data/kafka").error()); + Map<TopicPartition, ReplicaInfo> allDescriptionsReplicInfos = allDescriptions.get(0).get("/var/data/kafka").replicaInfos(); + assertEquals(Collections.singleton(tp), allDescriptionsReplicInfos.keySet()); + assertEquals(1234567890, allDescriptionsReplicInfos.get(tp).size()); + assertEquals(0, allDescriptionsReplicInfos.get(tp).offsetLag()); + assertFalse(allDescriptionsReplicInfos.get(tp).isFuture()); + } + } + + @SuppressWarnings("deprecation") + @Test + public void testDescribeLogDirsDeprecated() throws ExecutionException, InterruptedException { + List<Integer> brokers = singletonList(0); + TopicPartition tp = new TopicPartition("topic", 12); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse( + new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setErrorCode(Errors.NONE.code()) + .setLogDir("/var/data/kafka") + .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic() + .setName(tp.topic()) + .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition() + .setPartitionIndex(tp.partition()) + .setPartitionSize(1234567890) + .setIsFutureKey(false) + .setOffsetLag(0))))) + ))), env.cluster().nodeById(0)); + DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers); + + Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values(); + assertEquals(Collections.singleton(0), deprecatedValues.keySet()); + assertNotNull(deprecatedValues.get(0)); + Map<String, DescribeLogDirsResponse.LogDirInfo> valuesMap = deprecatedValues.get(0).get(); + assertEquals(Collections.singleton("/var/data/kafka"), valuesMap.keySet()); + assertEquals(Errors.NONE, valuesMap.get("/var/data/kafka").error); + Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> valuesReplicaInfos = + valuesMap.get("/var/data/kafka").replicaInfos; + assertEquals(Collections.singleton(tp), valuesReplicaInfos.keySet()); + assertEquals(1234567890, valuesReplicaInfos.get(tp).size); + assertEquals(0, valuesReplicaInfos.get(tp).offsetLag); + assertFalse(valuesReplicaInfos.get(tp).isFuture); + + Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get(); + assertEquals(Collections.singleton(0), deprecatedAll.keySet()); + assertNotNull(deprecatedAll.get(0)); + assertEquals(Collections.singleton("/var/data/kafka"), deprecatedAll.get(0).keySet()); + assertEquals(Errors.NONE, deprecatedAll.get(0).get("/var/data/kafka").error); + Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> allReplicaInfos = + deprecatedAll.get(0).get("/var/data/kafka").replicaInfos; + assertEquals(Collections.singleton(tp), allReplicaInfos.keySet()); + assertEquals(1234567890, allReplicaInfos.get(tp).size); + assertEquals(0, allReplicaInfos.get(tp).offsetLag); + assertFalse(allReplicaInfos.get(tp).isFuture); + } + } + + @Test + public void testDescribeLogDirsOfflineDir() throws ExecutionException, InterruptedException { + List<Integer> brokers = singletonList(0); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse( + new DescribeLogDirsResponseData().setResults(asList(new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code()) + .setLogDir("/var/data/kafka") + .setTopics(emptyList()) + ))), env.cluster().nodeById(0)); + DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers); + Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions(); + assertEquals(Collections.singleton(0), descriptions.keySet()); + assertNotNull(descriptions.get(0)); + Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get(); + assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet()); + assertEquals(KafkaStorageException.class, descriptionsMap.get("/var/data/kafka").error().getClass()); + assertEquals(emptySet(), descriptionsMap.get("/var/data/kafka").replicaInfos().keySet()); + + Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get(); + assertEquals(Collections.singleton(0), allDescriptions.keySet()); + Map<String, LogDirDescription> allMap = allDescriptions.get(0); + assertNotNull(allMap); + assertEquals(Collections.singleton("/var/data/kafka"), allMap.keySet()); + assertEquals(KafkaStorageException.class, allMap.get("/var/data/kafka").error().getClass()); + assertEquals(emptySet(), allMap.get("/var/data/kafka").replicaInfos().keySet()); + } + } + + @SuppressWarnings("deprecation") + @Test + public void testDescribeLogDirsOfflineDirDeprecated() throws ExecutionException, InterruptedException { + List<Integer> brokers = singletonList(0); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse( + new DescribeLogDirsResponseData().setResults(asList(new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code()) + .setLogDir("/var/data/kafka") + .setTopics(emptyList()) + ))), env.cluster().nodeById(0)); + DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers); + + Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values(); + assertEquals(Collections.singleton(0), deprecatedValues.keySet()); + assertNotNull(deprecatedValues.get(0)); + Map<String, DescribeLogDirsResponse.LogDirInfo> valuesMap = deprecatedValues.get(0).get(); + assertEquals(Collections.singleton("/var/data/kafka"), valuesMap.keySet()); + assertEquals(Errors.KAFKA_STORAGE_ERROR, valuesMap.get("/var/data/kafka").error); + assertEquals(emptySet(), valuesMap.get("/var/data/kafka").replicaInfos.keySet()); + + Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get(); + assertEquals(Collections.singleton(0), deprecatedAll.keySet()); + Map<String, DescribeLogDirsResponse.LogDirInfo> allMap = deprecatedAll.get(0); + assertNotNull(allMap); + assertEquals(Collections.singleton("/var/data/kafka"), allMap.keySet()); + assertEquals(Errors.KAFKA_STORAGE_ERROR, allMap.get("/var/data/kafka").error); + assertEquals(emptySet(), allMap.get("/var/data/kafka").replicaInfos.keySet()); + } + } + + @Test + public void testDescribeReplicaLogDirs() throws ExecutionException, InterruptedException { Review comment: I added it to the existing test. Due to the new helper methods I felt this didn't really complicate the test very much and is also allows us to cover the case where the RPC returns `STORAGE_ERROR`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org