AndrewJSchofield commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513310569


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2190,6 +2201,117 @@ void handleFailure(Throwable throwable) {
         return new HashMap<>(topicFutures);
     }
 
+    @SuppressWarnings("MethodLength")
+    private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final Collection<String> topicNames, DescribeTopicsOptions options) {
+        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
+        final ArrayList<String> topicNamesList = new ArrayList<>();
+        for (String topicName : topicNames) {
+            if (topicNameIsUnrepresentable(topicName)) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+                        topicName + "' cannot be represented in a request."));
+                topicFutures.put(topicName, future);
+            } else if (!topicFutures.containsKey(topicName)) {
+                topicFutures.put(topicName, new KafkaFutureImpl<>());
+                topicNamesList.add(topicName);
+            }
+        }
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),

Review Comment:
   Should this be "describeTopicPartitions"?



##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##########
@@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz
         return this;
     }
 
+    /**
+     * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is
+     * not supported.
+     *
+     */
+    public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) {

Review Comment:
   I think you need a small change to KIP-966 to document these changes to the 
admin API.



##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -799,6 +823,11 @@ public TopicCommandOptions(String[] args) {
                 "if set when creating topics, the action will only execute if the topic does not already exist.");
             excludeInternalTopicOpt = parser.accepts("exclude-internal",
                 "exclude internal topics when running list or describe command. The internal topics will be listed by default");
+            partitionSizeLimitPerResponseOpt = parser.accepts("partition-size-limit-per-response",

Review Comment:
   In other cases where a new API has been introduced, I think the principle 
followed is to try the new one without an option, and to fall back to the old 
one if the new one turns out not to be supported. That would be much nicer than 
expecting the innocent user to understand what `user-describe-topics-api` means.
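
   To illustrate the try-then-fall-back idea, here is a minimal, self-contained 
sketch. It is not the admin client's actual code: `DescribeFallbackSketch`, 
`UnsupportedApiException` and `describeWithFallback` are hypothetical names 
made up for this example, and the two lambdas merely stand in for the newer 
and older requests.

```java
import java.util.List;
import java.util.function.Function;

public class DescribeFallbackSketch {

    // Hypothetical stand-in for "the broker does not support the newer API".
    public static class UnsupportedApiException extends RuntimeException {
        public UnsupportedApiException(String message) {
            super(message);
        }
    }

    // Try the newer call first; use the older call only if the newer one
    // is reported as unsupported. No user-facing option is involved.
    public static <T> T describeWithFallback(List<String> topics,
                                             Function<List<String>, T> newerApi,
                                             Function<List<String>, T> olderApi) {
        try {
            return newerApi.apply(topics);
        } catch (UnsupportedApiException e) {
            return olderApi.apply(topics);
        }
    }

    public static void main(String[] args) {
        List<String> topics = List.of("orders", "payments");

        // Simulate an older broker: the newer call fails, so the older one is used.
        String result = describeWithFallback(
            topics,
            t -> { throw new UnsupportedApiException("DescribeTopicPartitions not supported"); },
            t -> "metadata:" + t);

        System.out.println(result);
    }
}
```

   With this shape the tool needs no extra flag; the choice of API is made from 
what the broker reports.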



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -1399,6 +1404,184 @@ public void testInvalidTopicNames() throws Exception {
         }
     }
 
+    @SuppressWarnings("NPathComplexity")

Review Comment:
   Could we have a test with an invalid cursor?



##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##########
@@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz
         return this;
     }
 
+    /**
+     * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is
+     * not supported.
+     *
+     */
+    public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) {
+        this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi;
+        return this;
+    }
+
+    // Note that, partitionSizeLimitPerResponse will not be effective if it is larger than the config
+    // max.request.partition.size.limit on the server side.
+    public DescribeTopicsOptions partitionSizeLimitPerResponse(int partitionSizeLimitPerResponse) {
+        this.partitionSizeLimitPerResponse = partitionSizeLimitPerResponse;
+        return this;
+    }
+
     public boolean includeAuthorizedOperations() {
         return includeAuthorizedOperations;
     }
 
+    public boolean useDescribeTopicPartitionsApi() {

Review Comment:
   I suggest just `useDescribeTopicPartitions()`. In the Javadoc, you can 
mention that it's using the DescribeTopicPartitions API under the covers. Most 
users of the Kafka admin client would consider `KafkaAdminClient` to be the 
API, rather than the Kafka protocol, which is what is meant here.
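
   Roughly what I have in mind, as a standalone sketch rather than a patch. 
`DescribeTopicsOptionsSketch` is a hypothetical stand-in for the real options 
class, and the default value of `true` is an assumption made only for this 
example.

```java
public class DescribeTopicsOptionsSketch {

    // Assumption for this sketch only: the flag defaults to true.
    private boolean useDescribeTopicPartitions = true;

    /**
     * Whether to describe topics using the DescribeTopicPartitions API under the covers.
     * Set to false if the brokers do not support that API.
     */
    public DescribeTopicsOptionsSketch useDescribeTopicPartitions(boolean useDescribeTopicPartitions) {
        this.useDescribeTopicPartitions = useDescribeTopicPartitions;
        return this;
    }

    /** Returns the value set by the fluent setter above. */
    public boolean useDescribeTopicPartitions() {
        return useDescribeTopicPartitions;
    }

    public static void main(String[] args) {
        DescribeTopicsOptionsSketch options =
            new DescribeTopicsOptionsSketch().useDescribeTopicPartitions(false);
        System.out.println(options.useDescribeTopicPartitions());
    }
}
```

   The method name stays in terms of what the admin client does, and the 
protocol-level detail lives in the Javadoc.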



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
