showuon commented on code in PR #10827:
URL: https://github.com/apache/kafka/pull/10827#discussion_r845892551
##########
core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala:
##########
@@ -140,25 +150,28 @@ object ReplicaVerificationTool extends Logging {
val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue
val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue
val reportInterval = options.valueOf(reportIntervalOpt).longValue
+ }
+
+ def main(args: Array[String]): Unit = {
+ val opts = new ReplicaVerificationToolOptions(args)
+
// getting topic metadata
info("Getting topic metadata...")
- val brokerList = options.valueOf(brokerListOpt)
- ToolsUtils.validatePortOrDie(parser, brokerList)
val (topicsMetadata, brokerInfo) = {
- val adminClient = createAdminClient(brokerList)
- try ((listTopicsMetadata(adminClient), brokerDetails(adminClient)))
+ val adminClient = createAdminClient(opts.bootstrapServer)
+ try (listTopicsMetadata(adminClient), brokerDetails(adminClient))
finally CoreUtils.swallow(adminClient.close(), this)
}
- val topicIds = topicsMetadata.map( metadata => metadata.name() ->
metadata.topicId()).toMap
+ val topicIds = topicsMetadata.map(metadata => metadata.name() ->
metadata.topicId()).toMap
val filteredTopicMetadata = topicsMetadata.filter { topicMetaData =>
- topicsIncludeFilter.isTopicAllowed(topicMetaData.name,
excludeInternalTopics = false)
+ opts.topicsIncludeFilter.isTopicAllowed(topicMetaData.name,
excludeInternalTopics = false)
}
if (filteredTopicMetadata.isEmpty) {
- error(s"No topics found. $topicsIncludeOpt if specified, is either
filtering out all topics or there is no topic.")
+ error(s"No topics found. ${opts.topicWhiteListOpt} if specified, is
either filtering out all topics or there is no topic.")
Review Comment:
Is the change from `topicsIncludeOpt` to `topicWhiteListOpt` intended?
##########
core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala:
##########
@@ -73,55 +74,64 @@ object ReplicaVerificationTool extends Logging {
ReplicaVerificationTool.dateFormat.format(new
Date(Time.SYSTEM.milliseconds))
}
- def main(args: Array[String]): Unit = {
- val parser = new OptionParser(false)
- val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of
hostname and port of the server to connect to.")
- .withRequiredArg
- .describedAs("hostname:port,...,hostname:port")
- .ofType(classOf[String])
- val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each
request.")
- .withRequiredArg
- .describedAs("bytes")
- .ofType(classOf[java.lang.Integer])
-
.defaultsTo(ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES)
- val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time
each fetch request waits.")
- .withRequiredArg
- .describedAs("ms")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1000)
+ // Non-private for testing
+ sealed class ReplicaVerificationToolOptions(args: Array[String]) extends
CommandDefaultOptions(args) {
+ private val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use
--bootstrap-server instead; ignored if --bootstrap-server is specified. The
list of hostname and port of the server to connect to.")
+ .withRequiredArg
+ .describedAs("server to connect to")
+ .ofType(classOf[String])
+ private val bootstrapServerOpt = parser.accepts("bootstrap-server",
"REQUIRED. The list of hostname and port of the server to connect to.")
+ .requiredUnless("broker-list")
+ .withRequiredArg
+ .describedAs("server to connect to")
+ .ofType(classOf[String])
+ private val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of
each request.")
+ .withRequiredArg
+ .describedAs("bytes")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES)
+ private val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount
of time each fetch request waits.")
+ .withRequiredArg
+ .describedAs("ms")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1000)
val topicWhiteListOpt = parser.accepts("topic-white-list", "DEPRECATED use
--topics-include instead; ignored if --topics-include specified. List of topics
to verify replica consistency. Defaults to '.*' (all topics)")
- .withRequiredArg
- .describedAs("Java regex (String)")
- .ofType(classOf[String])
- .defaultsTo(".*")
+ .withRequiredArg
+ .describedAs("Java regex (String)")
+ .ofType(classOf[String])
+ .defaultsTo(".*")
val topicsIncludeOpt = parser.accepts("topics-include", "List of topics to
verify replica consistency. Defaults to '.*' (all topics)")
- .withRequiredArg
- .describedAs("Java regex (String)")
- .ofType(classOf[String])
- .defaultsTo(".*")
- val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting
the initial offsets.")
- .withRequiredArg
- .describedAs("timestamp/-1(latest)/-2(earliest)")
- .ofType(classOf[java.lang.Long])
- .defaultsTo(-1L)
- val reportIntervalOpt = parser.accepts("report-interval-ms", "The
reporting interval.")
- .withRequiredArg
- .describedAs("ms")
- .ofType(classOf[java.lang.Long])
- .defaultsTo(30 * 1000L)
- val helpOpt = parser.accepts("help", "Print usage information.").forHelp()
- val versionOpt = parser.accepts("version", "Print version information and
exit.").forHelp()
Review Comment:
Why did we remove the `help` and `version` options?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]