dengziming commented on a change in pull request #11173: URL: https://github.com/apache/kafka/pull/11173#discussion_r763681964
########## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ########## @@ -224,9 +224,82 @@ object GetOffsetShell { /** * Return the partition infos. Filter them with topicPartitionFilter. */ - private def listPartitionInfos(consumer: KafkaConsumer[_, _], topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = { - consumer.listTopics.asScala.values.flatMap { partitions => - partitions.asScala.filter(topicPartitionFilter) + private def listPartitionInfos( + client: Admin, + topicPartitionFilter: TopicPartitionFilter, + excludeInternalTopics: Boolean + ): Seq[PartitionInfo] = { + val listTopicsOptions = new ListTopicsOptions().listInternal(!excludeInternalTopics) + val topics = client.listTopics(listTopicsOptions).names.get + val filteredTopics = topics.asScala.filter(topic => topicPartitionFilter.isTopicAllowed(topic)) + + client.describeTopics(filteredTopics.asJava).allTopicNames.get.asScala.flatMap { case (topic, description) => + description + .partitions + .asScala + .map(tp => new PartitionInfo(topic, tp.partition, tp.leader, tp.replicas.asScala.toArray, tp.isr.asScala.toArray)) + .filter(tp => topicPartitionFilter.isPartitionAllowed(tp)) }.toBuffer } } + +/** + * Used to filter partitions after describing them + */ +trait PartitionFilter { + def isPartitionAllowed(partition: Int): Boolean +} + +case class PartitionsSetFilter(partitionIds: Set[Int]) extends PartitionFilter { + override def isPartitionAllowed(partition: Int): Boolean = partitionIds.isEmpty || partitionIds.contains(partition) +} + +case class UniquePartitionFilter(partition: Int) extends PartitionFilter { + override def isPartitionAllowed(partition: Int): Boolean = partition == this.partition +} + +case class PartitionRangeFilter(lowerRange: Int, upperRange: Int) extends PartitionFilter { + override def isPartitionAllowed(partition: Int): Boolean = partition >= lowerRange && partition < upperRange +} + +trait TopicPartitionFilter { + + /** + * Used to filter topics before describing them + */ + def isTopicAllowed(topic: String): Boolean + + /** + * Used to filter topics and topic-partitions after describing them + */ + def isPartitionAllowed(partition: PartitionInfo): Boolean +} + +/** + * Creates a topic-partition filter based on a topic filter and a partition filter + */ +case class TopicFilterAndPartitionFilter( + topicFilter: IncludeList, + excludeInternalTopics: Boolean, + partitionFilter: PartitionFilter +) extends TopicPartitionFilter { + + override def isPartitionAllowed(partition: PartitionInfo): Boolean = { + isTopicAllowed(partition.topic()) && partitionFilter.isPartitionAllowed(partition.partition()) + } + + override def isTopicAllowed(topic: String): Boolean = { + topicFilter.isTopicAllowed(topic, excludeInternalTopics) Review comment: Good catch and we should pass `false` here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org