artemlivshits commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513820234
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2184,9 +2177,122 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; + return call; + } + + @SuppressWarnings("MethodLength") + private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final Collection<String> topicNames, DescribeTopicsOptions options) { + final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size()); + final ArrayList<String> topicNamesList = new ArrayList<>(); + for (String topicName : topicNames) { + if (topicNameIsUnrepresentable(topicName)) { + KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic name '" + + topicName + "' cannot be represented in a request.")); + topicFutures.put(topicName, future); + } else if (!topicFutures.containsKey(topicName)) { + topicFutures.put(topicName, new KafkaFutureImpl<>()); + topicNamesList.add(topicName); + } + } + final long now = time.milliseconds(); + Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + Map<String, TopicRequest> pendingTopics = + topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) + .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + + DescribeTopicPartitionsRequestData.Cursor requestCursor = null; + TopicDescription partiallyFinishedTopicDescription = null; + + @Override + DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { + DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() + .setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); + 
request.setCursor(requestCursor); + return new DescribeTopicPartitionsRequest.Builder(request); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; + DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor(); + + for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { + String topicName = topic.name(); + Errors error = Errors.forCode(topic.errorCode()); + + KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName); + if (error != Errors.NONE) { + future.completeExceptionally(error.exception()); + topicFutures.remove(topicName); Review Comment: This will be executed concurrently with the thread that called handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi, so we may modify `topicFutures` while the HashMap constructor at line 2313 is iterating over it, and HashMap is not a thread-safe collection. 
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2184,9 +2178,138 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; - if (!topicNamesList.isEmpty()) { - runnable.call(call, now); + return call; + } + + @SuppressWarnings("MethodLength") + private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( + final Collection<String> topicNames, + DescribeTopicsOptions options + ) { + final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size()); + final ArrayList<String> topicNamesList = new ArrayList<>(); + for (String topicName : topicNames) { + if (topicNameIsUnrepresentable(topicName)) { + KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic name '" + + topicName + "' cannot be represented in a request.")); + topicFutures.put(topicName, future); + } else if (!topicFutures.containsKey(topicName)) { + topicFutures.put(topicName, new KafkaFutureImpl<>()); + topicNamesList.add(topicName); + } + } + + if (topicNamesList.isEmpty()) { + return new HashMap<>(topicFutures); + } + + // First, we need to retrieve the node info. 
+ DescribeClusterResult clusterResult = describeCluster(); + Map<Integer, Node> nodes; + try { + nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node)); + } catch (InterruptedException | ExecutionException e) { + completeAllExceptionally(topicFutures.values(), e); + return new HashMap<>(topicFutures); } + + final long now = time.milliseconds(); + Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + Map<String, TopicRequest> pendingTopics = + topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) + .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + + DescribeTopicPartitionsRequestData.Cursor requestCursor = null; + TopicDescription partiallyFinishedTopicDescription = null; + + @Override + DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { + DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() + .setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); + request.setCursor(requestCursor); + return new DescribeTopicPartitionsRequest.Builder(request); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; + DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor(); + + for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { + String topicName = topic.name(); + Errors error = Errors.forCode(topic.errorCode()); + + KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName); + if (error != Errors.NONE) { + future.completeExceptionally(error.exception()); + topicFutures.remove(topicName); + pendingTopics.remove(topicName); + if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) { + responseCursor = null; + } + continue; + } + + TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes); + + if (requestCursor != null && requestCursor.topicName().equals(topicName)) { + if (partiallyFinishedTopicDescription == null) { + // The previous round cursor can point to the partition 0 of the next topic. + partiallyFinishedTopicDescription = currentTopicDescription; + } else { + partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions()); + } + + if (responseCursor == null || !responseCursor.topicName().equals(topicName)) { + // The partially finished topic can be closed in this round. + pendingTopics.remove(topicName); + future.complete(partiallyFinishedTopicDescription); + partiallyFinishedTopicDescription = null; + } + continue; + } + + if (responseCursor != null && responseCursor.topicName().equals(topicName)) { + // This is the first time this topic being pointed by the cursor. + partiallyFinishedTopicDescription = currentTopicDescription; + continue; + } + + pendingTopics.remove(topicName); + future.complete(currentTopicDescription); + } + if (responseCursor != null) { + requestCursor = new DescribeTopicPartitionsRequestData.Cursor() Review Comment: Why can't we just assign `requestCursor = responseCursor`? 
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2184,9 +2178,138 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; - if (!topicNamesList.isEmpty()) { - runnable.call(call, now); + return call; + } + + @SuppressWarnings("MethodLength") + private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( + final Collection<String> topicNames, + DescribeTopicsOptions options + ) { + final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size()); + final ArrayList<String> topicNamesList = new ArrayList<>(); + for (String topicName : topicNames) { + if (topicNameIsUnrepresentable(topicName)) { + KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic name '" + + topicName + "' cannot be represented in a request.")); + topicFutures.put(topicName, future); + } else if (!topicFutures.containsKey(topicName)) { + topicFutures.put(topicName, new KafkaFutureImpl<>()); + topicNamesList.add(topicName); + } + } + + if (topicNamesList.isEmpty()) { + return new HashMap<>(topicFutures); + } + + // First, we need to retrieve the node info. 
+ DescribeClusterResult clusterResult = describeCluster(); + Map<Integer, Node> nodes; + try { + nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node)); + } catch (InterruptedException | ExecutionException e) { + completeAllExceptionally(topicFutures.values(), e); + return new HashMap<>(topicFutures); } + + final long now = time.milliseconds(); + Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + Map<String, TopicRequest> pendingTopics = + topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) + .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + + DescribeTopicPartitionsRequestData.Cursor requestCursor = null; + TopicDescription partiallyFinishedTopicDescription = null; + + @Override + DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { + DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() + .setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); + request.setCursor(requestCursor); + return new DescribeTopicPartitionsRequest.Builder(request); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; + DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor(); + + for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { + String topicName = topic.name(); + Errors error = Errors.forCode(topic.errorCode()); + + KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName); + if (error != Errors.NONE) { + future.completeExceptionally(error.exception()); + topicFutures.remove(topicName); + pendingTopics.remove(topicName); + if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) { + responseCursor = null; + } + continue; + } + + TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes); + + if (requestCursor != null && requestCursor.topicName().equals(topicName)) { Review Comment: Are we guaranteed that a topic specified in the request cursor would be present in the result? ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2184,9 +2177,122 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; + return call; + } + + @SuppressWarnings("MethodLength") + private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final Collection<String> topicNames, DescribeTopicsOptions options) { + final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size()); + final ArrayList<String> topicNamesList = new ArrayList<>(); + for (String topicName : topicNames) { + if (topicNameIsUnrepresentable(topicName)) { + KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic name '" + + topicName + "' cannot be represented in a request.")); + topicFutures.put(topicName, future); + } else if (!topicFutures.containsKey(topicName)) { + topicFutures.put(topicName, new KafkaFutureImpl<>()); + topicNamesList.add(topicName); + } + } + final long now = time.milliseconds(); + Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + Map<String, TopicRequest> pendingTopics = + topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) + .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + + DescribeTopicPartitionsRequestData.Cursor requestCursor = 
null; + TopicDescription partiallyFinishedTopicDescription = null; + + @Override + DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { + DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() + .setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); + request.setCursor(requestCursor); + return new DescribeTopicPartitionsRequest.Builder(request); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; + DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor(); + + for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { + String topicName = topic.name(); + Errors error = Errors.forCode(topic.errorCode()); + + KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName); + if (error != Errors.NONE) { + future.completeExceptionally(error.exception()); + topicFutures.remove(topicName); Review Comment: Do we actually need to remove it? We might as well keep it and return the original map and avoid a copy. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org