dajac commented on a change in pull request #9430:
URL: https://github.com/apache/kafka/pull/9430#discussion_r565088240
##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -33,49 +34,65 @@ object GetOffsetShell {
   def main(args: Array[String]): Unit = {
     val parser = new OptionParser(false)
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
+    val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
                            .withRequiredArg
-                           .describedAs("hostname:port,...,hostname:port")
+                           .describedAs("HOST1:PORT1,...,HOST3:PORT3")
                            .ofType(classOf[String])
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+                           .requiredUnless("broker-list")
+                           .withRequiredArg
+                           .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+                           .ofType(classOf[String])
+    val topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition specifications to get the offsets for, with the format of topic:partition. The 'topic' part can be a regex or may be omitted to only specify the partitions, and query all authorized topics." +
+                                            " The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.")
+                           .withRequiredArg
+                           .describedAs("topic:partition,...,topic:partition")
+                           .ofType(classOf[String])
+    val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
                            .withRequiredArg
                            .describedAs("topic")
                            .ofType(classOf[String])
-    val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions")
+    val partitionsOpt = parser.accepts("partitions", s"Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
                            .withRequiredArg
                            .describedAs("partition ids")
                            .ofType(classOf[String])
-                           .defaultsTo("")
-    val timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently commited record timestamp is given.]")
+    val timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
                            .withRequiredArg
                            .describedAs("timestamp/-1(latest)/-2(earliest)")
                            .ofType(classOf[java.lang.Long])
                            .defaultsTo(-1L)
-    parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned")
-                           .withRequiredArg
-                           .describedAs("count")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1)
-    parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of time each fetch request waits.")
+    val commandConfigOpt = parser.accepts("command-config", s"Property file containing configs to be passed to Consumer Client.")
                            .withRequiredArg
-                           .describedAs("ms")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1000)
+                           .describedAs("config file")
+                           .ofType(classOf[String])
+    val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", s"By default, internal topics are included. If specified, internal topics are excluded.")
-    if (args.length == 0)
-      CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting topic offsets.")
+    if (args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting topic-partition offsets.")
     val options = parser.parse(args : _*)
-    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt)
+    val effectiveBrokerListOpt = if (options.has(bootstrapServerOpt))
+      bootstrapServerOpt
+    else
+      brokerListOpt
+
+    CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt)
     val clientId = "GetOffsetShell"
-    val brokerList = options.valueOf(brokerListOpt)
+    val brokerList = options.valueOf(effectiveBrokerListOpt)
+
     ToolsUtils.validatePortOrDie(parser, brokerList)
-    val topic = options.valueOf(topicOpt)
+    val excludeInternalTopics = options.has(excludeInternalTopicsOpt)
+
+    if (options.has(topicPartitionsOpt) && (options.has(topicOpt) || options.has(partitionsOpt))) {
+      System.err.println(s"--topic-partitions cannot be used with --topic or --partitions")
+      Exit.exit(1)
+    }
+
     val partitionIdsRequested: Set[Int] = {
Review comment:
How about moving the computation of `partitionIdsRequested` to L112? It
does not make sense to do all of this if `options.has(topicPartitionsOpt)` is true.
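For illustration, a minimal sketch of that reordering (illustrative only, not the PR's exact code; the original error handling around malformed ids is elided):
```scala
// Only parse --partitions when --topic-partitions is absent; the filter
// built from --topic-partitions never looks at partitionIdsRequested.
val partitionIdsRequested: Set[Int] =
  if (options.has(topicPartitionsOpt))
    Set.empty
  else
    Option(options.valueOf(partitionsOpt)).filter(_.nonEmpty)
      .map(_.split(",").map(_.toInt).toSet)
      .getOrElse(Set.empty)
```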
##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +150,104 @@ object GetOffsetShell {
     }
   }
-    partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
-      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    partitionOffsets.toSeq.sortWith((tp1, tp2) => {
+      val topicComp = tp1._1.topic.compareTo(tp2._1.topic)
+      if (topicComp == 0)
+        tp1._1.partition < tp2._1.partition
+      else
+        topicComp < 0
+    }).foreach { case (tp, offset) =>
+      println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
     }
+  }
+  /**
+   * Creates a topic-partition filter based on a list of patterns.
+   * Expected format:
+   * List: TopicPartitionPattern(, TopicPartitionPattern)*
+   * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
+   * TopicPattern: REGEX
+   * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
+   */
+  def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+    val ruleSpecs = topicPartitions.split(",")
+    val rules = ruleSpecs.map { ruleSpec =>
Review comment:
Out of curiosity, did you consider using a regex to parse the rule spec?
It seems to me that something like this `([^:]*):([0-9]*)(?:-([0-9]*))?` could
work and could directly extract all the required parts. I haven't tested it.
That could reduce the code while making the grammar of the rule spec more
explicit.
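For what it's worth, an untested sketch of that idea (the object and method names here are illustrative, not part of the PR):
```scala
import scala.util.matching.Regex

object RuleSpecParser {
  // The regex suggested above; matches "topic:1", "topic:1-3", "topic:1-",
  // "topic:-3" and ":1". A spec without a ':' would need a separate branch.
  val RuleSpecPattern: Regex = "([^:]*):([0-9]*)(?:-([0-9]*))?".r

  // Returns (topicRegex, lowerText, upperText). The last group is None when
  // no '-' is present (a single partition) and Some("") for an open upper
  // bound, so the two cases stay distinguishable.
  def parse(spec: String): Option[(String, String, Option[String])] =
    spec match {
      case RuleSpecPattern(topic, lower, upper) => Some((topic, lower, Option(upper)))
      case _ => None
    }
}
```
For example, `RuleSpecParser.parse("topic:1-3")` would yield `Some(("topic", "1", Some("3")))`, and `RuleSpecParser.parse("topic:-5")` would yield `Some(("topic", "", Some("5")))`.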
##########
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##########
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.util.Properties
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Exit, Logging, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.Assert.assertEquals
+import org.junit.{Before, Test}
+
+class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
+  private val topicCount = 4
+  private val offsetTopicPartitionCount = 4
+
+  override def generateConfigs: collection.Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
+    .map { p =>
+      p.put(KafkaConfig.OffsetsTopicPartitionsProp, Int.box(offsetTopicPartitionCount))
+      p
+    }.map(KafkaConfig.fromProps)
+
+  @Before
+  def createTestAndInternalTopics(): Unit = {
+    Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i))
+
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+
+    // Send X messages to each partition of topicX
+    val producer = new KafkaProducer[String, String](props)
+    Range(1, topicCount + 1).foreach(i => Range(0, i*i)
+      .foreach(msgCount => producer.send(new ProducerRecord[String, String](topicName(i), msgCount % i, null, "val" + msgCount))))
+    producer.close()
+
+    TestUtils.createOffsetsTopic(zkClient, servers)
+  }
+
+  @Test
+  def testNoFilterOptions(): Unit = {
+    val offsets = executeAndParse(Array())
+    assertEquals(expectedOffsetsWithInternal(), offsets)
+  }
+
+  @Test
+  def testInternalExcluded(): Unit = {
+    val offsets = executeAndParse(Array("--exclude-internal-topics"))
+    assertEquals(expectedTestTopicOffsets(), offsets)
+  }
+
+  @Test
+  def testTopicNameArg(): Unit = {
+    Range(1, topicCount + 1).foreach(i => {
+      val offsets = executeAndParse(Array("--topic", topicName(i)))
+      assertEquals("Offset output did not match for " + topicName(i), expectedOffsetsForTopic(i), offsets)
+    })
+  }
+
+  @Test
+  def testTopicPatternArg(): Unit = {
+    val offsets = executeAndParse(Array("--topic", "topic.*"))
+    assertEquals(expectedTestTopicOffsets(), offsets)
+  }
+
+  @Test
+  def testPartitionsArg(): Unit = {
+    val offsets = executeAndParse(Array("--partitions", "0,1"))
+    assertEquals(expectedOffsetsWithInternal().filter { case (_, partition, _) => partition <= 1 }, offsets)
+  }
+
+  @Test
+  def testTopicPatternArgWithPartitionsArg(): Unit = {
+    val offsets = executeAndParse(Array("--topic", "topic.*", "--partitions", "0,1"))
+    assertEquals(expectedTestTopicOffsets().filter { case (_, partition, _) => partition <= 1 }, offsets)
+  }
+
+  @Test
+  def testTopicPartitionsArg(): Unit = {
+    val offsets = executeAndParse(Array("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3"))
+    assertEquals(
+      List(
+        ("__consumer_offsets", 3, Some(0)),
+        ("topic1", 0, Some(1)),
+        ("topic2", 1, Some(2)),
+        ("topic3", 2, Some(3)),
+        ("topic4", 2, Some(4))
+      ),
+      offsets
+    )
+  }
+
+  @Test
+  def testTopicPartitionsArgWithInternalExcluded(): Unit = {
+    val offsets = executeAndParse(Array("--topic-partitions",
+      "topic1:0,topic2:1,topic(3|4):2,__.*:3", "--exclude-internal-topics"))
+    assertEquals(
+      List(
+        ("topic1", 0, Some(1)),
+        ("topic2", 1, Some(2)),
+        ("topic3", 2, Some(3)),
+        ("topic4", 2, Some(4))
+      ),
+      offsets
+    )
+  }
+
+  @Test
+  def testTopicPartitionsNotFoundForNonExistentTopic(): Unit = {
+    assertExitCodeIsOne(Array("--topic", "some_nonexistent_topic"))
+  }
+
+  @Test
+  def testTopicPartitionsNotFoundForExcludedInternalTopic(): Unit = {
+    assertExitCodeIsOne(Array("--topic", "some_nonexistent_topic:*"))
+  }
+
+  @Test
+  def testTopicPartitionsNotFoundForNonMatchingTopicPartitionPattern(): Unit = {
+    assertExitCodeIsOne(Array("--topic-partitions", "__consumer_offsets", "--exclude-internal-topics"))
+  }
+
+  @Test
+  def testTopicPartitionsFlagWithTopicFlagCauseExit(): Unit = {
+    assertExitCodeIsOne(Array("--topic-partitions", "__consumer_offsets", "--topic", "topic1"))
+  }
+
+  @Test
+  def testTopicPartitionsFlagWithPartitionsFlagCauseExit(): Unit = {
+    assertExitCodeIsOne(Array("--topic-partitions", "__consumer_offsets", "--partitions", "0"))
+  }
+
+  private def expectedOffsetsWithInternal(): List[(String, Int, Option[Long])] = {
+    Range(0, offsetTopicPartitionCount).map(i => ("__consumer_offsets", i, Some(0L))).toList ++ expectedTestTopicOffsets()
+  }
+
+  private def expectedTestTopicOffsets(): List[(String, Int, Option[Long])] = {
+    Range(1, topicCount + 1).flatMap(i => expectedOffsetsForTopic(i)).toList
+  }
+
+  private def expectedOffsetsForTopic(i: Int): List[(String, Int, Option[Long])] = {
+    val name = topicName(i)
+    Range(0, i).map(p => (name, p, Some(i.toLong))).toList
+  }
+
+  private def topicName(i: Int): String = "topic" + i
+
+  private def assertExitCodeIsOne(args: Array[String]): Unit = {
+    var exitStatus: Option[Int] = None
+    Exit.setExitProcedure { (status, _) =>
+      exitStatus = Some(status)
+      throw new RuntimeException
+    }
+
+    try {
+      GetOffsetShell.main(addBootstrapServer(args))
+    } catch {
+      case e: RuntimeException =>
+    } finally {
+      Exit.resetExitProcedure()
+    }
+
+    assertEquals(Some(1), exitStatus)
+  }
+
+  private def executeAndParse(args: Array[String]): List[(String, Int, Option[Long])] = {
+    val output = executeAndGrabOutput(args)
+    output.split(System.lineSeparator())
+      .map(_.split(":"))
+      .filter(_.length >= 2)
+      .map(line => {
Review comment:
nit: `.map(line => {` -> `.map { line =>`
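Applied to the surrounding chain, the suggestion would read (body elided as in the diff):
```scala
output.split(System.lineSeparator())
  .map(_.split(":"))
  .filter(_.length >= 2)
  .map { line =>
    // ... parse the line as before ...
  }
```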
##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -132,23 +150,104 @@ object GetOffsetShell {
}
}
- partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach {
case (tp, offset) =>
- println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+ partitionOffsets.toSeq.sortWith((tp1, tp2) => {
Review comment:
I just learnt that we can actually compare TopicPartition with the
following comparator in Scala:
```scala
  def compareTopicPartitions(a: TopicPartition, b: TopicPartition): Boolean = {
    (a.topic(), a.partition()) < (b.topic(), b.partition())
  }
```
and do `partitionOffsets.toSeq.sortWith(compareTopicPartitions).foreach {
...`. This makes the code a bit more readable.
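Note that the tuple `<` does not compile out of the box; it relies on an ordering conversion being in scope, e.g.:
```scala
// Brings infix <, <=, etc. for every type with an Ordering, tuples included.
import scala.math.Ordering.Implicits._
```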
##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -89,33 +106,34 @@ object GetOffsetShell {
     }
     val listOffsetsTimestamp = options.valueOf(timeOpt).longValue
-    val config = new Properties
+    val topicPartitionFilter = if (options.has(topicPartitionsOpt)) {
+      createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt), excludeInternalTopics)
+    } else {
+      createTopicPartitionFilterWithTopicAndPartitionPattern(
+        if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None,
+        excludeInternalTopics,
+        partitionIdsRequested
+      )
+    }
+
+    val config = if (options.has(commandConfigOpt))
+      Utils.loadProps(options.valueOf(commandConfigOpt))
+    else
+      new Properties
     config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
     val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
-    val partitionInfos = listPartitionInfos(consumer, topic, partitionIdsRequested) match {
-      case None =>
-        System.err.println(s"Topic $topic does not exist")
-        Exit.exit(1)
-      case Some(p) if p.isEmpty =>
-        if (partitionIdsRequested.isEmpty)
-          System.err.println(s"Topic $topic has 0 partitions")
-        else
-          System.err.println(s"Topic $topic does not have any of the requested partitions ${partitionIdsRequested.mkString(",")}")
-        Exit.exit(1)
-      case Some(p) => p
-    }
+    val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter)
-    if (partitionIdsRequested.nonEmpty) {
-      (partitionIdsRequested -- partitionInfos.map(_.partition)).foreach { partitionId =>
-        System.err.println(s"Error: partition $partitionId does not exist")
-      }
+    if (partitionInfos.isEmpty) {
+      System.err.println(s"Could not match any topic-partitions with the specified filters")
+      Exit.exit(1)
Review comment:
I am not a fan of using `Exit.exit` all over the place. I know it was
already like this before, but I wonder if we could improve this by
throwing exceptions instead and catching them in `main`. I think this
would require stripping down the main function a bit.
Another reason for this is that the new code throws
`IllegalArgumentException` in a few places, but I don't think we catch them
anywhere, do we? What do you think?
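To illustrate the shape being suggested (a hedged sketch; `fetchOffsets` is a hypothetical name for a helper holding the current body of `main`, not the PR's API):
```scala
// Raise IllegalArgumentException for bad input instead of calling Exit.exit
// inline, and translate it into a single exit point here.
def main(args: Array[String]): Unit = {
  try {
    fetchOffsets(args) // hypothetical helper containing today's main logic
  } catch {
    case e: IllegalArgumentException =>
      System.err.println(e.getMessage)
      Exit.exit(1)
  }
}
```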
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]