Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
github-actions[bot] commented on PR #15265: URL: https://github.com/apache/kafka/pull/15265#issuecomment-2138611194 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
mumrah merged PR #15470: URL: https://github.com/apache/kafka/pull/15470
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on PR #15470: URL: https://github.com/apache/kafka/pull/15470#issuecomment-2040217099 @mumrah Thanks for the review. Ticket filed. https://issues.apache.org/jira/browse/KAFKA-15579
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
mumrah commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1551680241

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

@@ -2185,9 +2185,157 @@
     void handleFailure(Throwable throwable) {
         completeAllExceptionally(topicFutures.values(), throwable);
     }
 };
-if (!topicNamesList.isEmpty()) {
-    runnable.call(call, now);
+}
+
+Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi(
+    List<String> topicNamesList,
+    Map<String, KafkaFutureImpl<TopicDescription>> topicFutures,
+    Map<Integer, Node> nodes,
+    DescribeTopicsOptions options,
+    long now
+) {
+    Map<String, TopicRequest> topicsRequests = new LinkedHashMap<>();
+    topicNamesList.stream().sorted().forEach(topic -> {
+        topicsRequests.put(topic, new TopicRequest().setName(topic));
+    });
+    return new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
+        new LeastLoadedNodeProvider()) {
+        Map<String, TopicRequest> pendingTopics = topicsRequests;

Review Comment: Why do we need this inner assignment? Can we make `topicsRequests` final in the outer scope?

## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:

@@ -47,8 +49,32 @@
 public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
     return this;
 }

+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to false if the DescribeTopicPartitions API is
+ * not supported.
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) {
+    this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi;
+    return this;
+}
+
+// Note that partitionSizeLimitPerResponse will not be effective if it is larger than the config
+// max.request.partition.size.limit on the server side.

Review Comment: @CalvinConfluent can you open a ticket for this so we don't forget about it?
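The fluent option style used by `DescribeTopicsOptions` in the diff above can be sketched with a simplified, Kafka-free stand-in. The field names follow the diff, but the `Options` class and its defaults here are assumptions for illustration, not the real client's values:

```java
public class DescribeOptionsDemo {
    // Simplified stand-in for DescribeTopicsOptions; the defaults are assumptions.
    static final class Options {
        private boolean useDescribeTopicPartitionsApi = false;
        private int partitionSizeLimitPerResponse = 2000;

        // Whether to use the DescribeTopicPartitions API; set to false when
        // the API is not supported by the cluster.
        Options useDescribeTopicPartitionsApi(boolean use) {
            this.useDescribeTopicPartitionsApi = use;
            return this; // returning 'this' enables fluent chaining
        }

        // Per-response partition limit; per the diff, values larger than the
        // server-side max.request.partition.size.limit are not effective.
        Options partitionSizeLimitPerResponse(int limit) {
            this.partitionSizeLimitPerResponse = limit;
            return this;
        }

        boolean usesNewApi() { return useDescribeTopicPartitionsApi; }
        int limit() { return partitionSizeLimitPerResponse; }
    }

    public static void main(String[] args) {
        Options opts = new Options()
            .useDescribeTopicPartitionsApi(true)
            .partitionSizeLimitPerResponse(100);
        System.out.println(opts.usesNewApi() + " " + opts.limit());
    }
}
```

Each setter returns `this`, which is what lets callers chain both new options in a single expression, matching the existing `includeAuthorizedOperations` style.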
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1544749533

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

@@ -2185,9 +2184,155 @@
     void handleFailure(Throwable throwable) {
         completeAllExceptionally(topicFutures.values(), throwable);
     }
 };
-if (!topicNamesList.isEmpty()) {
-    runnable.call(call, now);
+}
+
+Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi(
+    List<String> topicNamesList,
+    Map<String, KafkaFutureImpl<TopicDescription>> topicFutures,
+    Map<Integer, Node> nodes,
+    DescribeTopicsOptions options,
+    long now
+) {
+    return new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
+        new LeastLoadedNodeProvider()) {
+        Map<String, TopicRequest> pendingTopics =
+            topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName))
+                .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new));

Review Comment: Thanks for the advice. Addressed.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
mumrah commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1544478846

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

@@ -2185,9 +2184,155 @@
     void handleFailure(Throwable throwable) {
         completeAllExceptionally(topicFutures.values(), throwable);
     }
 };
-if (!topicNamesList.isEmpty()) {
-    runnable.call(call, now);
+}
+
+Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi(
+    List<String> topicNamesList,
+    Map<String, KafkaFutureImpl<TopicDescription>> topicFutures,
+    Map<Integer, Node> nodes,
+    DescribeTopicsOptions options,
+    long now
+) {
+    return new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
+        new LeastLoadedNodeProvider()) {
+        Map<String, TopicRequest> pendingTopics =
+            topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName))
+                .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new));

Review Comment: I think this would be more readable if we just create a TreeMap and populate it in a for loop. Since we are using this map like a queue, it might be better to use a LinkedHashMap (after sorting the topics). Really, we don't even need a map, since we can build the TopicRequest-s on demand in createRequest.
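The rewrite mumrah suggests can be sketched in isolation, as plain Java with no Kafka dependencies. This `TopicRequest` is a hypothetical stand-in for `DescribeTopicPartitionsRequestData.TopicRequest`, keeping only the name:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PendingTopicsDemo {
    // Hypothetical stand-in for DescribeTopicPartitionsRequestData.TopicRequest.
    static final class TopicRequest {
        private String name;
        TopicRequest setName(String name) { this.name = name; return this; }
        String name() { return name; }
    }

    // Sort the topic names first, then populate a LinkedHashMap in a plain
    // for loop. LinkedHashMap preserves insertion (i.e. sorted) order, and
    // the map can then be consumed like a queue as topics complete.
    static Map<String, TopicRequest> buildPendingTopics(List<String> topicNames) {
        List<String> sorted = new ArrayList<>(topicNames);
        Collections.sort(sorted);
        Map<String, TopicRequest> pending = new LinkedHashMap<>();
        for (String topic : sorted) {
            pending.put(topic, new TopicRequest().setName(topic));
        }
        return pending;
    }

    public static void main(String[] args) {
        Map<String, TopicRequest> pending =
            buildPendingTopics(List.of("topic-b", "topic-a", "topic-c"));
        System.out.println(pending.keySet()); // [topic-a, topic-b, topic-c]
    }
}
```

Compared with the stream-into-TreeMap version in the diff, this trades one expression for a short loop that is easier to read, while keeping the same sorted iteration order.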
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1526696015

## tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:

@@ -639,6 +639,25 @@
 public void testDescribe(String quorum) throws ExecutionException, InterruptedException {
     assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), "Row does not start with " + testTopicName + ". Row is: " + rows[0]);
 }

+@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+@ValueSource(strings = {"quorum=zk", "quorum=kraft"})
+public void testDescribeWithDescribeTopicPartitionsApi(String quorum) throws ExecutionException, InterruptedException {
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 3, (short) 2,
+        scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
+    );
+    String secondTopicName = "test-2";
+    TestUtils.createTopicWithAdmin(adminClient, secondTopicName, scalaBrokers, scalaControllers, 3, (short) 2,
+        scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
+    );
+
+    String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap(
+        "--describe", "--partition-size-limit-per-response=1"));
+    String[] rows = output.split("\n");
+    assertEquals(8, rows.length, String.join("\n", rows));
+    assertTrue(rows[2].contains("\tElr"), rows[2]);
+    assertTrue(rows[2].contains("LastKnownElr"), rows[2]);
+}
+
 @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)

Review Comment: Thanks for the advice. Updated.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1526695825

## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:

@@ -799,6 +820,11 @@
 public TopicCommandOptions(String[] args) {
     "if set when creating topics, the action will only execute if the topic does not already exist.");
     excludeInternalTopicOpt = parser.accepts("exclude-internal",
         "exclude internal topics when running list or describe command. The internal topics will be listed by default");
+    partitionSizeLimitPerResponseOpt = parser.accepts("partition-size-limit-per-response",
+        "the maximum partition size to be included in one DescribeTopicPartitions response. Only valid if use-describe-topics-api is used")

Review Comment: Good catch. Removed.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1526695578

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

@@ -2185,9 +2179,143 @@
     void handleFailure(Throwable throwable) {
         completeAllExceptionally(topicFutures.values(), throwable);
     }
 };
-if (!topicNamesList.isEmpty()) {
-    runnable.call(call, now);
+    return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+    final Collection<String> topicNames,
+    DescribeTopicsOptions options
+) {
+    final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
+    final ArrayList<String> topicNamesList = new ArrayList<>();
+    for (String topicName : topicNames) {
+        if (topicNameIsUnrepresentable(topicName)) {
+            KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+            future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+                topicName + "' cannot be represented in a request."));
+            topicFutures.put(topicName, future);
+        } else if (!topicFutures.containsKey(topicName)) {
+            topicFutures.put(topicName, new KafkaFutureImpl<>());
+            topicNamesList.add(topicName);
+        }
+    }
+
+    if (topicNamesList.isEmpty()) {
+        return new HashMap<>(topicFutures);
+    }
+
+    // First, we need to retrieve the node info.
+    DescribeClusterResult clusterResult = describeCluster();
+    Map<Integer, Node> nodes;
+    try {
+        nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node));
+    } catch (InterruptedException | ExecutionException e) {
+        completeAllExceptionally(topicFutures.values(), e.getCause());
+        return new HashMap<>(topicFutures);
+    }
+
+    final long now = time.milliseconds();
+    Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
+        new LeastLoadedNodeProvider()) {
+        Map<String, TopicRequest> pendingTopics =
+            topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName))
+                .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new));
+
+        TopicDescription partiallyFinishedTopicDescription = null;
+
+        @Override
+        DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
+            DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData()
+                .setTopics(new ArrayList<>(pendingTopics.values()))
+                .setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+            if (partiallyFinishedTopicDescription != null) {
+                // If the previous cursor points to partition 0, the cursor will not be set, as the first one
+                // in the topic list should be the previous cursor topic.
+                request.setCursor(new DescribeTopicPartitionsRequestData.Cursor()
+                    .setTopicName(partiallyFinishedTopicDescription.name())
+                    .setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+                );
+            }
+            return new DescribeTopicPartitionsRequest.Builder(request);
+        }
+
+        @Override
+        void handleResponse(AbstractResponse abstractResponse) {
+            DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse;
+            DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor();
+            // The topicDescription for the cursor topic of the current batch.
+            TopicDescription nextTopicDescription = null;
+
+            for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) {
+                String topicName = topic.name();
+                Errors error = Errors.forCode(topic.errorCode());
+
+                KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName);
+
+                if (error != Errors.NONE) {
+                    future.completeExceptionally(error.exception());
+                    pendingTopics.remove(topicName);
+                    if (responseCursor != null && responseCursor.topicName().equals(topicName)) {
+                        responseCursor = null;
+                    }
+                    continue;
+                }
+
+                TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+                if (partiallyFinish
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1526690281

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

@@ -2185,9 +2179,143 @@
     void handleFailure(Throwable throwable) {
         completeAllExceptionally(topicFutures.values(), throwable);
    }
 };
-if (!topicNamesList.isEmpty()) {
-    runnable.call(call, now);
+return call;

Review Comment: The current plan is to try the new API first; if the cluster does not support it, the client falls back to the Metadata API.
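The fallback plan described here can be illustrated with a minimal, Kafka-free sketch. The `UnsupportedVersionException` class and both suppliers are hypothetical stand-ins, not the real client types; in the real client the broker's rejection of the new API triggers the switch:

```java
import java.util.function.Supplier;

public class FallbackDemo {
    // Hypothetical stand-in for the broker rejecting an unknown/unsupported API.
    static final class UnsupportedVersionException extends RuntimeException {}

    // Try the preferred path first; if the cluster does not support it,
    // fall back to the older path. This mirrors the plan of trying the
    // DescribeTopicPartitions API before falling back to the Metadata API.
    static String describe(Supplier<String> preferred, Supplier<String> fallback) {
        try {
            return preferred.get();
        } catch (UnsupportedVersionException e) {
            return fallback.get();
        }
    }

    public static void main(String[] args) {
        String supported = describe(() -> "via DescribeTopicPartitions", () -> "via Metadata");
        String unsupported = describe(
            () -> { throw new UnsupportedVersionException(); },
            () -> "via Metadata");
        System.out.println(supported + " / " + unsupported);
    }
}
```

The design choice is that callers see one `describe` entry point and never need to know which wire API actually served the request.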
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1526685631

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

@@ -2256,6 +2384,26 @@
     void handleFailure(Throwable throwable) {
         completeAllExceptionally(topicFutures.values(), throwable);
     }
     return new HashMap<>(topicFutures);
 }

+private TopicDescription getTopicDescriptionFromDescribeTopicsResponseTopic(
+    DescribeTopicPartitionsResponseTopic topic,
+    Map<Integer, Node> nodes
+) {
+    List<DescribeTopicPartitionsResponsePartition> partitionInfos = topic.partitions();
+    List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size());
+    for (DescribeTopicPartitionsResponsePartition partitionInfo : partitionInfos) {
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
+            partitionInfo.partitionIndex(),
+            nodes.get(partitionInfo.leaderId()),
+            partitionInfo.replicaNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+            partitionInfo.isrNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+            partitionInfo.eligibleLeaderReplicas().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+            partitionInfo.lastKnownElr().stream().map(id -> nodes.get(id)).collect(Collectors.toList()));
+        partitions.add(topicPartitionInfo);
+    }
+    partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));

Review Comment: Good catch, removed the sorting. The server already guarantees the partition order.
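For reference, the client-side sort that was removed can be shown in isolation. `PartitionInfo` here is a hypothetical stand-in for `TopicPartitionInfo`, keeping only the partition index:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class PartitionSortDemo {
    // Hypothetical stand-in for TopicPartitionInfo: only the index matters here.
    static final class PartitionInfo {
        final int partition;
        PartitionInfo(int partition) { this.partition = partition; }
        int partition() { return partition; }
    }

    // The removed client-side sort: order partitions by index. It is
    // redundant when the server already returns partitions in order.
    static void sortByPartition(List<PartitionInfo> partitions) {
        partitions.sort(Comparator.comparingInt(PartitionInfo::partition));
    }

    public static void main(String[] args) {
        List<PartitionInfo> parts = new ArrayList<>(
            List.of(new PartitionInfo(2), new PartitionInfo(0), new PartitionInfo(1)));
        sortByPartition(parts);
        parts.forEach(p -> System.out.print(p.partition() + " ")); // 0 1 2
    }
}
```

Dropping the sort avoids an O(n log n) pass per topic on the client, at the cost of trusting the server-side ordering guarantee noted in the review.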
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1526680281

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

@@ -2185,9 +2179,143 @@
     void handleFailure(Throwable throwable) {
         completeAllExceptionally(topicFutures.values(), throwable);
     }
 };
-if (!topicNamesList.isEmpty()) {
-    runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})

Review Comment: handleResponse still reports NPathComplexity after splitting the method, and it is not worth splitting it further. I only apply the NPathComplexity suppression there.

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

@@ -2256,6 +2384,26 @@
     return new HashMap<>(topicFutures);
 }

+private TopicDescription getTopicDescriptionFromDescribeTopicsResponseTopic(
+    DescribeTopicPartitionsResponseTopic topic,
+    Map<Integer, Node> nodes
+) {
+    List<DescribeTopicPartitionsResponsePartition> partitionInfos = topic.partitions();
+    List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size());
+    for (DescribeTopicPartitionsResponsePartition partitionInfo : partitionInfos) {
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
+            partitionInfo.partitionIndex(),
+            nodes.get(partitionInfo.leaderId()),
+            partitionInfo.replicaNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+            partitionInfo.isrNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+            partitionInfo.eligibleLeaderReplicas().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+            partitionInfo.lastKnownElr().stream().map(id -> nodes.get(id)).collect(Collectors.toList()));

Review Comment: Done.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
mumrah commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1526390858

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

@@ -2185,9 +2179,143 @@
     void handleFailure(Throwable throwable) {
         completeAllExceptionally(topicFutures.values(), throwable);
     }
 };
-if (!topicNamesList.isEmpty()) {
-    runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})

Review Comment: Instead of suppressing these warnings, can we refactor the method to be less complex? For example, the Call instance could be defined elsewhere.

## clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:

@@ -40,12 +42,26 @@
 public class TopicPartitionInfo {
  * @param replicas the replicas of the partition in the same order as the replica assignment (the preferred replica
  *                 is the head of the list)
  * @param isr the in-sync replicas
+ * @param elr the eligible leader replicas
+ * @param lastKnownElr the last known eligible leader replicas
  */
+public TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr,
+                          List<Node> elr, List<Node> lastKnownElr) {

Review Comment: nit: use this style if you need more than one line
```
Foo(
    int x,
    int y
) {
```

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

@@ -2185,9 +2179,143 @@
 };
-if (!topicNamesList.isEmpty()) {
-    runnable.call(call, now);
+return call;

Review Comment: Why did we remove this optimization? Seems like we should try to keep the Metadata API code unchanged if possible.

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

@@ -2185,9 +2179,143 @@
     void handleFailure(Throwable throwable) {
         completeAllExceptionally(topicFutures.values(), throwable);
     }
 };
-if (!topicNamesList.isEmpty()) {
-    runnable.call(call, now);
+    return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+    final Collection<String> topicNames,
+    DescribeTopicsOptions options
+) {
+    final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
+    final ArrayList<String> topicNamesList = new ArrayList<>();
+    for (String topicName : topicNames) {
+        if (topicNameIsUnrepresentable(topicName)) {
+            KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+            future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+                topicName + "' cannot be represented in a request."));
+            topicFutures.put(topicName, future);
+        } else if (!topicFutures.containsKey(topicName)) {
+            topicFutures.put(topicName, new KafkaFutureImpl<>());
+            topicNamesList.add(topicName);
+        }
+    }
+
+    if (topicNamesList.isEmpty()) {
+        return new HashMap<>(topicFutures);
+    }
+
+    // First, we need to retrieve the node info.
+    DescribeClusterResult clusterResult = describeCluster();
+    Map<Integer, Node> nodes;
+    try {
+        nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node));
+    } catch (InterruptedException | ExecutionException e) {
+        completeAllExceptionally(topicFutures.values(), e.getCause());
+        return new HashMap<>(topicFutures);
+    }
+
+    final long now = time.milliseconds();
+    Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
+        new LeastLoadedNodeProvider()) {
+        Map<String, TopicRequest> pendingTopics =
+            topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName))
+                .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new));
+
+        TopicDescription partiallyFinishedTopicDescription = null;
+
+        @Override
+        DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
+            DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData()
+                .setTopics(new ArrayList<>(pendingTopics.values()))
+                .setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+            if (partiallyFinishedTopicDescription != null) {
+                // If the previous cursor points to the partition 0, the cursor will not be set
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
artemlivshits commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1523996339

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

@@ -2185,9 +2179,144 @@
     void handleFailure(Throwable throwable) {
         completeAllExceptionally(topicFutures.values(), throwable);
     }
 };
-if (!topicNamesList.isEmpty()) {
-    runnable.call(call, now);
+    return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+    final Collection<String> topicNames,
+    DescribeTopicsOptions options
+) {
+    final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
+    final ArrayList<String> topicNamesList = new ArrayList<>();
+    for (String topicName : topicNames) {
+        if (topicNameIsUnrepresentable(topicName)) {
+            KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+            future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+                topicName + "' cannot be represented in a request."));
+            topicFutures.put(topicName, future);
+        } else if (!topicFutures.containsKey(topicName)) {
+            topicFutures.put(topicName, new KafkaFutureImpl<>());
+            topicNamesList.add(topicName);
+        }
+    }
+
+    if (topicNamesList.isEmpty()) {
+        return new HashMap<>(topicFutures);
+    }
+
+    // First, we need to retrieve the node info.
+    DescribeClusterResult clusterResult = describeCluster();
+    Map<Integer, Node> nodes;
+    try {
+        nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node));
+    } catch (InterruptedException | ExecutionException e) {
+        completeAllExceptionally(topicFutures.values(), e.getCause());
+        return new HashMap<>(topicFutures);
+    }
+
+    final long now = time.milliseconds();
+    Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
+        new LeastLoadedNodeProvider()) {
+        Map<String, TopicRequest> pendingTopics =
+            topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName))
+                .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new));
+
+        TopicDescription partiallyFinishedTopicDescription = null;
+
+        @Override
+        DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
+            DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData()
+                .setTopics(new ArrayList<>(pendingTopics.values()))
+                .setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+            if (partiallyFinishedTopicDescription != null) {
+                // If the previous cursor points to partition 0, the cursor will not be set, as the first one
+                // in the topic list should be the previous cursor topic.
+                request.setCursor(new DescribeTopicPartitionsRequestData.Cursor()
+                    .setTopicName(partiallyFinishedTopicDescription.name())
+                    .setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+                );
+            }
+            return new DescribeTopicPartitionsRequest.Builder(request);
+        }
+
+        @Override
+        void handleResponse(AbstractResponse abstractResponse) {
+            DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse;
+            DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor();
+            // The topicDescription for the cursor topic of the current batch.
+            TopicDescription nextTopicDescription = null;
+
+            for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) {
+                String topicName = topic.name();
+                Errors error = Errors.forCode(topic.errorCode());
+
+                KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName);
+
+                if (error != Errors.NONE) {
+                    future.completeExceptionally(error.exception());
+                    pendingTopics.remove(topicName);
+                    if (responseCursor != null && responseCursor.topicName().equals(topicName)) {
+                        responseCursor = null;
+                    }
+                    continue;
+                }
+
+                TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+                if (partiallyFinished
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1523971792

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

@@ -2184,9 +2178,139 @@
     void handleFailure(Throwable throwable) {
         completeAllExceptionally(topicFutures.values(), throwable);
     }
 };
-if (!topicNamesList.isEmpty()) {
-    runnable.call(call, now);
+    return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+    final Collection<String> topicNames,
+    DescribeTopicsOptions options
+) {
+    final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
+    final ArrayList<String> topicNamesList = new ArrayList<>();
+    for (String topicName : topicNames) {
+        if (topicNameIsUnrepresentable(topicName)) {
+            KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+            future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+                topicName + "' cannot be represented in a request."));
+            topicFutures.put(topicName, future);
+        } else if (!topicFutures.containsKey(topicName)) {
+            topicFutures.put(topicName, new KafkaFutureImpl<>());
+            topicNamesList.add(topicName);
+        }
+    }
+
+    if (topicNamesList.isEmpty()) {
+        return new HashMap<>(topicFutures);
+    }
+
+    // First, we need to retrieve the node info.
+    DescribeClusterResult clusterResult = describeCluster();
+    Map<Integer, Node> nodes;
+    try {
+        nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node));
+    } catch (InterruptedException | ExecutionException e) {
+        completeAllExceptionally(topicFutures.values(), e.getCause());
+        return new HashMap<>(topicFutures);
+    }
+
+    final long now = time.milliseconds();
+    Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
+        new LeastLoadedNodeProvider()) {
+        Map<String, TopicRequest> pendingTopics =
+            topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName))
+                .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new));
+
+        TopicDescription partiallyFinishedTopicDescription = null;
+        TopicDescription nextTopicDescription = null;
+
+        @Override
+        DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
+            DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData()
+                .setTopics(new ArrayList<>(pendingTopics.values()))
+                .setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+            if (partiallyFinishedTopicDescription != null) {

Review Comment: Correct, updated the comments.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
artemlivshits commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1523702932

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ##

@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         };
-        if (!topicNamesList.isEmpty()) {
-            runnable.call(call, now);
-        }
+        return call;
+    }
+
+    @SuppressWarnings({"MethodLength", "NPathComplexity"})
+    private Map<String, KafkaFutureImpl<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+        final Collection<String> topicNames,
+        DescribeTopicsOptions options
+    ) {
+        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
+        final ArrayList<String> topicNamesList = new ArrayList<>();
+        for (String topicName : topicNames) {
+            if (topicNameIsUnrepresentable(topicName)) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+                    topicName + "' cannot be represented in a request."));
+                topicFutures.put(topicName, future);
+            } else if (!topicFutures.containsKey(topicName)) {
+                topicFutures.put(topicName, new KafkaFutureImpl<>());
+                topicNamesList.add(topicName);
+            }
+        }
+
+        if (topicNamesList.isEmpty()) {
+            return new HashMap<>(topicFutures);
+        }
+
+        // First, we need to retrieve the node info.
+        DescribeClusterResult clusterResult = describeCluster();
+        Map<Integer, Node> nodes;
+        try {
+            nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node));
+        } catch (InterruptedException | ExecutionException e) {
+            completeAllExceptionally(topicFutures.values(), e.getCause());
+            return new HashMap<>(topicFutures);
+        }
+
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+            Map<String, TopicRequest> pendingTopics =
+                topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName))
+                    .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new));
+
+            TopicDescription partiallyFinishedTopicDescription = null;
+            TopicDescription nextTopicDescription = null;
+
+            @Override
+            DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
+                DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData()
+                    .setTopics(new ArrayList<>(pendingTopics.values()))
+                    .setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+                if (partiallyFinishedTopicDescription != null) {

Review Comment:
   Ideally we should drive the pagination from the cursor rather than spreading the logic between multiple states. We plan to get rid of the topic list eventually, and this logic would be easy to miss.
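The cursor-driven pagination suggested above can be sketched in isolation. This is a hedged illustration, not the PR's code: the `Cursor`, `Page`, and `describe` names and the fake in-memory "broker" are hypothetical stand-ins for the real `DescribeTopicPartitionsRequestData`/`ResponseData` protocol types. The point is that all pagination state lives in the cursor the server hands back, instead of being split across several client-side fields.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CursorPagination {
    // A cursor names the first (topic, partition) NOT yet returned.
    record Cursor(String topic, int partitionIndex) {}
    record Page(Map<String, List<Integer>> partitionsByTopic, Cursor nextCursor) {}

    // Fake server side: topics with partition counts, paged by a global partition limit.
    static Page describe(Map<String, Integer> topics, Cursor cursor, int limit) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        boolean started = cursor == null;
        int budget = limit;
        for (Map.Entry<String, Integer> e : topics.entrySet()) {
            int first = 0;
            if (!started) {
                if (!e.getKey().equals(cursor.topic())) continue; // skip topics before the cursor
                started = true;
                first = cursor.partitionIndex();
            }
            for (int p = first; p < e.getValue(); p++) {
                if (budget == 0) return new Page(out, new Cursor(e.getKey(), p));
                out.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(p);
                budget--;
            }
        }
        return new Page(out, null); // everything fit: no next cursor
    }

    // Client loop: resend whatever cursor came back until it is null.
    static Map<String, List<Integer>> describeAll(Map<String, Integer> topics, int limit) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        Cursor cursor = null;
        do {
            Page page = describe(topics, cursor, limit);
            page.partitionsByTopic().forEach((t, ps) ->
                result.computeIfAbsent(t, k -> new ArrayList<>()).addAll(ps));
            cursor = page.nextCursor();
        } while (cursor != null);
        return result;
    }
}
```

With a limit of 2 partitions per response, describing topics `a` (3 partitions) and `b` (2 partitions) takes three round trips, but the client loop never needs to track a partially finished topic explicitly.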
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1520495884

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ##

+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse;
+                DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor();
+
+                for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) {
+                    String topicName = topic.name();
+                    Errors error = Errors.forCode(topic.errorCode());
+
+                    KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName);
+
+                    if (error != Errors.NONE) {
+                        future.completeExceptionally(error.exception());
+                        pendingTopics.remove(topicName);
+                        if (responseCursor != null && responseCursor.topicName().equals(topicName)) {
+                            responseCursor = null;
+                        }
+                        continue;
+                    }
+
+                    TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+                    if (partiallyFinishedTopicDescription != null && partiallyFinishedTopicDescription.name().equals(topicName)) {
+                        partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+                        continue;
+                    }
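The `pendingTopics` map in the quoted hunk is built with the four-argument `Collectors.toMap` so the requested topics stay sorted by name, matching the order in which the broker pages through them. A standalone sketch of the same collect pattern; the `TopicRequest` record here is an illustrative stand-in for the protocol class:

```java
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class PendingTopicsSketch {
    // Stand-in for DescribeTopicPartitionsRequestData.TopicRequest.
    record TopicRequest(String name) {}

    static TreeMap<String, TopicRequest> pending(List<String> names) {
        // Four-arg toMap: key mapper, value mapper, merge function for
        // duplicate names (keep the first occurrence), and TreeMap::new so
        // iteration order is sorted topic-name order.
        return names.stream()
            .map(TopicRequest::new)
            .collect(Collectors.toMap(TopicRequest::name, t -> t, (t1, t2) -> t1, TreeMap::new));
    }
}
```

Without the merge function, a duplicate topic name would make `toMap` throw `IllegalStateException`; without the map factory, the result would be an unordered `HashMap`.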
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1520498085

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ##

+            @Override
+            DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
+                DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData()
+                    .setTopics(new ArrayList<>(pendingTopics.values()))
+                    .setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+                if (partiallyFinishedTopicDescription != null) {
+                    request.setCursor(new DescribeTopicPartitionsRequestData.Cursor()
+                        .setTopicName(partiallyFinishedTopicDescription.name())
+                        .setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+                    );
+                }
+                return new DescribeTopicPartitionsRequest.Builder(request);
+            }

Review Comment:
   Because we provide the topic list, and the list only contains the topics for which we have not yet received results, we can skip setting partition-0 topics in the cursor. Such topics will be the first ones in the Topics field.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
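The cursor built in the hunk above encodes "resume this topic at the next unseen partition": since partition indices are contiguous from 0, the number of partitions already accumulated is exactly the next index to fetch. A minimal sketch of that invariant, with `PartialTopic` as a hypothetical stand-in for `TopicDescription`:

```java
import java.util.List;

public class CursorFromPartial {
    record PartialTopic(String name, List<Integer> partitions) {}
    record Cursor(String topicName, int partitionIndex) {}

    // Mirrors the quoted hunk: the next partition to fetch equals the count
    // of partitions received so far, because indices are contiguous from 0.
    static Cursor nextCursor(PartialTopic partial) {
        if (partial == null) return null; // nothing partially finished: no cursor
        return new Cursor(partial.name(), partial.partitions().size());
    }
}
```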
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
artemlivshits commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1520443872

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ##

+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse;
+                DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor();
+
+                for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) {
+                    String topicName = topic.name();
+                    Errors error = Errors.forCode(topic.errorCode());
+
+                    KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName);
+
+                    if (error != Errors.NONE) {
+                        future.completeExceptionally(error.exception());
+                        pendingTopics.remove(topicName);
+                        if (responseCursor != null && responseCursor.topicName().equals(topicName)) {
+                            responseCursor = null;
+                        }
+                        continue;
+                    }
+
+                    TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+                    if (partiallyFinishedTopicDescription != null && partiallyFinishedTopicDescription.name().equals(topicName)) {
+                        partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+                        continue;
+                    }
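When consecutive responses cover the same topic, the handler above appends the newly arrived partitions to the partially finished description via `addAll`. The same merge step, reduced to a self-contained sketch (the `Topic` class and method names are illustrative, not the client's API):

```java
import java.util.ArrayList;
import java.util.List;

public class MergePartials {
    // Minimal stand-in for TopicDescription with a mutable partition list.
    static final class Topic {
        final String name;
        final List<Integer> partitions = new ArrayList<>();
        Topic(String name, List<Integer> first) { this.name = name; partitions.addAll(first); }
    }

    // If the incoming page continues the partially finished topic, append its
    // partitions to it; otherwise the incoming topic starts a new description.
    static Topic merge(Topic partial, String pageTopic, List<Integer> pagePartitions) {
        if (partial != null && partial.name.equals(pageTopic)) {
            partial.partitions.addAll(pagePartitions);
            return partial;
        }
        return new Topic(pageTopic, pagePartitions);
    }
}
```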
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
artemlivshits commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1516873731

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ##

+            DescribeTopicPartitionsRequestData.Cursor requestCursor = null;
+            TopicDescription partiallyFinishedTopicDescription = null;
+
+            @Override
+            DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
+                DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData()
+                    .setTopics(new ArrayList<>(pendingTopics.values()))
+                    .setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+                request.setCursor(requestCursor);
+                return new DescribeTopicPartitionsRequest.Builder(request);
+            }
...
+                    TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+                    if (requestCursor != null && requestCursor.topicName().equals(topicName)) {

Review Comment:
   But the user may not have specified the topic list at all; is it a good experience to show them an error if a topic they didn't specify got deleted?
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1515290353

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ##

+                    TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+                    if (requestCursor != null && requestCursor.topicName().equals(topicName)) {

Review Comment:
   Because the current implementation sends the topic list with the cursor, if the cursor topic is missing from the topic list the request will fail with an invalid-request error.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1515289272

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ##

+                    TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+                    if (requestCursor != null && requestCursor.topicName().equals(topicName)) {
+                        if (partiallyFinishedTopicDescription == null) {
+                            // The previous round cursor can point to the partition 0 of the next topic.
+                            partiallyFinishedTopicDescription = currentTopicDescription;
+                        } else {
+                            partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+                        }
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1515288246

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ##

+                for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) {
+                    String topicName = topic.name();
+                    Errors error = Errors.forCode(topic.errorCode());
+
+                    KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName);
+                    if (error != Errors.NONE) {
+                        future.completeExceptionally(error.exception());
+                        topicFutures.remove(topicName);

Review Comment:
   Yes, we don't need to remove it. Also added a failure-handling test.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
artemlivshits commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513820234 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2184,9 +2177,122 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; +return call; +} + +@SuppressWarnings("MethodLength") +private Map> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final Collection topicNames, DescribeTopicsOptions options) { +final Map> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} +} +final long now = time.milliseconds(); +Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), +new LeastLoadedNodeProvider()) { +Map pendingTopics = +topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) +.collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + +DescribeTopicPartitionsRequestData.Cursor requestCursor = null; +TopicDescription partiallyFinishedTopicDescription = null; + +@Override +DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { +DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() +.setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); +request.setCursor(requestCursor); +return new DescribeTopicPartitionsRequest.Builder(request); 
+} + +@Override +void handleResponse(AbstractResponse abstractResponse) { +DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; +DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor(); + +for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { +String topicName = topic.name(); +Errors error = Errors.forCode(topic.errorCode()); + +KafkaFutureImpl future = topicFutures.get(topicName); +if (error != Errors.NONE) { +future.completeExceptionally(error.exception()); +topicFutures.remove(topicName); Review Comment: This will be executed concurrently with the thread that called handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi, so we may concurrently modify topic futures while the HashMap constructor at line 2313 is iterating over it and it's not a thread-safe collection. ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2184,9 +2178,138 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; -if (!topicNamesList.isEmpty()) { -runnable.call(call, now); +return call; +} + +@SuppressWarnings("MethodLength") +private Map> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( +final Collection topicNames, +DescribeTopicsOptions options +) { +final Map> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} +} + +if (topicNamesList.isEmpty()) { +return new HashMap<>(topicFuture
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1515057898 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -1399,6 +1404,184 @@ public void testInvalidTopicNames() throws Exception { } } +@SuppressWarnings("NPathComplexity") Review Comment: Updated the server-side code and added more tests with invalid cursor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1515057124 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2276,6 +2415,11 @@ private Node leader(PartitionInfo partitionInfo) { return partitionInfo.leader(); } +// This is used in the describe topics path if using DescribeTopics API. +private Node replicaToFakeNode(int id) { +return new Node(id, "Dummy", 0); +} Review Comment: Now it will query the cluster info for the nodes. ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2276,6 +2399,11 @@ private Node leader(PartitionInfo partitionInfo) { return partitionInfo.leader(); } +// This is used in the describe topics path if using DescribeTopicPartitions API. +private Node replicaToFakeNode(int id) { +return new Node(id, "Dummy", 0); Review Comment: Good point. Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
AndrewJSchofield commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1514119801 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2276,6 +2415,11 @@ private Node leader(PartitionInfo partitionInfo) { return partitionInfo.leader(); } +// This is used in the describe topics path if using DescribeTopics API. +private Node replicaToFakeNode(int id) { +return new Node(id, "Dummy", 0); +} Review Comment: Maybe you should create a new constructor for TopicPartitionInfo that doesn't require a Node. This faking is a bit ugly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
AndrewJSchofield commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1514117940 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -1399,6 +1404,184 @@ public void testInvalidTopicNames() throws Exception { } } +@SuppressWarnings("NPathComplexity") Review Comment: The client sends one. There will be multiple client implementations over time, so it's prudent to ensure that a malformed cursor is caught properly, just in case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
dajac commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1514092183 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2276,6 +2399,11 @@ private Node leader(PartitionInfo partitionInfo) { return partitionInfo.leader(); } +// This is used in the describe topics path if using DescribeTopicPartitions API. +private Node replicaToFakeNode(int id) { +return new Node(id, "Dummy", 0); Review Comment: Don't we break the contract of the admin api if we do this? The expectation is that the real nodes are returned. We could perhaps get them from the metadata? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
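Resolving replica ids against cluster metadata, with a placeholder only when a node is genuinely absent from the current metadata snapshot, might look like the following sketch. The `Node` record and `resolve` helper are hypothetical illustrations, not the admin client's actual API:

```java
import java.util.Map;
import java.util.Optional;

class NodeLookup {
    // Minimal stand-in for org.apache.kafka.common.Node.
    record Node(int id, String host, int port) {}

    // Prefer the real node from cluster metadata; fall back to an id-only
    // placeholder (empty host, port -1) when the id is unknown.
    static Node resolve(int replicaId, Map<Integer, Node> clusterNodes) {
        return Optional.ofNullable(clusterNodes.get(replicaId))
                .orElse(new Node(replicaId, "", -1));
    }

    public static void main(String[] args) {
        Map<Integer, Node> cluster = Map.of(1, new Node(1, "broker1", 9092));
        System.out.println(resolve(1, cluster).host());          // known node
        System.out.println(resolve(2, cluster).host().isEmpty()); // placeholder
    }
}
```

This avoids the "Dummy" host while still keeping replica ids visible for nodes outside the metadata response.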
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513643851 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { +this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi; +return this; +} + +// Note that, partitionSizeLimitPerResponse will not be effective if it is larger than the config +// max.request.partition.size.limit on the server side. Review Comment: Right now I don't think this config is useful because we are not doing the client-side pagination. The config only makes sense if one batch of partitions is large enough to cause client-side OOM. Maybe we should add this config in the future? What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513646133 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2276,6 +2415,11 @@ private Node leader(PartitionInfo partitionInfo) { return partitionInfo.leader(); } +// This is used in the describe topics path if using DescribeTopics API. +private Node replicaToFakeNode(int id) { +return new Node(id, "Dummy", 0); +} Review Comment: The DescribeTopicPartitions API does not provide the node info as the Metadata API does. However, the TopicPartitionInfo constructor requires the node info, but the node info is useless in the describeTopic scenario. ## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ## @@ -537,6 +544,18 @@ public Map listAllReassignments(Set
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513643851 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { +this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi; +return this; +} + +// Note that, partitionSizeLimitPerResponse will not be effective if it is larger than the config +// max.request.partition.size.limit on the server side. Review Comment: The client can only know if the server-side limit is greater when the result is received. Actually, I think this config is only useful for testing. I am not sure whether any user will bother to change this config. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513641333 ## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ## @@ -799,6 +823,11 @@ public TopicCommandOptions(String[] args) { "if set when creating topics, the action will only execute if the topic does not already exist."); excludeInternalTopicOpt = parser.accepts("exclude-internal", "exclude internal topics when running list or describe command. The internal topics will be listed by default"); +partitionSizeLimitPerResponseOpt = parser.accepts("partition-size-limit-per-response", Review Comment: user-describe-topics-api is removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513641009 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -1399,6 +1404,184 @@ public void testInvalidTopicNames() throws Exception { } } +@SuppressWarnings("NPathComplexity") Review Comment: Do you mean the server returns an invalid cursor or the client sends one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513640718 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2190,6 +2201,117 @@ void handleFailure(Throwable throwable) { return new HashMap<>(topicFutures); } +@SuppressWarnings("MethodLength") +private Map<String, KafkaFutureImpl<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final Collection<String> topicNames, DescribeTopicsOptions options) { +final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList<String> topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} +} +final long now = time.milliseconds(); +Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513640577 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { +this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi; +return this; +} + +// Note that, partitionSizeLimitPerResponse will not be effective if it is larger than the config +// max.request.partition.size.limit on the server side. +public DescribeTopicsOptions partitionSizeLimitPerResponse(int partitionSizeLimitPerResponse) { +this.partitionSizeLimitPerResponse = partitionSizeLimitPerResponse; +return this; +} + public boolean includeAuthorizedOperations() { return includeAuthorizedOperations; } +public boolean useDescribeTopicPartitionsApi() { Review Comment: removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513640434 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { Review Comment: Removed useDescribeTopicPartitionsApi. It was only used in the UnsupportedVersionException retry, but now I figured out a way to retry in the Call framework. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
kirktrue commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513390603 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { +this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi; +return this; +} + +// Note that, partitionSizeLimitPerResponse will not be effective if it is larger than the config +// max.request.partition.size.limit on the server side. Review Comment: Is there a warning logged in the case where the client sends a limit greater than what's allowed on the broker? ## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ## @@ -537,6 +544,18 @@ public Map listAllReassignments(Set(topicFutures); } +@SuppressWarnings("MethodLength") +private Map> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final Collection topicNames, DescribeTopicsOptions options) { +final Map> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} +} +final long now = time.milliseconds(); +Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), Review Comment: AFAICT, the `callName` used within the `Call` 
object is only used for logging. That said, there's no point in confusing the user who's looking through the logs. ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2276,6 +2415,11 @@ private Node leader(PartitionInfo partitionInfo) { return partitionInfo.leader(); } +// This is used in the describe topics path if using DescribeTopics API. +private Node replicaToFakeNode(int id) { +return new Node(id, "Dummy", 0); +} Review Comment: Just for my own understanding, why do we favor creating _fake_ nodes instead of looking up the _real_ nodes from the metadata or something? ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { Review Comment: Sorry for being daft, but when would the user know to set this one way or the other. Is this something that can be handled under the covers? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
dajac commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513322953 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { Review Comment: Shouldn't the selection be automatic? I don't think users will bother about this. Basically, the new API should be used when available and the old one when not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
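The automatic selection dajac suggests is typically implemented as a try-the-new-API-then-fall-back pattern. A minimal sketch, with a hypothetical stand-in for Kafka's `UnsupportedVersionException` and an illustrative `Describer` interface rather than the real admin-client types:

```java
class ApiFallback {
    // Hypothetical stand-in for org.apache.kafka.common.errors.UnsupportedVersionException.
    static class UnsupportedVersionException extends RuntimeException {}

    // Illustrative abstraction over "a way to describe a topic".
    interface Describer {
        String describe(String topic);
    }

    // Prefer the new DescribeTopicPartitions-style path; if the broker does not
    // support it, transparently retry with the older Metadata-based path.
    static String describeWithFallback(String topic, Describer newApi, Describer oldApi) {
        try {
            return newApi.describe(topic);
        } catch (UnsupportedVersionException e) {
            return oldApi.describe(topic);
        }
    }

    public static void main(String[] args) {
        Describer newApi = t -> { throw new UnsupportedVersionException(); }; // old broker
        Describer oldApi = t -> "metadata:" + t;
        System.out.println(describeWithFallback("orders", newApi, oldApi));
    }
}
```

The user never has to know which protocol API served the request, which is the behavior the review converges on.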
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
AndrewJSchofield commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513310569 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2190,6 +2201,117 @@ void handleFailure(Throwable throwable) { return new HashMap<>(topicFutures); } +@SuppressWarnings("MethodLength") +private Map> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final Collection topicNames, DescribeTopicsOptions options) { +final Map> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} +} +final long now = time.milliseconds(); +Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), Review Comment: Should this be "describeTopicPartitions"? ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { Review Comment: I think you need a small change to KIP-966 to document these changes to the admin API. 
## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ## @@ -799,6 +823,11 @@ public TopicCommandOptions(String[] args) { "if set when creating topics, the action will only execute if the topic does not already exist."); excludeInternalTopicOpt = parser.accepts("exclude-internal", "exclude internal topics when running list or describe command. The internal topics will be listed by default"); +partitionSizeLimitPerResponseOpt = parser.accepts("partition-size-limit-per-response", Review Comment: In other cases where a new API has been introduced, I think the principle followed is to try the new one without an option, falling back if it is detected that this is required. That would be much nicer than expecting the innocent user to understand what `user-describe-topics-api` means. ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -1399,6 +1404,184 @@ public void testInvalidTopicNames() throws Exception { } } +@SuppressWarnings("NPathComplexity") Review Comment: Could we have a test with an invalid cursor? ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { +this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi; +return this; +} + +// Note that, partitionSizeLimitPerResponse will not be effective if it is larger than the config +// max.request.partition.size.limit on the server side.
+public DescribeTopicsOptions partitionSizeLimitPerResponse(int partitionSizeLimitPerResponse) { +this.partitionSizeLimitPerResponse = partitionSizeLimitPerResponse; +return this; +} + public boolean includeAuthorizedOperations() { return includeAuthorizedOperations; } +public boolean useDescribeTopicPartitionsApi() { Review Comment: I suggest just `useDescribeTopicPartitions()`. In the Javadoc, you can mention that it's using the DescribeTopicPartitions API under the covers. Most users of the Kafka admin client would consider `KafkaAdminClient` to be the API, rather than the Kafka protocol which is what is meant here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries a
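The setter-returns-this style under discussion follows the existing options convention in the admin client. A minimal, self-contained sketch of that fluent pattern (field names mirror the PR; the class itself is illustrative, not the real `DescribeTopicsOptions`):

```java
class DescribeOptionsSketch {
    static class Options {
        private boolean includeAuthorizedOperations;
        private int partitionSizeLimitPerResponse = 2000; // illustrative default

        // Each setter returns `this` so calls can be chained.
        Options includeAuthorizedOperations(boolean v) {
            this.includeAuthorizedOperations = v;
            return this;
        }

        Options partitionSizeLimitPerResponse(int v) {
            this.partitionSizeLimitPerResponse = v;
            return this;
        }

        boolean includeAuthorizedOperations() { return includeAuthorizedOperations; }
        int partitionSizeLimitPerResponse() { return partitionSizeLimitPerResponse; }
    }

    public static void main(String[] args) {
        Options o = new Options()
                .includeAuthorizedOperations(true)
                .partitionSizeLimitPerResponse(500);
        System.out.println(o.partitionSizeLimitPerResponse());
    }
}
```

Note that, as the review comment says, a client-side limit larger than the server's `max.request.partition.size.limit` would simply not take effect.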
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on PR #15470: URL: https://github.com/apache/kafka/pull/15470#issuecomment-1977980848 As discussed offline, we decided not to implement the pagination handling in this ticket. Here is the load-all-in-memory version to the https://github.com/apache/kafka/pull/15265 @artemlivshits @mumrah @kirktrue -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
artemlivshits commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1506982605 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) { return partitionInfo.leader(); } +@Override +public void describeTopics( +TopicCollection topics, +DescribeTopicsOptions options, +AdminResultsSubscriber subscriber) { +if (topics instanceof TopicIdCollection) { +subscriber.onError( +new IllegalArgumentException("Currently the describeTopics subscription mode does not support topic IDs.") +); +return; +} +if (!(topics instanceof TopicNameCollection)) { +subscriber.onError( +new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for describeTopics.") +); +return; +} + +TreeSet topicNames = new TreeSet<>(); +((TopicNameCollection) topics).topicNames().forEach(topicName -> { +if (topicNameIsUnrepresentable(topicName)) { + subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request."))); +} else { +topicNames.add(topicName); +} +}); + +RecurringCall call = new RecurringCall( Review Comment: I'm still not sure why we need the RecurringCall, I think something like this should be much less code: ``` ArrayBlockingQueue results = new ArrayBlockingQueue<>(5); Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { DescribeTopicPartitionsRequestData.Cursor currentCursor = new DescribeTopicPartitionsRequestData.Cursor(); // <...> @Override void handleResponse(AbstractResponse abstractResponse) { // ... Do the needful ... results.put(...); // ... if (hasMore) { // ... Set new cursor ... // ... 
runnable.call(this, now); } else { results.put(null); } } // <...> } runnable.call(call, time.milliseconds()); while (true) { DescribeTopicPartitionsResult result = results.take(); if (result == null) break; subscriber.onNext(result); } ``` And we won't need to create extra threads on the TopicCommand and etc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
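The bounded-queue pagination pattern sketched above can be written as a self-contained example. One caveat worth noting: `ArrayBlockingQueue` rejects `null`, so this sketch uses a non-null sentinel to mark the end of results. The types are illustrative stand-ins, not Kafka's:

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

class PagedFetch {
    // End-of-results marker; ArrayBlockingQueue.put(null) would throw NPE.
    static final List<String> SENTINEL = List.of();

    public static void main(String[] args) throws InterruptedException {
        List<String> all = List.of("a", "b", "c", "d", "e");
        // Small capacity gives back-pressure: the fetcher blocks until pages are consumed.
        ArrayBlockingQueue<List<String>> pages = new ArrayBlockingQueue<>(2);

        Thread fetcher = new Thread(() -> {
            try {
                int cursor = 0, pageSize = 2;
                while (cursor < all.size()) {   // "has more" tracked via a cursor
                    pages.put(all.subList(cursor, Math.min(cursor + pageSize, all.size())));
                    cursor += pageSize;
                }
                pages.put(SENTINEL);            // signal completion
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();

        int seen = 0;
        while (true) {
            List<String> page = pages.take();   // blocks until a page arrives
            if (page == SENTINEL) break;
            seen += page.size();                // analogous to subscriber.onNext(page)
        }
        fetcher.join();
        System.out.println(seen);
    }
}
```

The consumer sees pages in order and the producer never buffers more than the queue capacity, which is the memory bound the reviewer is after.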
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
kirktrue commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1506743625 ## clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java: ## @@ -79,9 +95,24 @@ public List isr() { return isr; } +/** + * Return the eligible leader replicas of the partition. Note that the ordering of the result is unspecified. + */ +public List eligibleLeaderReplicas() { +return elr; +} + +/** + * Return the last known eligible leader replicas of the partition. Note that the ordering of the result is unspecified. + */ +public List lastKnownEligibleLeaderReplicas() { +return lastKnownElr; +} Review Comment: Sorry, @CalvinConfluent, I've changed my mind 🤔 Using _ELR_ as an established acronym isn't too bad if one has read the KIP. I'd be happy with using `elr()` and `lastKnownElr()` as you originally had it. I would suggest if you do revert it, just explicitly call it out in the JavaDoc comments. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
kirktrue commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1506629098 ## clients/src/main/java/org/apache/kafka/clients/admin/Admin.java: ## @@ -335,6 +335,21 @@ default DescribeTopicsResult describeTopics(TopicCollection topics) { */ DescribeTopicsResult describeTopics(TopicCollection topics, DescribeTopicsOptions options); +/** + * Describe some topics in the cluster. + * + * When using topic IDs, this operation is supported by brokers with version 3.1.0 or higher. + * + * @param topics The topics to describe. + * @param options The options to use when describing the topics. + * @param subscriber The subscriber to consumer the results. + */ +default void describeTopics( +TopicCollection topics, +DescribeTopicsOptions options, +AdminResultsSubscriber subscriber) { +}; Review Comment: Just so I understand, is the default of a no-op intentional? ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -30,6 +30,7 @@ public class DescribeTopicsOptions extends AbstractOptions { private boolean includeAuthorizedOperations; +private int partitionSizeLimitPerResponse; Review Comment: Is there an upper limit (besides Integer.MAX_SIZE 😏)? 
## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ## @@ -367,6 +372,14 @@ public void printDescription() { .map(node -> node.toString()) .collect(Collectors.joining(","))); } + +System.out.print("\tElr: " + info.eligibleLeaderReplicas().stream() Review Comment: ```suggestion System.out.print("\tELRs: " + info.eligibleLeaderReplicas().stream() ``` ## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ## @@ -367,6 +372,14 @@ public void printDescription() { .map(node -> node.toString()) .collect(Collectors.joining(","))); } + +System.out.print("\tElr: " + info.eligibleLeaderReplicas().stream() +.map(node -> Integer.toString(node.id())) +.collect(Collectors.joining(","))); +System.out.print("\tLastKnownElr: " + info.lastKnownEligibleLeaderReplicas().stream() Review Comment: ```suggestion System.out.print("\tLast Known ELRs: " + info.lastKnownEligibleLeaderReplicas().stream() ``` ## clients/src/main/java/org/apache/kafka/clients/admin/Admin.java: ## @@ -335,6 +335,21 @@ default DescribeTopicsResult describeTopics(TopicCollection topics) { */ DescribeTopicsResult describeTopics(TopicCollection topics, DescribeTopicsOptions options); +/** + * Describe some topics in the cluster. + * + * When using topic IDs, this operation is supported by brokers with version 3.1.0 or higher. Review Comment: What is the behavior when running sub-3.1.0? ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicPartitionsResult.java: ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.protocol.Errors; + +import java.util.Collections; + +public class DescribeTopicPartitionsResult { +final public TopicDescription topicDescription; +final public Exception exception; Review Comment: Have we considered using `Optional` to lessen the chance of spurious NPEs? ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -994,6 +1002,37 @@ public boolean isInternal() { } } +abstract class RecurringCall { +private final String name; +final long deadlineMs; +private final AdminClientRunnable runnable; +KafkaFutureImpl nextRun; +abstract Call generateCall(); + +public RecurringCall(String name, long deadlineMs, AdminClientRunnable runnable) { +this.name = name; +this.deadlineMs = deadlineMs; +this.runnable = runnable; +} + +
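The review question above asks whether `Optional` could replace the nullable public `topicDescription`/`exception` fields. A minimal, self-contained sketch of that alternative — class and method names here are illustrative, with `TopicDescription` stubbed as a plain `String`; this is not the PR's actual code:

```java
import java.util.Optional;

// Hypothetical sketch of a result holder that hides its nullable fields
// behind Optional accessors, so a forgotten null check fails loudly at the
// call site instead of producing a distant NullPointerException.
final class DescribeResultSketch {
    private final String topicDescription; // nullable internally
    private final Exception exception;     // nullable internally

    private DescribeResultSketch(String topicDescription, Exception exception) {
        this.topicDescription = topicDescription;
        this.exception = exception;
    }

    static DescribeResultSketch success(String description) {
        return new DescribeResultSketch(description, null);
    }

    static DescribeResultSketch failure(Exception e) {
        return new DescribeResultSketch(null, e);
    }

    Optional<String> topicDescription() {
        return Optional.ofNullable(topicDescription);
    }

    Optional<Exception> exception() {
        return Optional.ofNullable(exception);
    }
}

public class Main {
    public static void main(String[] args) {
        DescribeResultSketch ok = DescribeResultSketch.success("topic-a: 3 partitions");
        DescribeResultSketch err = DescribeResultSketch.failure(new IllegalStateException("boom"));
        System.out.println(ok.topicDescription().isPresent());
        System.out.println(err.exception().isPresent());
    }
}
```

Exactly one of the two Optionals is present for any instance, which the private constructor plus the two factory methods make easy to enforce.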
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
CalvinConfluent commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1505044853 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -994,6 +1002,36 @@ public boolean isInternal() { } } +abstract class RecurringCall { +private final String name; +final long deadlineMs; +private final AdminClientRunnable runnable; +KafkaFutureImpl nextRun; +abstract Call generateCall(); + +public RecurringCall(String name, long deadlineMs, AdminClientRunnable runnable) { +this.name = name; +this.deadlineMs = deadlineMs; +this.runnable = runnable; +} + +public String toString() { +return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs + ")"; +} + +public void run() { +try { +do { +nextRun = new KafkaFutureImpl<>(); +Call call = generateCall(); +runnable.call(call, time.milliseconds()); +} while (nextRun.get()); +} catch (Exception e) { +log.info("Stop the recurring call " + name + " because " + e); Review Comment: Added the stack trace.
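The `RecurringCall` pattern in the hunk above issues one call per iteration and uses a future (`nextRun`) to decide whether another round is needed. A standalone sketch of that control flow, with `CompletableFuture` standing in for `KafkaFutureImpl` and a `Runnable` standing in for the real `Call` object — all names illustrative, not the KafkaAdminClient internals:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the RecurringCall control flow: each round issues one "call",
// and the call's completion handler decides (via nextRun) whether another
// round should be generated.
abstract class RecurringCallSketch {
    // Completed with true to schedule another call, false to stop.
    protected CompletableFuture<Boolean> nextRun;

    // Subclasses produce the work for one round; a Runnable stands in for
    // the real Call that the admin client runnable would dispatch.
    abstract Runnable generateCall();

    void run() throws Exception {
        do {
            nextRun = new CompletableFuture<>();
            generateCall().run(); // the "call" must eventually complete nextRun
        } while (nextRun.get());  // blocks until the current round finishes
    }
}

public class Main {
    public static void main(String[] args) throws Exception {
        AtomicInteger rounds = new AtomicInteger();
        RecurringCallSketch call = new RecurringCallSketch() {
            @Override
            Runnable generateCall() {
                // Ask for three rounds in total, then signal completion.
                return () -> nextRun.complete(rounds.incrementAndGet() < 3);
            }
        };
        call.run();
        System.out.println(rounds.get()); // 3
    }
}
```

One consequence of this shape, raised later in the thread, is that at most one call is in flight at a time: the loop blocks on `nextRun.get()` before generating the next call.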
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
CalvinConfluent commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1505044493 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,25 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +public DescribeTopicsOptions useDescribeTopicsApi(boolean useDescribeTopicsApi) { +this.useDescribeTopicsApi = useDescribeTopicsApi; +return this; +} Review Comment: Removed this option.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
CalvinConfluent commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1505044242 ## clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java: ## @@ -79,9 +95,24 @@ public List isr() { return isr; } +/** + * Return the eligible leader replicas of the partition. Note that the ordering of the result is unspecified. + */ +public List elr() { +return elr; +} + +/** + * Return the last known eligible leader replicas of the partition. Note that the ordering of the result is unspecified. + */ +public List lastKnownElr() { +return lastKnownElr; +} + public String toString() { return "(partition=" + partition + ", leader=" + leader + ", replicas=" + -Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ")"; +Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + +Utils.join(elr, ", ") + Utils.join(lastKnownElr, ", ") + ")"; Review Comment: Done.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
CalvinConfluent commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1505043369 ## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ## @@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) throws ExecutionException, I } else { ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists()); } -List topicDescriptions = new ArrayList<>(); if (!topicIds.isEmpty()) { Map descTopics = adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get(); -topicDescriptions = new ArrayList<>(descTopics.values()); +describeTopicsFollowUp(new ArrayList<>(descTopics.values()), opts); +return; } if (!topics.isEmpty()) { -Map descTopics = - adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get(); -topicDescriptions = new ArrayList<>(descTopics.values()); +final int partitionSizeLimit = opts.partitionSizeLimitPerResponse().orElse(2000); +try { +Iterator>> descTopicIterator = + adminClient.describeTopics(TopicCollection.ofTopicNames(topics), +new DescribeTopicsOptions().useDescribeTopicsApi(true) + .partitionSizeLimitPerResponse(partitionSizeLimit)).topicNameValuesIterator(); +while (descTopicIterator.hasNext()) { +List topicDescriptions = new ArrayList<>(); +for (int counter = 0; counter < partitionSizeLimit && descTopicIterator.hasNext();) { +org.apache.kafka.clients.admin.TopicDescription topicDescription = descTopicIterator.next().getValue().get(); +topicDescriptions.add(topicDescription); +counter += topicDescription.partitions().size(); +} +describeTopicsFollowUp(topicDescriptions, opts); Review Comment: Refactored.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
CalvinConfluent commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1505039131 ## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ## @@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) throws ExecutionException, I } else { ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists()); } -List topicDescriptions = new ArrayList<>(); if (!topicIds.isEmpty()) { Map descTopics = adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get(); -topicDescriptions = new ArrayList<>(descTopics.values()); +describeTopicsFollowUp(new ArrayList<>(descTopics.values()), opts); +return; } if (!topics.isEmpty()) { -Map descTopics = - adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get(); -topicDescriptions = new ArrayList<>(descTopics.values()); +final int partitionSizeLimit = opts.partitionSizeLimitPerResponse().orElse(2000); +try { +Iterator>> descTopicIterator = + adminClient.describeTopics(TopicCollection.ofTopicNames(topics), +new DescribeTopicsOptions().useDescribeTopicsApi(true) + .partitionSizeLimitPerResponse(partitionSizeLimit)).topicNameValuesIterator(); +while (descTopicIterator.hasNext()) { +List topicDescriptions = new ArrayList<>(); +for (int counter = 0; counter < partitionSizeLimit && descTopicIterator.hasNext();) { +org.apache.kafka.clients.admin.TopicDescription topicDescription = descTopicIterator.next().getValue().get(); +topicDescriptions.add(topicDescription); +counter += topicDescription.partitions().size(); +} +describeTopicsFollowUp(topicDescriptions, opts); +} +} catch (Exception e) { +if (e.toString().contains("UnsupportedVersionException")) { +// Retry the request with Metadata API. +Map descTopics = + adminClient.describeTopics(TopicCollection.ofTopicNames(topics), +new DescribeTopicsOptions().useDescribeTopicsApi(false) Review Comment: Metadata API does not support pagination. 
## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ## @@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) throws ExecutionException, I } else { ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists()); } -List topicDescriptions = new ArrayList<>(); if (!topicIds.isEmpty()) { Map descTopics = adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get(); -topicDescriptions = new ArrayList<>(descTopics.values()); +describeTopicsFollowUp(new ArrayList<>(descTopics.values()), opts); +return; } if (!topics.isEmpty()) { -Map descTopics = - adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get(); -topicDescriptions = new ArrayList<>(descTopics.values()); +final int partitionSizeLimit = opts.partitionSizeLimitPerResponse().orElse(2000); +try { +Iterator>> descTopicIterator = + adminClient.describeTopics(TopicCollection.ofTopicNames(topics), +new DescribeTopicsOptions().useDescribeTopicsApi(true) + .partitionSizeLimitPerResponse(partitionSizeLimit)).topicNameValuesIterator(); +while (descTopicIterator.hasNext()) { +List topicDescriptions = new ArrayList<>(); +for (int counter = 0; counter < partitionSizeLimit && descTopicIterator.hasNext();) { +org.apache.kafka.clients.admin.TopicDescription topicDescription = descTopicIterator.next().getValue().get(); +topicDescriptions.add(topicDescription); +counter += topicDescription.partitions().size(); +} +describeTopicsFollowUp(topicDescriptions, opts); +} +} catch (Exception e) { +if (e.toString().contains("UnsupportedVersionException")) { +
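The fallback in the hunk above detects an old broker by substring-matching `e.toString()` for `UnsupportedVersionException`. A more robust alternative is to walk the cause chain of the thrown exception and check the type directly. This is a sketch only, not the PR's code; the real exception class lives in `org.apache.kafka.common.errors` and is stubbed here so the example is self-contained:

```java
import java.util.concurrent.ExecutionException;

// Stub for org.apache.kafka.common.errors.UnsupportedVersionException,
// included only so this sketch compiles on its own.
class UnsupportedVersionException extends RuntimeException {
    UnsupportedVersionException(String msg) { super(msg); }
}

public class Fallback {
    // Returns true if any throwable in the cause chain is of the given type.
    // (Cause chains from futures are short, so no cycle guard is included.)
    static boolean causedBy(Throwable t, Class<? extends Throwable> type) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Futures wrap the broker error in an ExecutionException.
        Exception wrapped = new ExecutionException(
            new UnsupportedVersionException("broker does not support DescribeTopicPartitions"));
        if (causedBy(wrapped, UnsupportedVersionException.class)) {
            System.out.println("falling back to the Metadata API");
        }
    }
}
```

Type-based detection survives message-format changes and avoids false positives from error messages that merely mention the exception name.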
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
CalvinConfluent commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1505035988 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java: ## @@ -36,28 +38,38 @@ public class DescribeTopicsResult { private final Map> topicIdFutures; private final Map> nameFutures; +private final Iterator>> nameFuturesIterator; @Deprecated protected DescribeTopicsResult(Map> futures) { -this(null, futures); +this(null, futures, null); } // VisibleForTesting -protected DescribeTopicsResult(Map> topicIdFutures, Map> nameFutures) { -if (topicIdFutures != null && nameFutures != null) -throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified."); -if (topicIdFutures == null && nameFutures == null) -throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null."); +protected DescribeTopicsResult( +Map> topicIdFutures, +Map> nameFutures, +Iterator>> nameFuturesIterator +) { +if (topicIdFutures != null && nameFutures != null && nameFuturesIterator != null) Review Comment: Refactored. 
## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java: ## @@ -36,28 +38,38 @@ public class DescribeTopicsResult { private final Map> topicIdFutures; private final Map> nameFutures; +private final Iterator>> nameFuturesIterator; @Deprecated protected DescribeTopicsResult(Map> futures) { -this(null, futures); +this(null, futures, null); } // VisibleForTesting -protected DescribeTopicsResult(Map> topicIdFutures, Map> nameFutures) { -if (topicIdFutures != null && nameFutures != null) -throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified."); -if (topicIdFutures == null && nameFutures == null) -throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null."); +protected DescribeTopicsResult( +Map> topicIdFutures, +Map> nameFutures, +Iterator>> nameFuturesIterator +) { +if (topicIdFutures != null && nameFutures != null && nameFuturesIterator != null) +throw new IllegalArgumentException("topicIdFutures and nameFutures and nameFutureIterator cannot both be specified."); Review Comment: Refactored.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
kirktrue commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1503266038 ## clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java: ## @@ -79,9 +95,24 @@ public List isr() { return isr; } +/** + * Return the eligible leader replicas of the partition. Note that the ordering of the result is unspecified. + */ +public List elr() { +return elr; +} + +/** + * Return the last known eligible leader replicas of the partition. Note that the ordering of the result is unspecified. + */ +public List lastKnownElr() { Review Comment: And likewise here: ```suggestion public List lastKnownEligibleLeaderReplicas() { ``` ## clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java: ## @@ -79,9 +95,24 @@ public List isr() { return isr; } +/** + * Return the eligible leader replicas of the partition. Note that the ordering of the result is unspecified. + */ +public List elr() { Review Comment: Would you consider using the full name for the uninitiated? ```suggestion public List eligibleLeaderReplicas() { ```
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
kirktrue commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1503175975 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,25 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +public DescribeTopicsOptions useDescribeTopicsApi(boolean useDescribeTopicsApi) { +this.useDescribeTopicsApi = useDescribeTopicsApi; +return this; +} Review Comment: Can we add some comments here for a developer to know _why_ to use the topics API or not? ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -994,6 +1002,36 @@ public boolean isInternal() { } } +abstract class RecurringCall { +private final String name; +final long deadlineMs; +private final AdminClientRunnable runnable; +KafkaFutureImpl nextRun; +abstract Call generateCall(); + +public RecurringCall(String name, long deadlineMs, AdminClientRunnable runnable) { +this.name = name; +this.deadlineMs = deadlineMs; +this.runnable = runnable; +} + +public String toString() { +return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs + ")"; Review Comment: ```suggestion return "RecurringCall(name=" + name + ", deadlineMs=" + deadlineMs + ")"; ``` ## clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java: ## @@ -79,9 +95,24 @@ public List isr() { return isr; } +/** + * Return the eligible leader replicas of the partition. Note that the ordering of the result is unspecified. + */ +public List elr() { +return elr; +} + +/** + * Return the last known eligible leader replicas of the partition. Note that the ordering of the result is unspecified. 
+ */ +public List lastKnownElr() { +return lastKnownElr; +} + public String toString() { return "(partition=" + partition + ", leader=" + leader + ", replicas=" + -Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ")"; +Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + +Utils.join(elr, ", ") + Utils.join(lastKnownElr, ", ") + ")"; Review Comment: ```suggestion Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ", elr=" + Utils.join(elr, ", ") + ", lastKnownElr=" + Utils.join(lastKnownElr, ", ") + ")"; ``` ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -994,6 +1002,36 @@ public boolean isInternal() { } } +abstract class RecurringCall { +private final String name; +final long deadlineMs; +private final AdminClientRunnable runnable; +KafkaFutureImpl nextRun; +abstract Call generateCall(); + +public RecurringCall(String name, long deadlineMs, AdminClientRunnable runnable) { +this.name = name; +this.deadlineMs = deadlineMs; +this.runnable = runnable; +} + +public String toString() { +return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs + ")"; +} + +public void run() { +try { +do { +nextRun = new KafkaFutureImpl<>(); +Call call = generateCall(); +runnable.call(call, time.milliseconds()); +} while (nextRun.get()); +} catch (Exception e) { +log.info("Stop the recurring call " + name + " because " + e); Review Comment: Are we specifically wanting to avoid outputting a stack trace? 
## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2108,9 +2146,12 @@ void handleFailure(Throwable throwable) { public DescribeTopicsResult describeTopics(final TopicCollection topics, DescribeTopicsOptions options) { if (topics instanceof TopicIdCollection) return DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) topics).topicIds(), options)); -else if (topics instanceof TopicNameCollection) +else if (topics instanceof TopicNameCollection) { +if (options.useDescribeTopicsApi()) { +return DescribeTopicsResult.ofTopicNameIterator(new DescribeTopicPartitionsIterator(((TopicNameCollection) topics).topicNames(), options)); Review Comment: It's been my experience that it's "dangerous" 😨 to run arbitrary user code from within the context of the client code. User code can (and will) do unpredictable things with state, errors, threads, etc. The surrounding code inside the client has to be very careful to make sure it handles
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
artemlivshits commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1501002625 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2129,63 +2167,183 @@ private Map> handleDescribeTopicsByNames(f } } final long now = time.milliseconds(); -Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), -new LeastLoadedNodeProvider()) { -private boolean supportsDisablingTopicCreation = true; +if (options.useDescribeTopicsApi()) { +RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) { Review Comment: Not doing pipelining in the first version sounds good to me. We can optimize later if needed.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
CalvinConfluent commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1499752721 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2108,9 +2146,12 @@ void handleFailure(Throwable throwable) { public DescribeTopicsResult describeTopics(final TopicCollection topics, DescribeTopicsOptions options) { if (topics instanceof TopicIdCollection) return DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) topics).topicIds(), options)); -else if (topics instanceof TopicNameCollection) +else if (topics instanceof TopicNameCollection) { +if (options.useDescribeTopicsApi()) { +return DescribeTopicsResult.ofTopicNameIterator(new DescribeTopicPartitionsIterator(((TopicNameCollection) topics).topicNames(), options)); Review Comment: @mumrah If we need to spend more time on public discussion, maybe we can first have a simple version like paging on a topic level. Also, limit the number of topics loading in memory like maybe 50 to accomplish the short-term goal of avoiding OOM. cc @artemlivshits
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
mumrah commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1497780170 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2129,63 +2167,183 @@ private Map> handleDescribeTopicsByNames(f } } final long now = time.milliseconds(); -Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), -new LeastLoadedNodeProvider()) { -private boolean supportsDisablingTopicCreation = true; +if (options.useDescribeTopicsApi()) { +RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) { Review Comment: I think we can defer pipelining for now. If we do it the "dumb" way of just loading the next page of results as we drain the iterator, it will be slower, but it will work. ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2108,9 +2146,12 @@ void handleFailure(Throwable throwable) { public DescribeTopicsResult describeTopics(final TopicCollection topics, DescribeTopicsOptions options) { if (topics instanceof TopicIdCollection) return DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) topics).topicIds(), options)); -else if (topics instanceof TopicNameCollection) +else if (topics instanceof TopicNameCollection) { +if (options.useDescribeTopicsApi()) { +return DescribeTopicsResult.ofTopicNameIterator(new DescribeTopicPartitionsIterator(((TopicNameCollection) topics).topicNames(), options)); Review Comment: DescribeTopicsResult is a Map based results class, which does not align the streaming/paging approach we are trying to achieve with this new RPC. I don't think we should try to reuse DescribeTopicsResult for the new API. Something like ``` KafkaAdminClient#describeTopics(Consumer); ``` would be more natural for a streaming interface. With an interface like this, we just need to be concerned with efficiently paging through the results. 
The application can gather/filter/post-process the results as needed. Since this is a new public API, and really the first time doing streaming in KafkaAdminClient, I do think we should bring this up in the mailing list to collect feedback. E.g., do we want to return an Iterator/Stream, or accept a function like shown above.
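The `Consumer`-style API floated above can be illustrated with a small sketch. `PagedDescribeClient` and its in-memory page source are hypothetical stand-ins for the admin client and its RPC responses; the point is only that results are pushed to the subscriber page by page, so at most one page is ever buffered:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Sketch of a streaming describe API: each inner list stands in for one
// paged RPC response, and results are pushed to the caller's Consumer as
// they arrive rather than accumulated into a Map.
class PagedDescribeClient {
    private final List<List<String>> pages; // stand-in for RPC responses

    PagedDescribeClient(List<List<String>> pages) {
        this.pages = pages;
    }

    void describeTopics(Consumer<String> subscriber) {
        for (List<String> page : pages) { // one "RPC" per page
            page.forEach(subscriber);     // drained before the next fetch
        }
    }
}

public class Demo {
    public static void main(String[] args) {
        PagedDescribeClient client = new PagedDescribeClient(Arrays.asList(
            Arrays.asList("topic-a", "topic-b"),
            Arrays.asList("topic-c")));
        List<String> seen = new ArrayList<>();
        client.describeTopics(seen::add);
        System.out.println(seen); // [topic-a, topic-b, topic-c]
    }
}
```

As the comment warns, running arbitrary user code from inside the client means the surrounding loop must defend against the subscriber throwing or blocking; a production version would wrap the `subscriber` invocation accordingly.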
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
CalvinConfluent commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1496355495 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2129,63 +2167,183 @@ private Map> handleDescribeTopicsByNames(f } } final long now = time.milliseconds(); -Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), -new LeastLoadedNodeProvider()) { -private boolean supportsDisablingTopicCreation = true; +if (options.useDescribeTopicsApi()) { +RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) { Review Comment: Thanks for the advice. We still want to avoid loading a huge topic in the memory but the current implementation does not really achieve that. I will make a change to address this. Maybe it is a bit over-engineering to have more than 1 ongoing call in the pipeline. I will keep at most one call in the next iteration commit.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
artemlivshits commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1495047191 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java: ## @@ -36,28 +38,38 @@ public class DescribeTopicsResult { private final Map> topicIdFutures; private final Map> nameFutures; +private final Iterator>> nameFuturesIterator; @Deprecated protected DescribeTopicsResult(Map> futures) { -this(null, futures); +this(null, futures, null); } // VisibleForTesting -protected DescribeTopicsResult(Map> topicIdFutures, Map> nameFutures) { -if (topicIdFutures != null && nameFutures != null) -throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified."); -if (topicIdFutures == null && nameFutures == null) -throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null."); +protected DescribeTopicsResult( +Map> topicIdFutures, +Map> nameFutures, +Iterator>> nameFuturesIterator +) { +if (topicIdFutures != null && nameFutures != null && nameFuturesIterator != null) +throw new IllegalArgumentException("topicIdFutures and nameFutures and nameFutureIterator cannot both be specified."); Review Comment: nit: using "both" implies that there are 2 things, we now have 3. ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2129,63 +2167,183 @@ private Map> handleDescribeTopicsByNames(f } } final long now = time.milliseconds(); -Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), -new LeastLoadedNodeProvider()) { -private boolean supportsDisablingTopicCreation = true; +if (options.useDescribeTopicsApi()) { +RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) { Review Comment: Didn't quite get how the recurring call framework enables for lazy generation? The RecurringCall.run() just runs the whole thing in one go. 
## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2129,63 +2167,183 @@ private Map> handleDescribeTopicsByNames(f } } final long now = time.milliseconds(); -Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), -new LeastLoadedNodeProvider()) { -private boolean supportsDisablingTopicCreation = true; +if (options.useDescribeTopicsApi()) { +RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) { Review Comment: Looking at the current logic I cannot make sense out of it. For some reason we get all topic names that fit under the batch size and then get all partitions for those topics and put them in memory. Why did we bother with all paging complexity if all we do is to get all partitions for 2000 topics in memory? Can we just iterate topic by topic (as it seems that we get the list of all topics anyway) and just get partitions for each one? With the paging, I would've expected that we did something like this: 1. start iteration -- issue one call, once the first call is completed maybe issue a few more (say 3) calls ahead for pipelining. 2. next -- iterate over current batch. 3. once the current batch is exhausted, switch to the next batch, pipeline one more call. This way if the caller finishes iteration early or if iteration is slow, we won't keep piling results in memory. 
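The three-step paging scheme described above (issue an initial call plus a few look-ahead calls, drain the current batch, then pipeline one more call each time a batch is exhausted) can be sketched as a bounded-prefetch iterator. The page source and prefetch depth here are hypothetical; this is not the KafkaAdminClient implementation:

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

// Sketch of bounded-prefetch paging: at most `prefetch` pages are buffered,
// and a replacement page is requested only when the consumer exhausts one,
// so a slow or abandoned iteration never piles results up in memory.
class PagedIterator implements Iterator<String> {
    private final IntFunction<List<String>> fetchPage; // page index -> page
    private final Deque<List<String>> buffered = new ArrayDeque<>();
    private final int prefetch;
    private int nextPageIndex = 0;
    private boolean exhausted = false;
    private Iterator<String> current = Collections.emptyIterator();

    PagedIterator(IntFunction<List<String>> fetchPage, int prefetch) {
        this.fetchPage = fetchPage;
        this.prefetch = prefetch;
        fill(); // step 1: issue the initial call(s) ahead of iteration
    }

    private void fill() {
        while (!exhausted && buffered.size() < prefetch) {
            List<String> page = fetchPage.apply(nextPageIndex++);
            if (page.isEmpty()) {
                exhausted = true; // empty page marks the end of results
            } else {
                buffered.add(page);
            }
        }
    }

    @Override
    public boolean hasNext() {
        while (!current.hasNext()) {
            if (buffered.isEmpty()) {
                return false;
            }
            current = buffered.poll().iterator(); // step 3: switch batches
            fill();                               // ...and pipeline one more
        }
        return true;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next(); // step 2: drain the current batch
    }
}
```

With a page source such as `i -> i < 3 ? Arrays.asList("p" + i) : Collections.emptyList()` and a prefetch depth of 2, the iterator yields p0, p1, p2 while never holding more than two pages.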
## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ## @@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) throws ExecutionException, I } else { ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists()); } -List topicDescriptions = new ArrayList<>(); if (!topicIds.isEmpty()) { Map descTopics = adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get(); -topicDescriptions = new ArrayList<>(descTopics.values()); +describeTopicsFollowUp(new ArrayList<>(descTopics.values()), opts); +return; } if (!topics.isEmpty()) { -Map descTopics = - adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get(); -topicDescriptions = new ArrayList<>(descTopics.values()); +final int partitionSizeLimit = opts.partitionSizeLimitPerResponse().orElse(2000); +try { +Iterator>> des
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
artemlivshits commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1491416954

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

```diff
@@ -2129,63 +2167,183 @@ private Map> handleDescribeTopicsByNames(f
         }
     }
     final long now = time.milliseconds();
-    Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
-            new LeastLoadedNodeProvider()) {
-        private boolean supportsDisablingTopicCreation = true;
+    if (options.useDescribeTopicsApi()) {
+        RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) {
+            Map pendingTopics =
+                topicNames.stream().map(topicName -> new TopicRequest().setName(topicName))
+                    .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)
+                );
-        @Override
-        MetadataRequest.Builder createRequest(int timeoutMs) {
-            if (supportsDisablingTopicCreation)
-                return new MetadataRequest.Builder(new MetadataRequestData()
-                    .setTopics(convertToMetadataRequestTopic(topicNamesList))
-                    .setAllowAutoTopicCreation(false)
-                    .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
-            else
-                return MetadataRequest.Builder.allTopics();
-        }
+            String partiallyFinishedTopicName = "";
+            int partiallyFinishedTopicNextPartitionId = -1;
+            TopicDescription partiallyFinishedTopicDescription = null;
-        @Override
-        void handleResponse(AbstractResponse abstractResponse) {
-            MetadataResponse response = (MetadataResponse) abstractResponse;
-            // Handle server responses for particular topics.
-            Cluster cluster = response.buildCluster();
-            Map errors = response.errors();
-            for (Map.Entry> entry : topicFutures.entrySet()) {
-                String topicName = entry.getKey();
-                KafkaFutureImpl future = entry.getValue();
-                Errors topicError = errors.get(topicName);
-                if (topicError != null) {
-                    future.completeExceptionally(topicError.exception());
-                    continue;
+            @Override
+            Call generateCall() {
+                return new Call("describeTopics", this.deadlineMs, new LeastLoadedNodeProvider()) {
+                    @Override
+                    DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
+                        DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData()
+                            .setTopics(pendingTopics.values().stream().collect(Collectors.toList()))
+                            .setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+                        if (!partiallyFinishedTopicName.isEmpty()) {
+                            request.setCursor(new DescribeTopicPartitionsRequestData.Cursor()
+                                .setTopicName(partiallyFinishedTopicName)
+                                .setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+                            );
+                        }
+                        return new DescribeTopicPartitionsRequest.Builder(request);
+                    }
+
+                    @Override
+                    void handleResponse(AbstractResponse abstractResponse) {
+                        DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse;
+                        String cursorTopicName = "";
+                        int cursorPartitionId = -1;
+                        if (response.data().nextCursor() != null) {
+                            DescribeTopicPartitionsResponseData.Cursor cursor = response.data().nextCursor();
+                            cursorTopicName = cursor.topicName();
+                            cursorPartitionId = cursor.partitionIndex();
+                        }
+
+                        for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) {
+                            String topicName = topic.name();
+                            Errors error = Errors.forCode(topic.errorCode());
+
+                            KafkaFutureImpl future = topicFutures.get(topicName);
+                            if (error != Errors.NONE) {
+                                future.completeExceptionally(error.exception());
+
```
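The diff above implements cursor-based pagination: each request carries a cursor (the partially finished topic's name and next partition index), and the recurring call repeats until the broker returns no next cursor. A minimal self-contained sketch of that pattern, with hypothetical names standing in for Kafka's actual request/response classes:

```java
import java.util.*;

// Sketch of the cursor-based pagination pattern behind DescribeTopicPartitions:
// each request carries an optional cursor (topic name + next partition index);
// each response returns at most `limit` partitions plus a next-cursor when
// more data remains. All names here are illustrative, not Kafka's.
public class CursorPagination {
    // Pretend broker state: topic name -> partition count (sorted, like the PR's TreeMap).
    static final SortedMap<String, Integer> TOPICS = new TreeMap<>(Map.of("a", 3, "b", 5));

    record Cursor(String topic, int partition) {}
    record Page(List<String> partitions, Cursor next) {}

    // Serve one page of at most `limit` "topic-partition" entries starting at `cursor`.
    static Page serve(Cursor cursor, int limit) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : TOPICS.tailMap(cursor.topic()).entrySet()) {
            int start = e.getKey().equals(cursor.topic()) ? cursor.partition() : 0;
            for (int p = start; p < e.getValue(); p++) {
                if (out.size() == limit)
                    return new Page(out, new Cursor(e.getKey(), p)); // more remains
                out.add(e.getKey() + "-" + p);
            }
        }
        return new Page(out, null); // null cursor: pagination finished
    }

    public static void main(String[] args) {
        Cursor cursor = new Cursor("", 0);
        List<String> all = new ArrayList<>();
        int requests = 0;
        while (cursor != null) {           // the client-side recurring-call loop
            Page page = serve(cursor, 3);  // ResponsePartitionLimit = 3
            all.addAll(page.partitions());
            cursor = page.next();
            requests++;
        }
        System.out.println(requests + " requests, " + all.size() + " partitions");
        // prints: 3 requests, 8 partitions
    }
}
```

The sorted (TreeMap) ordering matters: the cursor only makes sense if both sides walk topics in the same deterministic order.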
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
mumrah commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1489713255

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

(quotes the same `@@ -2129,63 +2167,183 @@` hunk shown in full above)
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
artemlivshits commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1476922434

## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:

```diff
@@ -799,6 +814,13 @@ public TopicCommandOptions(String[] args) {
             "if set when creating topics, the action will only execute if the topic does not already exist.");
         excludeInternalTopicOpt = parser.accepts("exclude-internal",
             "exclude internal topics when running list or describe command. The internal topics will be listed by default");
+        useDescribeTopicsApiOpt = parser.accepts("use-describe-topics-api",
```

Review Comment: Instead of an explicit option, can we just try to use the describe topics API if the broker has it, and otherwise fall back to the metadata API?

## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:

(quotes the same `@@ -2129,63 +2167,183 @@` hunk shown in full above)
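artemlivshits's suggestion above, preferring the new API and transparently falling back when the broker is too old, can be sketched as follows. The names are hypothetical; in the real admin client the trigger would be the broker's advertised API versions or an `UnsupportedVersionException`-style error rather than this toy exception:

```java
import java.util.List;

// Sketch of try-new-API-then-fall-back, as suggested in the review comment.
// All types here are stand-ins, not Kafka's actual classes.
public class DescribeWithFallback {
    static class UnsupportedVersionException extends RuntimeException {}

    interface Describer {
        List<String> describe(List<String> topics);
    }

    // Prefer the paginated describe-topic-partitions path; if the broker
    // does not support it, retry the same request via the metadata path.
    static List<String> describe(List<String> topics, Describer newApi, Describer oldApi) {
        try {
            return newApi.describe(topics);
        } catch (UnsupportedVersionException e) {
            return oldApi.describe(topics);
        }
    }

    public static void main(String[] args) {
        Describer oldBroker = t -> { throw new UnsupportedVersionException(); };
        Describer metadata = t -> List.of("via-metadata");
        System.out.println(describe(List.of("a"), oldBroker, metadata)); // [via-metadata]
    }
}
```

This keeps the decision inside the client, so `TopicCommand` would not need a `--use-describe-topics-api` flag at all.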