tombentley commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r454941348



##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet());
+            assertNull(descriptionsMap.get("/var/data/kafka").error());
+            Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get("/var/data/kafka").replicaInfos();
+            assertEquals(Collections.singleton(tp), descriptionsReplicaInfos.keySet());
+            assertEquals(1234567890, descriptionsReplicaInfos.get(tp).size());
+            assertEquals(0, descriptionsReplicaInfos.get(tp).offsetLag());
+            assertFalse(descriptionsReplicaInfos.get(tp).isFuture());

Review comment:
       This might have less mileage than you expected: the differing types mean we need two methods, each with two call sites, rather than four call sites of a single method, but I've done it anyway.
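       For reference, a minimal sketch of what the two overloads might look like (names and signatures are illustrative, not the actual helpers in this PR; the test class's existing static imports of `assertEquals`, `assertNull`, `assertFalse`, `Errors`, etc. are assumed):

```java
// Hypothetical helper for the new API; asserts a log dir holds exactly one
// replica with the expected size/lag and is not a future replica.
private static void assertReplicaInfos(LogDirDescription description, TopicPartition tp,
                                       long size, long offsetLag) {
    assertNull(description.error());
    Map<TopicPartition, ReplicaInfo> replicaInfos = description.replicaInfos();
    assertEquals(Collections.singleton(tp), replicaInfos.keySet());
    assertEquals(size, replicaInfos.get(tp).size());
    assertEquals(offsetLag, replicaInfos.get(tp).offsetLag());
    assertFalse(replicaInfos.get(tp).isFuture());
}

// Hypothetical overload for the deprecated API, which exposes public fields
// rather than accessor methods, hence the separate method body.
@SuppressWarnings("deprecation")
private static void assertReplicaInfos(DescribeLogDirsResponse.LogDirInfo info, TopicPartition tp,
                                       long size, long offsetLag) {
    assertEquals(Errors.NONE, info.error);
    Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfos = info.replicaInfos;
    assertEquals(Collections.singleton(tp), replicaInfos.keySet());
    assertEquals(size, replicaInfos.get(tp).size);
    assertEquals(offsetLag, replicaInfos.get(tp).offsetLag);
    assertFalse(replicaInfos.get(tp).isFuture);
}
```

       Each overload would then be called twice: once on the per-broker future (`descriptions()`/`values()`) and once on the aggregated result (`allDescriptions()`/`all()`).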

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet());
+            assertNull(descriptionsMap.get("/var/data/kafka").error());
+            Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get("/var/data/kafka").replicaInfos();
+            assertEquals(Collections.singleton(tp), descriptionsReplicaInfos.keySet());
+            assertEquals(1234567890, descriptionsReplicaInfos.get(tp).size());
+            assertEquals(0, descriptionsReplicaInfos.get(tp).offsetLag());
+            assertFalse(descriptionsReplicaInfos.get(tp).isFuture());
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
+            assertEquals(Collections.singleton(0), allDescriptions.keySet());
+            assertNotNull(allDescriptions.get(0));
+            assertEquals(Collections.singleton("/var/data/kafka"), allDescriptions.get(0).keySet());
+            assertNull(allDescriptions.get(0).get("/var/data/kafka").error());
+            Map<TopicPartition, ReplicaInfo> allDescriptionsReplicaInfos = allDescriptions.get(0).get("/var/data/kafka").replicaInfos();
+            assertEquals(Collections.singleton(tp), allDescriptionsReplicaInfos.keySet());
+            assertEquals(1234567890, allDescriptionsReplicaInfos.get(tp).size());
+            assertEquals(0, allDescriptionsReplicaInfos.get(tp).offsetLag());
+            assertFalse(allDescriptionsReplicaInfos.get(tp).isFuture());
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDescribeLogDirsDeprecated() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
+            assertEquals(Collections.singleton(0), deprecatedValues.keySet());
+            assertNotNull(deprecatedValues.get(0));
+            Map<String, DescribeLogDirsResponse.LogDirInfo> valuesMap = deprecatedValues.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), valuesMap.keySet());
+            assertEquals(Errors.NONE, valuesMap.get("/var/data/kafka").error);
+            Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> valuesReplicaInfos =
+                    valuesMap.get("/var/data/kafka").replicaInfos;
+            assertEquals(Collections.singleton(tp), valuesReplicaInfos.keySet());
+            assertEquals(1234567890, valuesReplicaInfos.get(tp).size);
+            assertEquals(0, valuesReplicaInfos.get(tp).offsetLag);
+            assertFalse(valuesReplicaInfos.get(tp).isFuture);
+
+            Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get();
+            assertEquals(Collections.singleton(0), deprecatedAll.keySet());
+            assertNotNull(deprecatedAll.get(0));
+            assertEquals(Collections.singleton("/var/data/kafka"), deprecatedAll.get(0).keySet());
+            assertEquals(Errors.NONE, deprecatedAll.get(0).get("/var/data/kafka").error);
+            Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> allReplicaInfos =
+                    deprecatedAll.get(0).get("/var/data/kafka").replicaInfos;
+            assertEquals(Collections.singleton(tp), allReplicaInfos.keySet());
+            assertEquals(1234567890, allReplicaInfos.get(tp).size);
+            assertEquals(0, allReplicaInfos.get(tp).offsetLag);
+            assertFalse(allReplicaInfos.get(tp).isFuture);
+        }
+    }
+
+    @Test
+    public void testDescribeLogDirsOfflineDir() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(asList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(emptyList())
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet());
+            assertEquals(KafkaStorageException.class, descriptionsMap.get("/var/data/kafka").error().getClass());
+            assertEquals(emptySet(), descriptionsMap.get("/var/data/kafka").replicaInfos().keySet());
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
+            assertEquals(Collections.singleton(0), allDescriptions.keySet());
+            Map<String, LogDirDescription> allMap = allDescriptions.get(0);
+            assertNotNull(allMap);
+            assertEquals(Collections.singleton("/var/data/kafka"), allMap.keySet());
+            assertEquals(KafkaStorageException.class, allMap.get("/var/data/kafka").error().getClass());
+            assertEquals(emptySet(), allMap.get("/var/data/kafka").replicaInfos().keySet());
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDescribeLogDirsOfflineDirDeprecated() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(asList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(emptyList())
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
+            assertEquals(Collections.singleton(0), deprecatedValues.keySet());
+            assertNotNull(deprecatedValues.get(0));
+            Map<String, DescribeLogDirsResponse.LogDirInfo> valuesMap = deprecatedValues.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), valuesMap.keySet());
+            assertEquals(Errors.KAFKA_STORAGE_ERROR, valuesMap.get("/var/data/kafka").error);
+            assertEquals(emptySet(), valuesMap.get("/var/data/kafka").replicaInfos.keySet());
+
+            Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get();
+            assertEquals(Collections.singleton(0), deprecatedAll.keySet());
+            Map<String, DescribeLogDirsResponse.LogDirInfo> allMap = deprecatedAll.get(0);
+            assertNotNull(allMap);
+            assertEquals(Collections.singleton("/var/data/kafka"), allMap.keySet());
+            assertEquals(Errors.KAFKA_STORAGE_ERROR, allMap.get("/var/data/kafka").error);
+            assertEquals(emptySet(), allMap.get("/var/data/kafka").replicaInfos.keySet());
+        }
+    }
+
+    @Test
+    public void testDescribeReplicaLogDirs() throws ExecutionException, InterruptedException {

Review comment:
       I added it to the existing test. Thanks to the new helper methods I felt this didn't really complicate the test very much, and it also allows us to cover the case where the RPC returns `STORAGE_ERROR`.
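       For anyone skimming: covering the `STORAGE_ERROR` case only requires changing the error code in the prepared response, mirroring the offline-dir responses built earlier in this hunk (a sketch of the shape, not the exact code added to the test):

```java
// A result whose error code is KAFKA_STORAGE_ERROR, so the test can assert
// how describeReplicaLogDirs surfaces a storage error for the replica.
env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
        new DescribeLogDirsResponseData().setResults(singletonList(
                new DescribeLogDirsResponseData.DescribeLogDirsResult()
                        .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code())
                        .setLogDir("/var/data/kafka")
                        .setTopics(emptyList())))),
        env.cluster().nodeById(0));
```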



