CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1526695578
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2185,9 +2179,143 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; - if (!topicNamesList.isEmpty()) { - runnable.call(call, now); + return call; + } + + @SuppressWarnings({"MethodLength", "NPathComplexity"}) + private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( + final Collection<String> topicNames, + DescribeTopicsOptions options + ) { + final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size()); + final ArrayList<String> topicNamesList = new ArrayList<>(); + for (String topicName : topicNames) { + if (topicNameIsUnrepresentable(topicName)) { + KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic name '" + + topicName + "' cannot be represented in a request.")); + topicFutures.put(topicName, future); + } else if (!topicFutures.containsKey(topicName)) { + topicFutures.put(topicName, new KafkaFutureImpl<>()); + topicNamesList.add(topicName); + } } + + if (topicNamesList.isEmpty()) { + return new HashMap<>(topicFutures); + } + + // First, we need to retrieve the node info. 
+ DescribeClusterResult clusterResult = describeCluster(); + Map<Integer, Node> nodes; + try { + nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node)); + } catch (InterruptedException | ExecutionException e) { + completeAllExceptionally(topicFutures.values(), e.getCause()); + return new HashMap<>(topicFutures); + } + + final long now = time.milliseconds(); + Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + Map<String, TopicRequest> pendingTopics = + topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) + .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + + TopicDescription partiallyFinishedTopicDescription = null; + + @Override + DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { + DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() + .setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); + if (partiallyFinishedTopicDescription != null) { + // If the previous cursor points to the partition 0, the cursor will not be set as the first one + // in the topic list should be the previous cursor topic. + request.setCursor(new DescribeTopicPartitionsRequestData.Cursor() + .setTopicName(partiallyFinishedTopicDescription.name()) + .setPartitionIndex(partiallyFinishedTopicDescription.partitions().size()) + ); + } + return new DescribeTopicPartitionsRequest.Builder(request); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; + DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor(); + // The topicDescription for the cursor topic of the current batch. 
+ TopicDescription nextTopicDescription = null; + + for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { + String topicName = topic.name(); + Errors error = Errors.forCode(topic.errorCode()); + + KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName); + + if (error != Errors.NONE) { + future.completeExceptionally(error.exception()); + pendingTopics.remove(topicName); + if (responseCursor != null && responseCursor.topicName().equals(topicName)) { + responseCursor = null; + } + continue; + } + + TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes); + + if (partiallyFinishedTopicDescription != null && partiallyFinishedTopicDescription.name().equals(topicName)) { + // Add the partitions for the cursor topic of the previous batch. + partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions()); + continue; + } + + if (responseCursor != null && responseCursor.topicName().equals(topicName)) { + // In the same batch of result, it may need to handle the partitions for the previous cursor + // topic and the current cursor topic. Cache the result in the nextTopicDescription. + nextTopicDescription = currentTopicDescription; + continue; + } + + pendingTopics.remove(topicName); + future.complete(currentTopicDescription); + } + + if (partiallyFinishedTopicDescription != null && + (responseCursor == null || !responseCursor.topicName().equals(partiallyFinishedTopicDescription.name()))) { + // We can't simply check nextTopicDescription != null here to close the partiallyFinishedTopicDescription. + // Because the responseCursor topic may not show in the response. 
+ String topicName = partiallyFinishedTopicDescription.name(); + topicFutures.get(topicName).complete(partiallyFinishedTopicDescription); + pendingTopics.remove(topicName); + partiallyFinishedTopicDescription = null; + } + if (nextTopicDescription != null) { + partiallyFinishedTopicDescription = nextTopicDescription; + } + + if (!pendingTopics.isEmpty()) { + runnable.call(this, time.milliseconds()); + } + } + + @Override + boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { + final long now = time.milliseconds(); + log.warn("The DescribeTopicPartitions API is not supported, using Metadata API to describe topics."); + runnable.call(generateDescribeTopicsCallWithMetadataAPI(topicNamesList, topicFutures, options, now), now); + return false; + } Review Comment: Correct. The client returns the UnsupportedVersion error directly to the _Call_ if the DescribeTopicPartitions API is not listed in the ApiVersionsResponse. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org