urbandan commented on a change in pull request #9430:
URL: https://github.com/apache/kafka/pull/9430#discussion_r563672764



##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +154,119 @@ object GetOffsetShell {
         }
     }
 
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { 
case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0
+    }).foreach { case (tp, offset) =>
+      println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
     }
 
   }
 
   /**
-   * Return the partition infos for `topic`. If the topic does not exist, 
`None` is returned.
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | 
:PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, 
partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
-    val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k 
== topic }.values.flatMap(_.asScala).toBuffer
-    if (partitionInfos.isEmpty)
-      None
-    else if (partitionIds.isEmpty)
-      Some(partitionInfos)
-    else
-      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
+  private def createTopicPartitionFilterWithPatternList(topicPartitions: 
String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+    val ruleSpecs = topicPartitions.split(",")
+    val rules = ruleSpecs.map(ruleSpec => {
+      val parts = ruleSpec.split(":")
+      if (parts.length == 1) {
+        val whitelist = IncludeList(parts(0))
+        tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, 
excludeInternalTopics)
+      } else if (parts.length == 2) {
+        val partitionFilter = createPartitionFilter(parts(1))
+
+        if (parts(0).trim().isEmpty) {
+          tp: PartitionInfo => partitionFilter.apply(tp.partition)
+        } else {
+          val whitelist = IncludeList(parts(0))
+          tp: PartitionInfo => whitelist.isTopicAllowed(tp.topic, 
excludeInternalTopics) && partitionFilter.apply(tp.partition)
+        }
+      } else {
+        throw new IllegalArgumentException(s"Invalid topic-partition rule: 
$ruleSpec")
+      }
+    })
+
+    tp => rules.exists(rule => rule.apply(tp))
+  }
+
+  /**
+   * Creates a partition filter based on a single id or a range.
+   * Expected format:
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+   */
+  private def createPartitionFilter(spec: String): Int => Boolean = {
+    if (spec.indexOf('-') != -1) {
+      val rangeParts = spec.split("-", -1)
+      if(rangeParts.length != 2 || rangeParts(0).isEmpty && 
rangeParts(1).isEmpty) {
+        throw new IllegalArgumentException(s"Invalid range specification: 
$spec")
+      }
+
+      if(rangeParts(0).isEmpty) {
+        val max = rangeParts(1).toInt
+        partition: Int => partition < max
+      } else if(rangeParts(1).isEmpty) {
+        val min = rangeParts(0).toInt
+        partition: Int => partition >= min
+      } else {
+        val min = rangeParts(0).toInt
+        val max = rangeParts(1).toInt
+
+        if (min > max) {
+          throw new IllegalArgumentException(s"Range lower bound cannot be 
greater than upper bound: $spec")
+        }
+
+        partition: Int => partition >= min && partition < max
+      }
+    } else {
+      val number = spec.toInt

Review comment:
       You are right, should be handled, fixed it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to