kirktrue commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513390603
########## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ########## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } + /** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ + public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { + this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi; + return this; + } + + // Note that, partitionSizeLimitPerResponse will not be effective if it is larger than the config + // max.request.partition.size.limit on the server side. Review Comment: Is there a warning logged in the case where the client sends a limit greater than what's allowed on the broker? ########## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ########## @@ -537,6 +544,18 @@ public Map<TopicPartition, PartitionReassignment> listAllReassignments(Set<Topic } public void describeTopic(TopicCommandOptions opts) throws ExecutionException, InterruptedException { + try { + describeTopic(opts, true); + } catch (Exception e) { + if (e.getMessage().contains("UnsupportedVersionException")) { Review Comment: This seems a little fragile. Is there a reason we can't check the actual class type of the exception (or its children, if necessary)? ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2190,6 +2201,117 @@ void handleFailure(Throwable throwable) { return new HashMap<>(topicFutures); } + @SuppressWarnings("MethodLength") + private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final Collection<String> topicNames, DescribeTopicsOptions options) { + final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size()); + final ArrayList<String> topicNamesList = new ArrayList<>(); + for (String topicName : topicNames) { + if (topicNameIsUnrepresentable(topicName)) { + KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic name '" + + topicName + "' cannot be represented in a request.")); + topicFutures.put(topicName, future); + } else if (!topicFutures.containsKey(topicName)) { + topicFutures.put(topicName, new KafkaFutureImpl<>()); + topicNamesList.add(topicName); + } + } + final long now = time.milliseconds(); + Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), Review Comment: AFAICT, the `callName` used within the `Call` object is only used for logging. That said, there's no point in confusing the user who's looking through the logs. ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2276,6 +2415,11 @@ private Node leader(PartitionInfo partitionInfo) { return partitionInfo.leader(); } + // This is used in the describe topics path if using DescribeTopics API. + private Node replicaToFakeNode(int id) { + return new Node(id, "Dummy", 0); + } Review Comment: Just for my own understanding, why do we favor creating _fake_ nodes instead of looking up the _real_ nodes from the metadata or something? ########## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ########## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } + /** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ + public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { Review Comment: Sorry for being daft, but when would the user know to set this one way or the other. Is this something that can be handled under the covers? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org