mimaison commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r459540733



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2306,6 +2310,22 @@ void handleFailure(Throwable throwable) {
         return new DescribeLogDirsResult(new HashMap<>(futures));
     }
 
+    private Map<String, LogDirDescription> 
logDirDescriptions(DescribeLogDirsResponse response) {

Review comment:
       This can be static. Also, should we keep it in `DescribeLogDirsResponse`?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws 
Exception {
         }
     }
 
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
+        return prepareDescribeLogDirsResponse(error, logDir,
+                prepareDescribeLogDirsTopics(partitionSize, offsetLag, 
tp.topic(), tp.partition(), false));
+    }
+
+    private List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(
+            long partitionSize, long offsetLag, String topic, int partition, 
boolean isFuture) {
+        return singletonList(new DescribeLogDirsTopic()
+                .setName(topic)
+                .setPartitions(singletonList(new 
DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                        .setPartitionIndex(partition)
+                        .setPartitionSize(partitionSize)
+                        .setIsFutureKey(isFuture)
+                        .setOffsetLag(offsetLag))));
+    }
+
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir,
+                                                                   
List<DescribeLogDirsTopic> topics) {
+        return new DescribeLogDirsResponse(
+                new DescribeLogDirsResponseData().setResults(singletonList(new 
DescribeLogDirsResponseData.DescribeLogDirsResult()
+                        .setErrorCode(error.code())
+                        .setLogDir(logDir)
+                        .setTopics(topics)
+                )));
+    }
+
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, 
InterruptedException {
+        Set<Integer> brokers = Collections.singleton(0);
+        String logDir = "/var/data/kafka";
+        TopicPartition tp = new TopicPartition("topic", 12);
+        long partitionSize = 1234567890;
+        long offsetLag = 24;
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(
+                    prepareDescribeLogDirsResponse(Errors.NONE, logDir, tp, 
partitionSize, offsetLag),
+                    env.cluster().nodeById(0));
+
+            DescribeLogDirsResult result = 
env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> 
descriptions = result.descriptions();
+            assertEquals(brokers, descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            assertDescriptionContains(descriptions.get(0).get(), logDir, tp, 
partitionSize, offsetLag);
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = 
result.allDescriptions().get();
+            assertEquals(brokers, allDescriptions.keySet());
+            assertDescriptionContains(allDescriptions.get(0), logDir, tp, 
partitionSize, offsetLag);
+        }
+    }
+
+    private void assertDescriptionContains(Map<String, LogDirDescription> 
descriptionsMap, String logDir,

Review comment:
       This can be static

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws 
Exception {
         }
     }
 
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {

Review comment:
       This can be static

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws 
Exception {
         }
     }
 
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
+        return prepareDescribeLogDirsResponse(error, logDir,
+                prepareDescribeLogDirsTopics(partitionSize, offsetLag, 
tp.topic(), tp.partition(), false));
+    }
+
+    private List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(

Review comment:
       This can be static

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws 
Exception {
         }
     }
 
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
+        return prepareDescribeLogDirsResponse(error, logDir,
+                prepareDescribeLogDirsTopics(partitionSize, offsetLag, 
tp.topic(), tp.partition(), false));
+    }
+
+    private List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(
+            long partitionSize, long offsetLag, String topic, int partition, 
boolean isFuture) {
+        return singletonList(new DescribeLogDirsTopic()
+                .setName(topic)
+                .setPartitions(singletonList(new 
DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                        .setPartitionIndex(partition)
+                        .setPartitionSize(partitionSize)
+                        .setIsFutureKey(isFuture)
+                        .setOffsetLag(offsetLag))));
+    }
+
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir,
+                                                                   
List<DescribeLogDirsTopic> topics) {
+        return new DescribeLogDirsResponse(
+                new DescribeLogDirsResponseData().setResults(singletonList(new 
DescribeLogDirsResponseData.DescribeLogDirsResult()
+                        .setErrorCode(error.code())
+                        .setLogDir(logDir)
+                        .setTopics(topics)
+                )));
+    }
+
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, 
InterruptedException {
+        Set<Integer> brokers = Collections.singleton(0);
+        String logDir = "/var/data/kafka";
+        TopicPartition tp = new TopicPartition("topic", 12);
+        long partitionSize = 1234567890;
+        long offsetLag = 24;
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(
+                    prepareDescribeLogDirsResponse(Errors.NONE, logDir, tp, 
partitionSize, offsetLag),
+                    env.cluster().nodeById(0));
+
+            DescribeLogDirsResult result = 
env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> 
descriptions = result.descriptions();
+            assertEquals(brokers, descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            assertDescriptionContains(descriptions.get(0).get(), logDir, tp, 
partitionSize, offsetLag);
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = 
result.allDescriptions().get();
+            assertEquals(brokers, allDescriptions.keySet());
+            assertDescriptionContains(allDescriptions.get(0), logDir, tp, 
partitionSize, offsetLag);
+        }
+    }
+
+    private void assertDescriptionContains(Map<String, LogDirDescription> 
descriptionsMap, String logDir,
+                                           TopicPartition tp, long 
partitionSize, long offsetLag) {
+        assertNotNull(descriptionsMap);
+        assertEquals(Collections.singleton(logDir), descriptionsMap.keySet());
+        assertNull(descriptionsMap.get(logDir).error());
+        Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = 
descriptionsMap.get(logDir).replicaInfos();
+        assertEquals(Collections.singleton(tp), 
descriptionsReplicaInfos.keySet());
+        assertEquals(partitionSize, descriptionsReplicaInfos.get(tp).size());
+        assertEquals(offsetLag, descriptionsReplicaInfos.get(tp).offsetLag());
+        assertFalse(descriptionsReplicaInfos.get(tp).isFuture());
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDescribeLogDirsDeprecated() throws ExecutionException, 
InterruptedException {
+        Set<Integer> brokers = Collections.singleton(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+        String logDir = "/var/data/kafka";
+        Errors error = Errors.NONE;
+        int offsetLag = 24;
+        long partitionSize = 1234567890;
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(
+                    prepareDescribeLogDirsResponse(error, logDir, tp, 
partitionSize, offsetLag),
+                    env.cluster().nodeById(0));
+
+            DescribeLogDirsResult result = 
env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, 
DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
+            assertEquals(brokers, deprecatedValues.keySet());
+            assertNotNull(deprecatedValues.get(0));
+            assertDescriptionContains(deprecatedValues.get(0).get(), logDir, 
tp, error, offsetLag, partitionSize);
+
+            Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> 
deprecatedAll = result.all().get();
+            assertEquals(brokers, deprecatedAll.keySet());
+            assertDescriptionContains(deprecatedAll.get(0), logDir, tp, error, 
offsetLag, partitionSize);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private void assertDescriptionContains(Map<String, 
DescribeLogDirsResponse.LogDirInfo> descriptionsMap,

Review comment:
       This can be static

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2306,6 +2310,22 @@ void handleFailure(Throwable throwable) {
         return new DescribeLogDirsResult(new HashMap<>(futures));
     }
 
+    private Map<String, LogDirDescription> 
logDirDescriptions(DescribeLogDirsResponse response) {
+        HashMap<String, LogDirDescription> result = new 
HashMap<>(response.data().results().size());

Review comment:
       The left-hand side can be declared as `Map` instead of `HashMap`.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws 
Exception {
         }
     }
 
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
+        return prepareDescribeLogDirsResponse(error, logDir,
+                prepareDescribeLogDirsTopics(partitionSize, offsetLag, 
tp.topic(), tp.partition(), false));
+    }
+
+    private List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(
+            long partitionSize, long offsetLag, String topic, int partition, 
boolean isFuture) {
+        return singletonList(new DescribeLogDirsTopic()
+                .setName(topic)
+                .setPartitions(singletonList(new 
DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                        .setPartitionIndex(partition)
+                        .setPartitionSize(partitionSize)
+                        .setIsFutureKey(isFuture)
+                        .setOffsetLag(offsetLag))));
+    }
+
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir,

Review comment:
       This can be static




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to