CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1526695578


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2185,9 +2179,143 @@ void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         };
-        if (!topicNamesList.isEmpty()) {
-            runnable.call(call, now);
+        return call;
+    }
+
+    @SuppressWarnings({"MethodLength", "NPathComplexity"})
+    private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+        final Collection<String> topicNames,
+        DescribeTopicsOptions options
+    ) {
+        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
+        final ArrayList<String> topicNamesList = new ArrayList<>();
+        for (String topicName : topicNames) {
+            if (topicNameIsUnrepresentable(topicName)) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+                    topicName + "' cannot be represented in a request."));
+                topicFutures.put(topicName, future);
+            } else if (!topicFutures.containsKey(topicName)) {
+                topicFutures.put(topicName, new KafkaFutureImpl<>());
+                topicNamesList.add(topicName);
+            }
         }
+
+        if (topicNamesList.isEmpty()) {
+            return new HashMap<>(topicFutures);
+        }
+
+        // First, we need to retrieve the node info.
+        DescribeClusterResult clusterResult = describeCluster();
+        Map<Integer, Node> nodes;
+        try {
+            nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node));
+        } catch (InterruptedException | ExecutionException e) {
+            completeAllExceptionally(topicFutures.values(), e.getCause());
+            return new HashMap<>(topicFutures);
+        }
+
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+            Map<String, TopicRequest> pendingTopics =
+                topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName))
+                    .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new));
+
+            TopicDescription partiallyFinishedTopicDescription = null;
+
+            @Override
+            DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
+                DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData()
+                    .setTopics(new ArrayList<>(pendingTopics.values()))
+                    .setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+                if (partiallyFinishedTopicDescription != null) {
+                    // If the previous cursor points to the partition 0, the cursor will not be set as the first one
+                    // in the topic list should be the previous cursor topic.
+                    request.setCursor(new DescribeTopicPartitionsRequestData.Cursor()
+                        .setTopicName(partiallyFinishedTopicDescription.name())
+                        .setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+                    );
+                }
+                return new DescribeTopicPartitionsRequest.Builder(request);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse;
+                DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor();
+                // The topicDescription for the cursor topic of the current batch.
+                TopicDescription nextTopicDescription = null;
+
+                for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) {
+                    String topicName = topic.name();
+                    Errors error = Errors.forCode(topic.errorCode());
+
+                    KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName);
+
+                    if (error != Errors.NONE) {
+                        future.completeExceptionally(error.exception());
+                        pendingTopics.remove(topicName);
+                        if (responseCursor != null && responseCursor.topicName().equals(topicName)) {
+                            responseCursor = null;
+                        }
+                        continue;
+                    }
+
+                    TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+                    if (partiallyFinishedTopicDescription != null && partiallyFinishedTopicDescription.name().equals(topicName)) {
+                        // Add the partitions for the cursor topic of the previous batch.
+                        partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+                        continue;
+                    }
+
+                    if (responseCursor != null && responseCursor.topicName().equals(topicName)) {
+                        // In the same batch of results, it may need to handle the partitions for the previous cursor
+                        // topic and the current cursor topic. Cache the result in the nextTopicDescription.
+                        nextTopicDescription = currentTopicDescription;
+                        continue;
+                    }
+
+                    pendingTopics.remove(topicName);
+                    future.complete(currentTopicDescription);
+                }
+
+                if (partiallyFinishedTopicDescription != null &&
+                        (responseCursor == null || !responseCursor.topicName().equals(partiallyFinishedTopicDescription.name()))) {
+                    // We can't simply check nextTopicDescription != null here to close the partiallyFinishedTopicDescription,
+                    // because the responseCursor topic may not show up in the response.
+                    String topicName = partiallyFinishedTopicDescription.name();
+                    topicFutures.get(topicName).complete(partiallyFinishedTopicDescription);
+                    pendingTopics.remove(topicName);
+                    partiallyFinishedTopicDescription = null;
+                }
+                if (nextTopicDescription != null) {
+                    partiallyFinishedTopicDescription = nextTopicDescription;
+                }
+
+                if (!pendingTopics.isEmpty()) {
+                    runnable.call(this, time.milliseconds());
+                }
+            }
+
+            @Override
+            boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                final long now = time.milliseconds();
+                log.warn("The DescribeTopicPartitions API is not supported, using Metadata API to describe topics.");
+                runnable.call(generateDescribeTopicsCallWithMetadataAPI(topicNamesList, topicFutures, options, now), now);
+                return false;
+            }

Review Comment:
   Correct, the client will directly return the UnsupportedVersion error to the _Call_ if the DescribeTopicPartitions API is not in the ApiVersionsResponse.
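   
   For readers following the cursor handling in `createRequest`/`handleResponse` above, here is a minimal, standalone sketch of the same pagination idea. The names (`Cursor`, `TopicSlice`, `Page`, `Broker.fetch`) are illustrative only and are not part of the KafkaAdminClient or the Kafka protocol classes: each request carries a (topicName, partitionIndex) cursor, partial partition lists for the cursor topic are merged across responses, and a topic is considered complete only once the next cursor has moved past it.
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeSet;
   
   final class CursorPaginationSketch {
       // Hypothetical stand-ins for the cursor and the per-topic slice of a response.
       record Cursor(String topicName, int partitionIndex) {}
       record TopicSlice(String topic, List<Integer> partitions) {}
       record Page(List<TopicSlice> topics, Cursor nextCursor) {}
   
       interface Broker {
           // Returns at most `limit` partitions across the still-pending topics,
           // plus a non-null cursor when the last topic in the page was truncated.
           Page fetch(TreeSet<String> pendingTopics, Cursor cursor, int limit);
       }
   
       static Map<String, List<Integer>> describeAll(Broker broker, TreeSet<String> pending, int limit) {
           Map<String, List<Integer>> results = new HashMap<>();
           Cursor cursor = null;
           while (!pending.isEmpty()) {
               Page page = broker.fetch(pending, cursor, limit);
               for (TopicSlice slice : page.topics()) {
                   // Merge partitions for a topic whose partition list is split across pages.
                   results.computeIfAbsent(slice.topic(), t -> new ArrayList<>()).addAll(slice.partitions());
                   // A topic is complete unless the next cursor still points at it.
                   if (page.nextCursor() == null || !page.nextCursor().topicName().equals(slice.topic())) {
                       pending.remove(slice.topic());
                   }
               }
               cursor = page.nextCursor();
           }
           return results;
       }
   }
   ```
   
   In the _Call_ above the same bookkeeping is spread across `pendingTopics`, `partiallyFinishedTopicDescription`, and the re-queued `runnable.call(this, ...)`, since the admin client is callback-driven rather than loop-driven.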


