artemlivshits commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1516873731


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2184,9 +2178,138 @@ void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         };
-        if (!topicNamesList.isEmpty()) {
-            runnable.call(call, now);
+        return call;
+    }
+
+    @SuppressWarnings("MethodLength")
+    private Map<String, KafkaFuture<TopicDescription>> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+        final Collection<String> topicNames,
+        DescribeTopicsOptions options
+    ) {
+        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = 
new HashMap<>(topicNames.size());
+        final ArrayList<String> topicNamesList = new ArrayList<>();
+        for (String topicName : topicNames) {
+            if (topicNameIsUnrepresentable(topicName)) {
+                KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+                        topicName + "' cannot be represented in a request."));
+                topicFutures.put(topicName, future);
+            } else if (!topicFutures.containsKey(topicName)) {
+                topicFutures.put(topicName, new KafkaFutureImpl<>());
+                topicNamesList.add(topicName);
+            }
+        }
+
+        if (topicNamesList.isEmpty()) {
+            return new HashMap<>(topicFutures);
+        }
+
+        // First, we need to retrieve the node info.
+        DescribeClusterResult clusterResult = describeCluster();
+        Map<Integer, Node> nodes;
+        try {
+            nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+        } catch (InterruptedException | ExecutionException e) {
+            completeAllExceptionally(topicFutures.values(), e);
+            return new HashMap<>(topicFutures);
         }
+
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+            Map<String, TopicRequest> pendingTopics =
+                topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+                    .collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+            DescribeTopicPartitionsRequestData.Cursor requestCursor = null;
+            TopicDescription partiallyFinishedTopicDescription = null;
+
+            @Override
+            DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+                DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+                    .setTopics(new ArrayList<>(pendingTopics.values()))
+                    
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+                request.setCursor(requestCursor);
+                return new DescribeTopicPartitionsRequest.Builder(request);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+                DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+                for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+                    String topicName = topic.name();
+                    Errors error = Errors.forCode(topic.errorCode());
+
+                    KafkaFutureImpl<TopicDescription> future = 
topicFutures.get(topicName);
+                    if (error != Errors.NONE) {
+                        future.completeExceptionally(error.exception());
+                        topicFutures.remove(topicName);
+                        pendingTopics.remove(topicName);
+                        if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+                            responseCursor = null;
+                        }
+                        continue;
+                    }
+
+                    TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+                    if (requestCursor != null && 
requestCursor.topicName().equals(topicName)) {

Review Comment:
   But the user may not have specified the topic list at all, is it a good 
experience to show them an error if a topic they didn't specify got deleted?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2184,9 +2178,138 @@ void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         };
-        if (!topicNamesList.isEmpty()) {
-            runnable.call(call, now);
+        return call;
+    }
+
+    @SuppressWarnings("MethodLength")
+    private Map<String, KafkaFuture<TopicDescription>> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+        final Collection<String> topicNames,
+        DescribeTopicsOptions options
+    ) {
+        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = 
new HashMap<>(topicNames.size());
+        final ArrayList<String> topicNamesList = new ArrayList<>();
+        for (String topicName : topicNames) {
+            if (topicNameIsUnrepresentable(topicName)) {
+                KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+                        topicName + "' cannot be represented in a request."));
+                topicFutures.put(topicName, future);
+            } else if (!topicFutures.containsKey(topicName)) {
+                topicFutures.put(topicName, new KafkaFutureImpl<>());
+                topicNamesList.add(topicName);
+            }
+        }
+
+        if (topicNamesList.isEmpty()) {
+            return new HashMap<>(topicFutures);
+        }
+
+        // First, we need to retrieve the node info.
+        DescribeClusterResult clusterResult = describeCluster();
+        Map<Integer, Node> nodes;
+        try {
+            nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+        } catch (InterruptedException | ExecutionException e) {
+            completeAllExceptionally(topicFutures.values(), e);
+            return new HashMap<>(topicFutures);
         }
+
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+            Map<String, TopicRequest> pendingTopics =
+                topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+                    .collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+            DescribeTopicPartitionsRequestData.Cursor requestCursor = null;
+            TopicDescription partiallyFinishedTopicDescription = null;
+
+            @Override
+            DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+                DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+                    .setTopics(new ArrayList<>(pendingTopics.values()))
+                    
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+                request.setCursor(requestCursor);
+                return new DescribeTopicPartitionsRequest.Builder(request);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+                DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+                for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+                    String topicName = topic.name();
+                    Errors error = Errors.forCode(topic.errorCode());
+
+                    KafkaFutureImpl<TopicDescription> future = 
topicFutures.get(topicName);
+                    if (error != Errors.NONE) {
+                        future.completeExceptionally(error.exception());
+                        topicFutures.remove(topicName);
+                        pendingTopics.remove(topicName);
+                        if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+                            responseCursor = null;
+                        }
+                        continue;
+                    }
+
+                    TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+                    if (requestCursor != null && 
requestCursor.topicName().equals(topicName)) {

Review Comment:
   In any case, I think it would be more robust if we didn't rely on the 
previous cursor, so that if we eventually change the code to work without topic 
lookup we don't have subtle bugs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to