dajac commented on a change in pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#discussion_r762038197



##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -222,11 +240,19 @@ object GetOffsetShell {
   }
 
   /**
-   * Return the partition infos. Filter them with topicPartitionFilter.
+   * Return the partition infos. Filter them with topicFilter and 
topicPartitionFilter.
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], 
topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = {
-    consumer.listTopics.asScala.values.flatMap { partitions =>
-      partitions.asScala.filter(topicPartitionFilter)
+  private def listPartitionInfos(client: Admin,
+                                 topicFilter: String => Boolean,
+                                 topicPartitionFilter: PartitionInfo => 
Boolean,
+                                 excludeInternalTopics: Boolean): 
Seq[PartitionInfo] = {
+    val topics = client.listTopics(new 
ListTopicsOptions().listInternal(!excludeInternalTopics)).names().get().asScala.filter(topicFilter)

Review comment:
       nit: This line is quite hard to read. Could we extract `new 
ListTopicsOptions().listInternal(!excludeInternalTopics)` in a variable? You 
can also remove the parenthesis of all the getters (e.g. names, get).

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -195,16 +205,24 @@ object GetOffsetShell {
         val upperRange = group(4).map(_.toInt).getOrElse(Int.MaxValue)
         (p: Int) => p >= lowerRange && p < upperRange
     }
-
-    tp => topicFilter.isTopicAllowed(tp.topic, excludeInternalTopics) && 
partitionFilter(tp.partition)
+    (
+      topic => topicFilter.isTopicAllowed(topic, excludeInternalTopics),
+      tp => topicFilter.isTopicAllowed(tp.topic, excludeInternalTopics) && 
partitionFilter(tp.partition)
+    )

Review comment:
       The code is getting a bit complicated with all those filter functions. I 
wonder if we should introduce an abstraction for them now. Something along the 
lines of the following. The name are not very good but the idea is there. What 
do you think?
   
   ```scala
   trait TopicPartitionFilter {
     def isTopicAllowed(topic: String): Boolean
     def isPartitionAllowed(partition: PartitionInfo): Boolean
   }
   
   case class UniqueTopicPartitionFilter(
     topicRegex: String,
     partition: Int
   ) extends TopicPartitionFilter {
    ...
   }
   
   case class RangeTopicPartitionFilter(
     topicRegex: String,
     lower: Int,
     upper: Int
   ) extends TopicPartitionFilter {
     ...
   }
   
   case class SetTopicPartitionFilter(
     topic: String,
     partitionIds: Set[Int]
   ) extends TopicPartitionFilter {
     ...
   }
   
   case class CompositePartitionFilter(
     filters: List[TopicPartitionFilter]
   ) extends TopicPartitionFilter {
     ...
   }
   ```

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -164,20 +167,27 @@ object GetOffsetShell {
   }
 
   /**
-   * Creates a topic-partition filter based on a list of patterns.
+   * Creates a topic filter and a topic-partition filter based on a list of 
patterns.
    * Expected format:
    * List: TopicPartitionPattern(, TopicPartitionPattern)*
    * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | 
:PartitionPattern
    * TopicPattern: REGEX
    * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  def createTopicPartitionFilterWithPatternList(topicPartitions: String, 
excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+  def createTopicPartitionFilterWithPatternList(topicPartitions: String,
+                                                excludeInternalTopics: Boolean
+                                               ): (String => Boolean, 
PartitionInfo => Boolean) = {

Review comment:
       nit: Could we format methods as follow? We tend to follow this pattern 
nowadays.
   
   ```scala
   def createTopicPartitionFilterWithPatternList(
     topicPartitions: String,
     excludeInternalTopics: Boolean
   ): (String => Boolean, PartitionInfo => Boolean) = {
   
   }
   ```

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -222,11 +240,19 @@ object GetOffsetShell {
   }
 
   /**
-   * Return the partition infos. Filter them with topicPartitionFilter.
+   * Return the partition infos. Filter them with topicFilter and 
topicPartitionFilter.
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], 
topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = {
-    consumer.listTopics.asScala.values.flatMap { partitions =>
-      partitions.asScala.filter(topicPartitionFilter)
+  private def listPartitionInfos(client: Admin,
+                                 topicFilter: String => Boolean,
+                                 topicPartitionFilter: PartitionInfo => 
Boolean,
+                                 excludeInternalTopics: Boolean): 
Seq[PartitionInfo] = {
+    val topics = client.listTopics(new 
ListTopicsOptions().listInternal(!excludeInternalTopics)).names().get().asScala.filter(topicFilter)
+    
client.describeTopics(topics.asJavaCollection).allTopicNames().get().asScala.flatMap
 { case (topic, description) =>
+      description
+        .partitions()
+        .asScala
+        .map(tp => new PartitionInfo(topic, tp.partition(), tp.leader(), 
tp.replicas().asScala.toArray, tp.isr().asScala.toArray))

Review comment:
       Instead of converting to `PartitionInfo`, could we adapt the filters to 
work with what we have here?

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -123,10 +123,10 @@ object GetOffsetShell {
       new Properties
     config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)

Review comment:
       Should we use `AdminConfig`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to