dajac commented on a change in pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#discussion_r763039984



##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -224,9 +224,82 @@ object GetOffsetShell {
   /**
    * Return the partition infos. Filter them with topicPartitionFilter.
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], 
topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = {
-    consumer.listTopics.asScala.values.flatMap { partitions =>
-      partitions.asScala.filter(topicPartitionFilter)
+  private def listPartitionInfos(
+    client: Admin,
+    topicPartitionFilter: TopicPartitionFilter,
+    excludeInternalTopics: Boolean
+  ): Seq[PartitionInfo] = {
+    val listTopicsOptions = new 
ListTopicsOptions().listInternal(!excludeInternalTopics)
+    val topics = client.listTopics(listTopicsOptions).names.get
+    val filteredTopics = topics.asScala.filter(topic => 
topicPartitionFilter.isTopicAllowed(topic))
+
+    
client.describeTopics(filteredTopics.asJava).allTopicNames.get.asScala.flatMap 
{ case (topic, description) =>
+      description
+        .partitions
+        .asScala
+        .map(tp => new PartitionInfo(topic, tp.partition, tp.leader, 
tp.replicas.asScala.toArray, tp.isr.asScala.toArray))
+        .filter(tp => topicPartitionFilter.isPartitionAllowed(tp))
     }.toBuffer
   }
 }
+
+/**
+ * Used to filter partitions after describing them
+ */
+trait PartitionFilter {
+  def isPartitionAllowed(partition: Int): Boolean
+}
+
+case class PartitionsSetFilter(partitionIds: Set[Int]) extends PartitionFilter 
{
+  override def isPartitionAllowed(partition: Int): Boolean = 
partitionIds.isEmpty || partitionIds.contains(partition)
+}
+
+case class UniquePartitionFilter(partition: Int) extends PartitionFilter {
+  override def isPartitionAllowed(partition: Int): Boolean = partition == 
this.partition
+}
+
+case class PartitionRangeFilter(lowerRange: Int, upperRange: Int) extends 
PartitionFilter {
+  override def isPartitionAllowed(partition: Int): Boolean = partition >= 
lowerRange && partition < upperRange
+}
+
+trait TopicPartitionFilter {
+
+  /**
+   * Used to filter topics before describing them
+   */
+  def isTopicAllowed(topic: String): Boolean
+
+  /**
+   * Used to filter topics and topic-partitions after describing them
+   */
+  def isPartitionAllowed(partition: PartitionInfo): Boolean
+}
+
+/**
+ * Creates a topic-partition filter based on a topic filter and a partition 
filter
+ */
+case class TopicFilterAndPartitionFilter(
+  topicFilter: IncludeList,
+  excludeInternalTopics: Boolean,
+  partitionFilter: PartitionFilter
+) extends TopicPartitionFilter {
+
+  override def isPartitionAllowed(partition: PartitionInfo): Boolean = {
+    isTopicAllowed(partition.topic()) && 
partitionFilter.isPartitionAllowed(partition.partition())
+  }
+
+  override def isTopicAllowed(topic: String): Boolean = {
+    topicFilter.isTopicAllowed(topic, excludeInternalTopics)

Review comment:
       Could we just pass `true` instead of `excludeInternalTopics` here? 
Internal topics are already filtered out by the admin client in 
`listPartitionInfos`.

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -140,22 +139,25 @@ object GetOffsetShell {
           Some(new TopicPartition(p.topic, p.partition))
       }
 
+      val offsetSpec = listOffsetsTimestamp match {
+        case ListOffsetsRequest.EARLIEST_TIMESTAMP => OffsetSpec.earliest()
+        case ListOffsetsRequest.LATEST_TIMESTAMP => OffsetSpec.latest()
+        case ListOffsetsRequest.MAX_TIMESTAMP => OffsetSpec.maxTimestamp()
+        case _ => OffsetSpec.forTimestamp(listOffsetsTimestamp)
+      }
+
+      val timestampsToSearch = topicPartitions.map(tp => tp -> 
offsetSpec).toMap.asJava
+
       /* Note that the value of the map can be null */
-      val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = 
listOffsetsTimestamp match {
-        case ListOffsetsRequest.EARLIEST_TIMESTAMP => 
consumer.beginningOffsets(topicPartitions.asJava).asScala
-        case ListOffsetsRequest.LATEST_TIMESTAMP => 
consumer.endOffsets(topicPartitions.asJava).asScala
-        case _ =>
-          val timestampsToSearch = topicPartitions.map(tp => tp -> 
(listOffsetsTimestamp: java.lang.Long)).toMap.asJava
-          consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, 
x) =>
-            if (x == null) (k, null) else (k, x.offset: java.lang.Long)
-          }
+      val partitionOffsets = 
client.listOffsets(timestampsToSearch).all().get().asScala.map { case (k, x) =>
+        if (x == null) (k, null) else (k, x.offset: java.lang.Long)

Review comment:
       * Can `x` be `null` with the admin client?
   * It might be better to iterate over the partitions and call 
`ListOffsetsResult.partitionResult` for each of them. If we do this, we can 
basically avoid filtering out the partitions without leaders as the admin 
client does it as well. Then, we can catch the relevant exception 
(`LEADER_NOT_AVAILABLE`) and log the error. That seems a bit better as the 
metadata used by the admin client are more up to date. This would also allow us 
to remove the conversion to `PartitionInfo` in `listPartitionInfos`. What do 
you think?

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -224,9 +224,82 @@ object GetOffsetShell {
   /**
    * Return the partition infos. Filter them with topicPartitionFilter.
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], 
topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = {
-    consumer.listTopics.asScala.values.flatMap { partitions =>
-      partitions.asScala.filter(topicPartitionFilter)
+  private def listPartitionInfos(
+    client: Admin,
+    topicPartitionFilter: TopicPartitionFilter,
+    excludeInternalTopics: Boolean
+  ): Seq[PartitionInfo] = {
+    val listTopicsOptions = new 
ListTopicsOptions().listInternal(!excludeInternalTopics)
+    val topics = client.listTopics(listTopicsOptions).names.get
+    val filteredTopics = topics.asScala.filter(topic => 
topicPartitionFilter.isTopicAllowed(topic))
+
+    
client.describeTopics(filteredTopics.asJava).allTopicNames.get.asScala.flatMap 
{ case (topic, description) =>
+      description
+        .partitions
+        .asScala
+        .map(tp => new PartitionInfo(topic, tp.partition, tp.leader, 
tp.replicas.asScala.toArray, tp.isr.asScala.toArray))
+        .filter(tp => topicPartitionFilter.isPartitionAllowed(tp))
     }.toBuffer
   }
 }
+
+/**
+ * Used to filter partitions after describing them
+ */
+trait PartitionFilter {
+  def isPartitionAllowed(partition: Int): Boolean
+}
+
+case class PartitionsSetFilter(partitionIds: Set[Int]) extends PartitionFilter 
{
+  override def isPartitionAllowed(partition: Int): Boolean = 
partitionIds.isEmpty || partitionIds.contains(partition)
+}
+
+case class UniquePartitionFilter(partition: Int) extends PartitionFilter {
+  override def isPartitionAllowed(partition: Int): Boolean = partition == 
this.partition
+}
+
+case class PartitionRangeFilter(lowerRange: Int, upperRange: Int) extends 
PartitionFilter {
+  override def isPartitionAllowed(partition: Int): Boolean = partition >= 
lowerRange && partition < upperRange
+}
+
+trait TopicPartitionFilter {
+
+  /**
+   * Used to filter topics before describing them
+   */
+  def isTopicAllowed(topic: String): Boolean
+
+  /**
+   * Used to filter topics and topic-partitions after describing them
+   */
+  def isPartitionAllowed(partition: PartitionInfo): Boolean
+}
+
+/**
+ * Creates a topic-partition filter based on a topic filter and a partition 
filter
+ */
+case class TopicFilterAndPartitionFilter(
+  topicFilter: IncludeList,
+  excludeInternalTopics: Boolean,
+  partitionFilter: PartitionFilter
+) extends TopicPartitionFilter {
+
+  override def isPartitionAllowed(partition: PartitionInfo): Boolean = {
+    isTopicAllowed(partition.topic()) && 
partitionFilter.isPartitionAllowed(partition.partition())
+  }
+
+  override def isTopicAllowed(topic: String): Boolean = {
+    topicFilter.isTopicAllowed(topic, excludeInternalTopics)
+  }
+}
+
+case class CompositeTopicPartitionFilter(filters: Array[TopicPartitionFilter]) 
extends TopicPartitionFilter {
+
+  override def isTopicAllowed(topic: String): Boolean = {
+    filters.exists(filter => filter.isTopicAllowed(topic))
+  }
+
+  override def isPartitionAllowed(tp: PartitionInfo): Boolean = {
+    filters.exists(filter => filter.isPartitionAllowed(tp))

Review comment:
       nit: `filters.exists(filter.isPartitionAllowed)`?

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -224,9 +224,82 @@ object GetOffsetShell {
   /**
    * Return the partition infos. Filter them with topicPartitionFilter.
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], 
topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = {
-    consumer.listTopics.asScala.values.flatMap { partitions =>
-      partitions.asScala.filter(topicPartitionFilter)
+  private def listPartitionInfos(
+    client: Admin,
+    topicPartitionFilter: TopicPartitionFilter,
+    excludeInternalTopics: Boolean
+  ): Seq[PartitionInfo] = {
+    val listTopicsOptions = new 
ListTopicsOptions().listInternal(!excludeInternalTopics)
+    val topics = client.listTopics(listTopicsOptions).names.get
+    val filteredTopics = topics.asScala.filter(topic => 
topicPartitionFilter.isTopicAllowed(topic))

Review comment:
       nit: `topics.asScala.filter(topicPartitionFilter.isTopicAllowed)`?

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -224,9 +224,82 @@ object GetOffsetShell {
   /**
    * Return the partition infos. Filter them with topicPartitionFilter.
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], 
topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = {
-    consumer.listTopics.asScala.values.flatMap { partitions =>
-      partitions.asScala.filter(topicPartitionFilter)
+  private def listPartitionInfos(
+    client: Admin,
+    topicPartitionFilter: TopicPartitionFilter,
+    excludeInternalTopics: Boolean
+  ): Seq[PartitionInfo] = {
+    val listTopicsOptions = new 
ListTopicsOptions().listInternal(!excludeInternalTopics)
+    val topics = client.listTopics(listTopicsOptions).names.get
+    val filteredTopics = topics.asScala.filter(topic => 
topicPartitionFilter.isTopicAllowed(topic))
+
+    
client.describeTopics(filteredTopics.asJava).allTopicNames.get.asScala.flatMap 
{ case (topic, description) =>
+      description
+        .partitions
+        .asScala
+        .map(tp => new PartitionInfo(topic, tp.partition, tp.leader, 
tp.replicas.asScala.toArray, tp.isr.asScala.toArray))
+        .filter(tp => topicPartitionFilter.isPartitionAllowed(tp))

Review comment:
       If we do what I suggested before, we could use `TopicPartition` here.

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -224,9 +224,82 @@ object GetOffsetShell {
   /**
    * Return the partition infos. Filter them with topicPartitionFilter.
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], 
topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = {
-    consumer.listTopics.asScala.values.flatMap { partitions =>
-      partitions.asScala.filter(topicPartitionFilter)
+  private def listPartitionInfos(
+    client: Admin,
+    topicPartitionFilter: TopicPartitionFilter,
+    excludeInternalTopics: Boolean
+  ): Seq[PartitionInfo] = {
+    val listTopicsOptions = new 
ListTopicsOptions().listInternal(!excludeInternalTopics)
+    val topics = client.listTopics(listTopicsOptions).names.get
+    val filteredTopics = topics.asScala.filter(topic => 
topicPartitionFilter.isTopicAllowed(topic))
+
+    
client.describeTopics(filteredTopics.asJava).allTopicNames.get.asScala.flatMap 
{ case (topic, description) =>
+      description
+        .partitions
+        .asScala
+        .map(tp => new PartitionInfo(topic, tp.partition, tp.leader, 
tp.replicas.asScala.toArray, tp.isr.asScala.toArray))
+        .filter(tp => topicPartitionFilter.isPartitionAllowed(tp))
     }.toBuffer
   }
 }
+
+/**
+ * Used to filter partitions after describing them
+ */
+trait PartitionFilter {
+  def isPartitionAllowed(partition: Int): Boolean
+}
+
+case class PartitionsSetFilter(partitionIds: Set[Int]) extends PartitionFilter 
{
+  override def isPartitionAllowed(partition: Int): Boolean = 
partitionIds.isEmpty || partitionIds.contains(partition)
+}
+
+case class UniquePartitionFilter(partition: Int) extends PartitionFilter {
+  override def isPartitionAllowed(partition: Int): Boolean = partition == 
this.partition
+}
+
+case class PartitionRangeFilter(lowerRange: Int, upperRange: Int) extends 
PartitionFilter {
+  override def isPartitionAllowed(partition: Int): Boolean = partition >= 
lowerRange && partition < upperRange
+}
+
+trait TopicPartitionFilter {
+
+  /**
+   * Used to filter topics before describing them
+   */
+  def isTopicAllowed(topic: String): Boolean
+
+  /**
+   * Used to filter topics and topic-partitions after describing them
+   */
+  def isPartitionAllowed(partition: PartitionInfo): Boolean
+}
+
+/**
+ * Creates a topic-partition filter based on a topic filter and a partition 
filter
+ */
+case class TopicFilterAndPartitionFilter(
+  topicFilter: IncludeList,
+  excludeInternalTopics: Boolean,
+  partitionFilter: PartitionFilter
+) extends TopicPartitionFilter {
+
+  override def isPartitionAllowed(partition: PartitionInfo): Boolean = {
+    isTopicAllowed(partition.topic()) && 
partitionFilter.isPartitionAllowed(partition.partition())
+  }
+
+  override def isTopicAllowed(topic: String): Boolean = {
+    topicFilter.isTopicAllowed(topic, excludeInternalTopics)
+  }
+}
+
+case class CompositeTopicPartitionFilter(filters: Array[TopicPartitionFilter]) 
extends TopicPartitionFilter {
+
+  override def isTopicAllowed(topic: String): Boolean = {
+    filters.exists(filter => filter.isTopicAllowed(topic))

Review comment:
       nit: `filters.exists(filter.isTopicAllowed)`?

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -224,9 +224,82 @@ object GetOffsetShell {
   /**
    * Return the partition infos. Filter them with topicPartitionFilter.
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], 
topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = {
-    consumer.listTopics.asScala.values.flatMap { partitions =>
-      partitions.asScala.filter(topicPartitionFilter)
+  private def listPartitionInfos(
+    client: Admin,
+    topicPartitionFilter: TopicPartitionFilter,
+    excludeInternalTopics: Boolean
+  ): Seq[PartitionInfo] = {
+    val listTopicsOptions = new 
ListTopicsOptions().listInternal(!excludeInternalTopics)
+    val topics = client.listTopics(listTopicsOptions).names.get
+    val filteredTopics = topics.asScala.filter(topic => 
topicPartitionFilter.isTopicAllowed(topic))
+
+    
client.describeTopics(filteredTopics.asJava).allTopicNames.get.asScala.flatMap 
{ case (topic, description) =>
+      description
+        .partitions
+        .asScala
+        .map(tp => new PartitionInfo(topic, tp.partition, tp.leader, 
tp.replicas.asScala.toArray, tp.isr.asScala.toArray))
+        .filter(tp => topicPartitionFilter.isPartitionAllowed(tp))
     }.toBuffer
   }
 }
+
+/**
+ * Used to filter partitions after describing them
+ */
+trait PartitionFilter {
+  def isPartitionAllowed(partition: Int): Boolean
+}
+
+case class PartitionsSetFilter(partitionIds: Set[Int]) extends PartitionFilter 
{
+  override def isPartitionAllowed(partition: Int): Boolean = 
partitionIds.isEmpty || partitionIds.contains(partition)
+}
+
+case class UniquePartitionFilter(partition: Int) extends PartitionFilter {
+  override def isPartitionAllowed(partition: Int): Boolean = partition == 
this.partition
+}
+
+case class PartitionRangeFilter(lowerRange: Int, upperRange: Int) extends 
PartitionFilter {
+  override def isPartitionAllowed(partition: Int): Boolean = partition >= 
lowerRange && partition < upperRange
+}
+
+trait TopicPartitionFilter {
+
+  /**
+   * Used to filter topics before describing them
+   */
+  def isTopicAllowed(topic: String): Boolean
+
+  /**
+   * Used to filter topics and topic-partitions after describing them
+   */
+  def isPartitionAllowed(partition: PartitionInfo): Boolean
+}
+
+/**
+ * Creates a topic-partition filter based on a topic filter and a partition 
filter
+ */
+case class TopicFilterAndPartitionFilter(
+  topicFilter: IncludeList,
+  excludeInternalTopics: Boolean,
+  partitionFilter: PartitionFilter
+) extends TopicPartitionFilter {
+
+  override def isPartitionAllowed(partition: PartitionInfo): Boolean = {
+    isTopicAllowed(partition.topic()) && 
partitionFilter.isPartitionAllowed(partition.partition())

Review comment:
       nit: Parenthesis of the getters could be removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to