vinothchandar commented on a change in pull request #8737: URL: https://github.com/apache/kafka/pull/8737#discussion_r432868132
########## File path: core/src/main/scala/kafka/admin/TopicCommand.scala ########## @@ -290,42 +299,50 @@ object TopicCommand extends Logging { override def describeTopic(opts: TopicCommandOptions): Unit = { val topics = getTopics(opts.topic, opts.excludeInternalTopics) - val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values() - val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()) - val reassignments = listAllReassignments() - val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala - val describeOptions = new DescribeOptions(opts, liveBrokers.toSet) - - for (td <- topicDescriptions) { - val topicName = td.name - val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get() - val sortedPartitions = td.partitions.asScala.sortBy(_.partition) - - if (describeOptions.describeConfigs) { - val hasNonDefault = config.entries().asScala.exists(!_.isDefault) - if (!opts.reportOverriddenConfigs || hasNonDefault) { - val numPartitions = td.partitions().size - val firstPartition = td.partitions.iterator.next() - val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition)) - val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false) - topicDesc.printDescription() + ensureTopicExists(topics, opts.topic, !opts.ifExists) + + if (topics.nonEmpty) { + val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values() + val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()) + val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala + val describeOptions = new DescribeOptions(opts, liveBrokers.toSet) + val topicPartitions = topicDescriptions + .flatMap(td => 
td.partitions.iterator().asScala.map(p => new TopicPartition(td.name(), p.partition()))) + .toSet.asJava + val reassignments = listAllReassignments(topicPartitions) + + for (td <- topicDescriptions) { + val topicName = td.name + val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get() + val sortedPartitions = td.partitions.asScala.sortBy(_.partition) + + if (describeOptions.describeConfigs) { + val hasNonDefault = config.entries().asScala.exists(!_.isDefault) + if (!opts.reportOverriddenConfigs || hasNonDefault) { + val numPartitions = td.partitions().size + val firstPartition = td.partitions.iterator.next() + val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition)) + val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false) + topicDesc.printDescription() + } } - } - if (describeOptions.describePartitions) { - for (partition <- sortedPartitions) { - val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition)) - val partitionDesc = PartitionDescription(topicName, partition, Some(config), markedForDeletion = false, reassignment) - describeOptions.maybePrintPartitionDescription(partitionDesc) + if (describeOptions.describePartitions) { + for (partition <- sortedPartitions) { + val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition)) + val partitionDesc = PartitionDescription(topicName, partition, Some(config), markedForDeletion = false, reassignment) + describeOptions.maybePrintPartitionDescription(partitionDesc) + } } } } } override def deleteTopic(opts: TopicCommandOptions): Unit = { val topics = getTopics(opts.topic, opts.excludeInternalTopics) - ensureTopicExists(topics, opts.topic) - adminClient.deleteTopics(topics.asJavaCollection).all().get() + ensureTopicExists(topics, opts.topic, !opts.ifExists) + if (topics.nonEmpty) Review comment: For `alter` : we call 
KafkaAdminClient#createPartitions(), which will make the RPC call even if `topics` at ~L2406 is empty. For `describe`: the KafkaAdminClient#describeCluster() call is still wasteful, no? For `delete`: it's actually handled, and the RPC is avoided. I will remove the check ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org