This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 72cfc994f56 KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions to tools (#13131) 72cfc994f56 is described below commit 72cfc994f5675be349d4494ece3528efed290651 Author: Federico Valeri <fedeval...@gmail.com> AuthorDate: Thu Jan 26 20:06:09 2023 +0100 KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions to tools (#13131) Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Christo Lolov <christolo...@gmail.com>, Sagar Rao <sagarmeansoc...@gmail.com> --- build.gradle | 1 + checkstyle/import-control-core.xml | 3 +- checkstyle/import-control.xml | 2 + core/src/main/scala/kafka/Kafka.scala | 13 +- core/src/main/scala/kafka/admin/AclCommand.scala | 44 ++-- .../kafka/admin/BrokerApiVersionsCommand.scala | 5 +- .../src/main/scala/kafka/admin/ConfigCommand.scala | 11 +- .../scala/kafka/admin/ConsumerGroupCommand.scala | 64 +++--- .../scala/kafka/admin/DelegationTokenCommand.scala | 23 ++- .../scala/kafka/admin/DeleteRecordsCommand.scala | 6 +- .../scala/kafka/admin/LeaderElectionCommand.scala | 6 +- .../main/scala/kafka/admin/LogDirsCommand.scala | 7 +- .../kafka/admin/ReassignPartitionsCommand.scala | 11 +- core/src/main/scala/kafka/admin/TopicCommand.scala | 46 +++-- .../scala/kafka/admin/ZkSecurityMigrator.scala | 7 +- .../main/scala/kafka/tools/ConsoleConsumer.scala | 23 ++- .../main/scala/kafka/tools/ConsoleProducer.scala | 13 +- .../scala/kafka/tools/ConsumerPerformance.scala | 8 +- .../main/scala/kafka/tools/DumpLogSegments.scala | 3 +- .../main/scala/kafka/tools/GetOffsetShell.scala | 5 +- core/src/main/scala/kafka/tools/JmxTool.scala | 8 +- core/src/main/scala/kafka/tools/MirrorMaker.scala | 4 +- core/src/main/scala/kafka/tools/PerfConfig.scala | 3 +- .../kafka/tools/ReplicaVerificationTool.scala | 5 +- .../scala/kafka/tools/StateChangeLogMerger.scala | 6 +- .../main/scala/kafka/tools/StreamsResetter.java | 11 +- .../main/scala/kafka/tools/TestRaftServer.scala | 7 +- 
.../scala/kafka/utils/CommandDefaultOptions.scala | 27 --- .../main/scala/kafka/utils/CommandLineUtils.scala | 145 ------------- core/src/main/scala/kafka/utils/ToolsUtils.scala | 19 +- .../scala/kafka/tools/LogCompactionTester.scala | 3 +- .../scala/other/kafka/TestLinearWriteSpeed.scala | 1 + .../other/kafka/TestPurgatoryPerformance.scala | 1 + .../admin/ReassignPartitionsCommandArgsTest.scala | 2 +- .../unit/kafka/utils/CommandLineUtilsTest.scala | 223 -------------------- .../kafka/server/util/CommandDefaultOptions.java | 41 ++++ .../apache/kafka/server/util/CommandLineUtils.java | 197 ++++++++++++++++++ .../kafka/server/util/CommandLineUtilsTest.java | 227 +++++++++++++++++++++ 38 files changed, 664 insertions(+), 567 deletions(-) diff --git a/build.gradle b/build.gradle index 8441ddeaf05..56883860573 100644 --- a/build.gradle +++ b/build.gradle @@ -1536,6 +1536,7 @@ project(':server-common') { api project(':clients') implementation libs.slf4jApi implementation libs.metrics + implementation libs.joptSimple testImplementation project(':clients') testImplementation project(':clients').sourceSets.test.output diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index 5a92c44d7c8..0b15f958b96 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -60,8 +60,9 @@ <subpackage name="tools"> <allow pkg="org.apache.kafka.clients.admin" /> <allow pkg="kafka.admin" /> - <allow pkg="joptsimple" /> <allow pkg="org.apache.kafka.clients.consumer" /> + <allow pkg="org.apache.kafka.server.util" /> + <allow pkg="joptsimple" /> </subpackage> <subpackage name="coordinator"> diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 8792c26c949..0e767baee5d 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -347,6 +347,7 @@ <subpackage name="server"> <allow pkg="org.apache.kafka.common" /> + <allow pkg="joptsimple" /> <!-- This is required to make 
AlterConfigPolicyTest work. --> <allow pkg="org.apache.kafka.server.policy" /> @@ -406,6 +407,7 @@ <allow pkg="net.sourceforge.argparse4j" /> <allow pkg="org.apache.log4j" /> <allow pkg="kafka.test" /> + <allow pkg="joptsimple" /> </subpackage> <subpackage name="trogdor"> diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index dad462dcad6..fa0137e9597 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -22,10 +22,9 @@ import java.util.Properties import joptsimple.OptionParser import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server} import kafka.utils.Implicits._ -import kafka.utils.{CommandLineUtils, Exit, Logging} +import kafka.utils.{Exit, Logging} import org.apache.kafka.common.utils.{Java, LoggingSignalHandler, OperatingSystem, Time, Utils} - -import scala.jdk.CollectionConverters._ +import org.apache.kafka.server.util.CommandLineUtils object Kafka extends Logging { @@ -41,12 +40,12 @@ object Kafka extends Logging { optionParser.accepts("version", "Print version information and exit.") if (args.isEmpty || args.contains("--help")) { - CommandLineUtils.printUsageAndDie(optionParser, + CommandLineUtils.printUsageAndExit(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(this.getClass.getCanonicalName.split('$').head)) } if (args.contains("--version")) { - CommandLineUtils.printVersionAndDie() + CommandLineUtils.printVersionAndExit() } val props = Utils.loadProps(args(0)) @@ -55,10 +54,10 @@ object Kafka extends Logging { val options = optionParser.parse(args.slice(1, args.length): _*) if (options.nonOptionArguments().size() > 0) { - CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(",")) + CommandLineUtils.printUsageAndExit(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(",")) } - props ++= 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala) + props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt)) } props } diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 769e99df737..1c8b6537346 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -18,7 +18,6 @@ package kafka.admin import java.util.Properties - import joptsimple._ import joptsimple.util.EnumConverter import kafka.security.authorizer.{AclAuthorizer, AclEntry, AuthorizerUtils} @@ -33,6 +32,7 @@ import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils} import org.apache.kafka.server.authorizer.Authorizer +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import scala.jdk.CollectionConverters._ import scala.collection.mutable @@ -51,7 +51,7 @@ object AclCommand extends Logging { val opts = new AclCommandOptions(args) - CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to manage acls on kafka.") + CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to manage acls on kafka.") opts.checkArgs() @@ -202,8 +202,8 @@ object AclCommand extends Logging { val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSaslEnabled) val authorizerPropertiesWithoutTls = if (opts.options.has(opts.authorizerPropertiesOpt)) { - val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala - defaultProps ++ CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue = false).asScala + val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt) + defaultProps ++ CommandLineUtils.parseKeyValueArgs(authorizerProperties, false).asScala } else { defaultProps } @@ -324,7 +324,7 @@ object AclCommand 
extends Logging { private def getResourceToAcls(opts: AclCommandOptions): Map[ResourcePattern, Set[AccessControlEntry]] = { val patternType = opts.options.valueOf(opts.resourcePatternType) if (!patternType.isSpecific) - CommandLineUtils.printUsageAndDie(opts.parser, s"A '--resource-pattern-type' value of '$patternType' is not valid when adding acls.") + CommandLineUtils.printUsageAndExit(opts.parser, s"A '--resource-pattern-type' value of '$patternType' is not valid when adding acls.") val resourceToAcl = getResourceFilterToAcls(opts).map { case (filter, acls) => @@ -332,7 +332,7 @@ object AclCommand extends Logging { } if (resourceToAcl.values.exists(_.isEmpty)) - CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.") + CommandLineUtils.printUsageAndExit(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.") resourceToAcl } @@ -430,8 +430,8 @@ object AclCommand extends Logging { } yield new AccessControlEntry(principal.toString, host, operation, permissionType) } - private def getHosts(opts: AclCommandOptions, hostOptionSpec: ArgumentAcceptingOptionSpec[String], - principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Set[String] = { + private def getHosts(opts: AclCommandOptions, hostOptionSpec: OptionSpec[String], + principalOptionSpec: OptionSpec[String]): Set[String] = { if (opts.options.has(hostOptionSpec)) opts.options.valuesOf(hostOptionSpec).asScala.map(_.trim).toSet else if (opts.options.has(principalOptionSpec)) @@ -440,7 +440,7 @@ object AclCommand extends Logging { Set.empty[String] } - private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Set[KafkaPrincipal] = { + private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: OptionSpec[String]): Set[KafkaPrincipal] = { if (opts.options.has(principalOptionSpec)) 
opts.options.valuesOf(principalOptionSpec).asScala.map(s => JSecurityUtils.parseKafkaPrincipal(s.trim)).toSet else @@ -471,7 +471,7 @@ object AclCommand extends Logging { opts.options.valuesOf(opts.userPrincipalOpt).forEach(user => resourceFilters += new ResourcePatternFilter(JResourceType.USER, user.trim, patternType)) if (resourceFilters.isEmpty && dieIfNoResourceFound) - CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group> or --delegation-token <Delegation Token ID>") + CommandLineUtils.printUsageAndExit(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group> or --delegation-token <Delegation Token ID>") resourceFilters } @@ -487,7 +487,7 @@ object AclCommand extends Logging { for ((resource, acls) <- resourceToAcls) { val validOps = AclEntry.supportedOperations(resource.resourceType) + AclOperation.ALL if ((acls.map(_.operation) -- validOps).nonEmpty) - CommandLineUtils.printUsageAndDie(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.mkString(",")}") + CommandLineUtils.printUsageAndExit(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.mkString(",")}") } } @@ -634,7 +634,7 @@ object AclCommand extends Logging { def checkArgs(): Unit = { if (options.has(bootstrapServerOpt) && options.has(authorizerOpt)) - CommandLineUtils.printUsageAndDie(parser, "Only one of --bootstrap-server or --authorizer must be specified") + CommandLineUtils.printUsageAndExit(parser, "Only one of --bootstrap-server or --authorizer must be specified") if (!options.has(bootstrapServerOpt)) { CommandLineUtils.checkRequiredArgs(parser, options, authorizerPropertiesOpt) @@ -642,32 +642,32 @@ object AclCommand extends Logging { } if (options.has(commandConfigOpt) && !options.has(bootstrapServerOpt)) - CommandLineUtils.printUsageAndDie(parser, "The --command-config option can only be used 
with --bootstrap-server option") + CommandLineUtils.printUsageAndExit(parser, "The --command-config option can only be used with --bootstrap-server option") if (options.has(authorizerPropertiesOpt) && options.has(bootstrapServerOpt)) - CommandLineUtils.printUsageAndDie(parser, "The --authorizer-properties option can only be used with --authorizer option") + CommandLineUtils.printUsageAndExit(parser, "The --authorizer-properties option can only be used with --authorizer option") val actions = Seq(addOpt, removeOpt, listOpt).count(options.has) if (actions != 1) - CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --list, --add, --remove. ") + CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --list, --add, --remove. ") - CommandLineUtils.checkInvalidArgs(parser, options, listOpt, Set(producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, denyPrincipalsOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, listOpt, producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, denyPrincipalsOpt) //when --producer or --consumer is specified , user should not specify operations as they are inferred and we also disallow --deny-principals and --deny-hosts. 
- CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, operationsOpt, denyPrincipalsOpt, denyHostsOpt) + CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, operationsOpt, denyPrincipalsOpt, denyHostsOpt) if (options.has(listPrincipalsOpt) && !options.has(listOpt)) - CommandLineUtils.printUsageAndDie(parser, "The --principal option is only available if --list is set") + CommandLineUtils.printUsageAndExit(parser, "The --principal option is only available if --list is set") if (options.has(producerOpt) && !options.has(topicOpt)) - CommandLineUtils.printUsageAndDie(parser, "With --producer you must specify a --topic") + CommandLineUtils.printUsageAndExit(parser, "With --producer you must specify a --topic") if (options.has(idempotentOpt) && !options.has(producerOpt)) - CommandLineUtils.printUsageAndDie(parser, "The --idempotent option is only available if --producer is set") + CommandLineUtils.printUsageAndExit(parser, "The --idempotent option is only available if --producer is set") if (options.has(consumerOpt) && (!options.has(topicOpt) || !options.has(groupOpt) || (!options.has(producerOpt) && (options.has(clusterOpt) || options.has(transactionalIdOpt))))) - CommandLineUtils.printUsageAndDie(parser, "With --consumer you must specify a --topic and a --group and no --cluster or --transactional-id option should be specified.") + CommandLineUtils.printUsageAndExit(parser, "With --consumer you must specify a --topic and a --group and no --cluster or --transactional-id option should be specified.") } } } diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala index ea6ffeea65b..990ac6f9448 100644 --- 
a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala +++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala @@ -22,8 +22,6 @@ import java.io.IOException import java.util.Properties import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit} - -import kafka.utils.{CommandDefaultOptions, CommandLineUtils} import kafka.utils.Implicits._ import kafka.utils.Logging import org.apache.kafka.common.utils.Utils @@ -42,6 +40,7 @@ import org.apache.kafka.common.utils.{KafkaThread, Time} import org.apache.kafka.common.Node import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, MetadataRequest, MetadataResponse} import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success, Try} @@ -94,7 +93,7 @@ object BrokerApiVersionsCommand { checkArgs() def checkArgs(): Unit = { - CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to retrieve broker version information.") + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to retrieve broker version information.") // check required args CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt) } diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 687e65378b9..5d915f5e97f 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -23,7 +23,7 @@ import java.util.{Collections, Properties} import joptsimple._ import kafka.server.DynamicConfig.QuotaConfigs import kafka.server.{ConfigEntityName, ConfigType, Defaults, DynamicBrokerConfig, DynamicConfig, KafkaConfig} -import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging, PasswordEncoder} +import 
kafka.utils.{Exit, Logging, PasswordEncoder} import kafka.utils.Implicits._ import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism} @@ -37,6 +37,7 @@ import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism} import org.apache.kafka.common.utils.{Sanitizer, Time, Utils} import org.apache.kafka.server.log.internals.LogConfig +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.zookeeper.client.ZKClientConfig import scala.annotation.nowarn @@ -84,7 +85,7 @@ object ConfigCommand extends Logging { try { val opts = new ConfigCommandOptions(args) - CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to manipulate and describe entity config for a topic, client, user, broker or ip") + CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to manipulate and describe entity config for a topic, client, user, broker or ip") opts.checkArgs() @@ -863,10 +864,10 @@ object ConfigCommand extends Logging { // should have exactly one action val actions = Seq(alterOpt, describeOpt).count(options.has _) if (actions != 1) - CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --describe, --alter") + CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --describe, --alter") // check required args - CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addConfig, deleteConfig)) + CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, 
describeOpt) + CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, alterOpt, addConfig, deleteConfig) val entityTypeVals = entityTypes if (entityTypeVals.size != entityTypeVals.distinct.size) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index b428c447d77..9c0452b781c 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -28,18 +28,18 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{KafkaException, Node, TopicPartition} +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import scala.jdk.CollectionConverters._ import scala.collection.mutable.ListBuffer import scala.collection.{Map, Seq, immutable, mutable} import scala.util.{Failure, Success, Try} -import joptsimple.OptionSpec +import joptsimple.{OptionException, OptionSpec} import org.apache.kafka.common.protocol.Errors import scala.collection.immutable.TreeMap import scala.reflect.ClassTag import org.apache.kafka.common.ConsumerGroupState -import joptsimple.OptionException import org.apache.kafka.common.requests.ListOffsetsResponse object ConsumerGroupCommand extends Logging { @@ -49,17 +49,17 @@ object ConsumerGroupCommand extends Logging { val opts = new ConsumerGroupCommandOptions(args) try { opts.checkArgs() - CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.") + CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.") // should have exactly one action val actions = Seq(opts.listOpt, opts.describeOpt, 
opts.deleteOpt, opts.resetOffsetsOpt, opts.deleteOffsetsOpt).count(opts.options.has) if (actions != 1) - CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets, --delete-offsets") + CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets, --delete-offsets") run(opts) } catch { case e: OptionException => - CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage) + CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage) } } @@ -85,7 +85,7 @@ object ConsumerGroupCommand extends Logging { } } catch { case e: IllegalArgumentException => - CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage) + CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage) case e: Throwable => printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e)) } finally { @@ -747,7 +747,7 @@ object ConsumerGroupCommand extends Logging { if (opts.options.has(opts.resetFromFileOpt)) Nil else - CommandLineUtils.printUsageAndDie(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.") + ToolsUtils.printUsageAndExit(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.") } } @@ -801,7 +801,7 @@ object ConsumerGroupCommand extends Logging { partitionsToReset.map { topicPartition => logStartOffsets.get(topicPartition) match { case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset)) - case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition") + case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error getting starting offset of topic partition: $topicPartition") } }.toMap } else if (opts.options.has(opts.resetToLatestOpt)) { @@ -809,7 +809,7 @@ object ConsumerGroupCommand extends Logging { partitionsToReset.map { topicPartition => 
logEndOffsets.get(topicPartition) match { case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset)) - case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of topic partition: $topicPartition") + case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error getting ending offset of topic partition: $topicPartition") } }.toMap } else if (opts.options.has(opts.resetShiftByOpt)) { @@ -830,7 +830,7 @@ object ConsumerGroupCommand extends Logging { val logTimestampOffset = logTimestampOffsets.get(topicPartition) logTimestampOffset match { case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset)) - case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition") + case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition") } }.toMap } else if (opts.options.has(opts.resetByDurationOpt)) { @@ -844,7 +844,7 @@ object ConsumerGroupCommand extends Logging { val logTimestampOffset = logTimestampOffsets.get(topicPartition) logTimestampOffset match { case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset)) - case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition") + case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition") } }.toMap } else if (resetPlanFromFile.isDefined) { @@ -875,12 +875,12 @@ object ConsumerGroupCommand extends Logging { val preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets(groupId, partitionsToResetWithoutCommittedOffset).map { case (topicPartition, LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset)) - case (topicPartition, _) => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting 
ending offset of topic partition: $topicPartition") + case (topicPartition, _) => ToolsUtils.printUsageAndExit(opts.parser, s"Error getting ending offset of topic partition: $topicPartition") } preparedOffsetsForPartitionsWithCommittedOffset ++ preparedOffsetsForPartitionsWithoutCommittedOffset } else { - CommandLineUtils.printUsageAndDie(opts.parser, "Option '%s' requires one of the following scenarios: %s".format(opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts) ) + ToolsUtils.printUsageAndExit(opts.parser, "Option '%s' requires one of the following scenarios: %s".format(opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts)) } } @@ -1095,15 +1095,15 @@ object ConsumerGroupCommand extends Logging { if (options.has(describeOpt)) { if (!options.has(groupOpt) && !options.has(allGroupsOpt)) - CommandLineUtils.printUsageAndDie(parser, + CommandLineUtils.printUsageAndExit(parser, s"Option $describeOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}") val mutuallyExclusiveOpts: Set[OptionSpec[_]] = Set(membersOpt, offsetsOpt, stateOpt) if (mutuallyExclusiveOpts.toList.map(o => if (options.has(o)) 1 else 0).sum > 1) { - CommandLineUtils.printUsageAndDie(parser, + CommandLineUtils.printUsageAndExit(parser, s"Option $describeOpt takes at most one of these options: ${mutuallyExclusiveOpts.mkString(", ")}") } if (options.has(stateOpt) && options.valueOf(stateOpt) != null) - CommandLineUtils.printUsageAndDie(parser, + CommandLineUtils.printUsageAndExit(parser, s"Option $describeOpt does not take a value for $stateOpt") } else { if (options.has(timeoutMsOpt)) @@ -1112,22 +1112,22 @@ object ConsumerGroupCommand extends Logging { if (options.has(deleteOpt)) { if (!options.has(groupOpt) && !options.has(allGroupsOpt)) - CommandLineUtils.printUsageAndDie(parser, + CommandLineUtils.printUsageAndExit(parser, s"Option $deleteOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}") if (options.has(topicOpt)) - 
CommandLineUtils.printUsageAndDie(parser, s"The consumer does not support topic-specific offset " + + CommandLineUtils.printUsageAndExit(parser, s"The consumer does not support topic-specific offset " + "deletion from a consumer group.") } if (options.has(deleteOffsetsOpt)) { if (!options.has(groupOpt) || !options.has(topicOpt)) - CommandLineUtils.printUsageAndDie(parser, + CommandLineUtils.printUsageAndExit(parser, s"Option $deleteOffsetsOpt takes the following options: ${allDeleteOffsetsOpts.mkString(", ")}") } if (options.has(resetOffsetsOpt)) { if (options.has(dryRunOpt) && options.has(executeOpt)) - CommandLineUtils.printUsageAndDie(parser, s"Option $resetOffsetsOpt only accepts one of $executeOpt and $dryRunOpt") + CommandLineUtils.printUsageAndExit(parser, s"Option $resetOffsetsOpt only accepts one of $executeOpt and $dryRunOpt") if (!options.has(dryRunOpt) && !options.has(executeOpt)) { Console.err.println("WARN: No action will be performed as the --execute option is missing." + @@ -1137,21 +1137,21 @@ object ConsumerGroupCommand extends Logging { } if (!options.has(groupOpt) && !options.has(allGroupsOpt)) - CommandLineUtils.printUsageAndDie(parser, + CommandLineUtils.printUsageAndExit(parser, s"Option $resetOffsetsOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}") - CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, allResetOffsetScenarioOpts - resetToOffsetOpt) - CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, allResetOffsetScenarioOpts - resetToDatetimeOpt) - CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, allResetOffsetScenarioOpts - resetByDurationOpt) - CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, allResetOffsetScenarioOpts - resetToEarliestOpt) - CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, allResetOffsetScenarioOpts - resetToLatestOpt) - CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt, 
allResetOffsetScenarioOpts - resetToCurrentOpt) - CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, allResetOffsetScenarioOpts - resetShiftByOpt) - CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, allResetOffsetScenarioOpts - resetFromFileOpt) + CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, (allResetOffsetScenarioOpts - resetToOffsetOpt).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, (allResetOffsetScenarioOpts - resetToDatetimeOpt).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, (allResetOffsetScenarioOpts - resetByDurationOpt).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, (allResetOffsetScenarioOpts - resetToEarliestOpt).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, (allResetOffsetScenarioOpts - resetToLatestOpt).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt, (allResetOffsetScenarioOpts - resetToCurrentOpt).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, (allResetOffsetScenarioOpts - resetShiftByOpt).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, (allResetOffsetScenarioOpts - resetFromFileOpt).asJava) } - CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allGroupSelectionScopeOpts - groupOpt) - CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt) - CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt) + CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, (allGroupSelectionScopeOpts - groupOpt).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, (allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, 
(allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt).asJava ) } } } diff --git a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala index 78984792ce2..7dce0d58515 100644 --- a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala +++ b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala @@ -20,17 +20,16 @@ package kafka.admin import java.text.SimpleDateFormat import java.util import java.util.Base64 - import joptsimple.ArgumentAcceptingOptionSpec -import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging} +import kafka.utils.{Exit, Logging} import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.{Admin, CreateDelegationTokenOptions, DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, RenewDelegationTokenOptions} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.token.delegation.DelegationToken import org.apache.kafka.common.utils.{SecurityUtils, Utils} +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import scala.jdk.CollectionConverters._ -import scala.collection.Set /** * A command to manage delegation token. 
@@ -40,12 +39,12 @@ object DelegationTokenCommand extends Logging { def main(args: Array[String]): Unit = { val opts = new DelegationTokenCommandOptions(args) - CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to create, renew, expire, or describe delegation tokens.") + CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.") // should have exactly one action val actions = Seq(opts.createOpt, opts.renewOpt, opts.expiryOpt, opts.describeOpt).count(opts.options.has _) if(actions != 1) - CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe") + CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe") opts.checkArgs() @@ -207,17 +206,19 @@ object DelegationTokenCommand extends Logging { if (options.has(createOpt)) CommandLineUtils.checkRequiredArgs(parser, options, maxLifeTimeOpt) - if (options.has(renewOpt)) + if (options.has(renewOpt)) { CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, renewTimePeriodOpt) + } - if (options.has(expiryOpt)) + if (options.has(expiryOpt)) { CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, expiryTimePeriodOpt) + } // check invalid args - CommandLineUtils.checkInvalidArgs(parser, options, createOpt, Set(hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, renewOpt, Set(renewPrincipalsOpt, maxLifeTimeOpt, expiryTimePeriodOpt, ownerPrincipalsOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, expiryOpt, Set(renewOpt, maxLifeTimeOpt, renewTimePeriodOpt, ownerPrincipalsOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(renewTimePeriodOpt, maxLifeTimeOpt, hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, createOpt, hmacOpt, renewTimePeriodOpt, 
expiryTimePeriodOpt) + CommandLineUtils.checkInvalidArgs(parser, options, renewOpt, renewPrincipalsOpt, maxLifeTimeOpt, expiryTimePeriodOpt, ownerPrincipalsOpt) + CommandLineUtils.checkInvalidArgs(parser, options, expiryOpt, renewOpt, maxLifeTimeOpt, renewTimePeriodOpt, ownerPrincipalsOpt) + CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, renewTimePeriodOpt, maxLifeTimeOpt, hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt) } } } diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala index 71ef6fd6f19..b1747313708 100644 --- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala +++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala @@ -19,14 +19,14 @@ package kafka.admin import java.io.PrintStream import java.util.Properties - import kafka.common.AdminCommandFailedException import kafka.utils.json.JsonValue -import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Json} +import kafka.utils.{CoreUtils, Json} import org.apache.kafka.clients.admin.{Admin, RecordsToDelete} import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import scala.jdk.CollectionConverters._ import scala.collection.Seq @@ -130,7 +130,7 @@ object DeleteRecordsCommand { options = parser.parse(args : _*) - CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to delete records of the given partitions down to the specified offset.") + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to delete records of the given partitions down to the specified offset.") CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, offsetJsonFileOpt) } diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala index 
92edcad003f..140f4b70177 100644 --- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala @@ -20,8 +20,6 @@ import java.util.Properties import java.util.concurrent.ExecutionException import joptsimple.util.EnumConverter import kafka.common.AdminCommandFailedException -import kafka.utils.CommandDefaultOptions -import kafka.utils.CommandLineUtils import kafka.utils.CoreUtils import kafka.utils.Implicits._ import kafka.utils.Json @@ -33,6 +31,8 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException import org.apache.kafka.common.errors.ElectionNotNeededException import org.apache.kafka.common.errors.TimeoutException import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} + import scala.jdk.CollectionConverters._ import scala.collection.mutable import scala.concurrent.duration._ @@ -44,7 +44,7 @@ object LeaderElectionCommand extends Logging { def run(args: Array[String], timeout: Duration): Unit = { val commandOptions = new LeaderElectionCommandOptions(args) - CommandLineUtils.printHelpAndExitIfNeeded( + CommandLineUtils.maybePrintHelpOrVersion( commandOptions, "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas." 
) diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala index d8c802e7d0e..870e6a17ba1 100644 --- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala +++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala @@ -16,13 +16,12 @@ */ package kafka.admin - import java.io.PrintStream import java.util.Properties - -import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Json} +import kafka.utils.Json import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, LogDirDescription} import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import scala.jdk.CollectionConverters._ import scala.collection.Map @@ -126,7 +125,7 @@ object LogDirsCommand { options = parser.parse(args : _*) - CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to query log directory usage on the specified brokers.") + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to query log directory usage on the specified brokers.") CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, describeOpt) } diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 2f3e3a69117..24c2fafb813 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -21,7 +21,7 @@ import java.util.Optional import java.util.concurrent.ExecutionException import kafka.common.AdminCommandFailedException import kafka.server.DynamicConfig -import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, Json, Logging} +import kafka.utils.{CoreUtils, Exit, Json, Logging} import kafka.utils.Implicits._ import kafka.utils.json.JsonValue import org.apache.kafka.clients.admin.AlterConfigOp.OpType @@ -31,6 +31,7 @@ import 
org.apache.kafka.common.errors.{ReplicaNotAvailableException, UnknownTopi import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, KafkaFuture, TopicPartition, TopicPartitionReplica} import org.apache.kafka.server.log.internals.LogConfig +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import scala.jdk.CollectionConverters._ import scala.collection.{Map, Seq, mutable} @@ -1330,20 +1331,20 @@ object ReassignPartitionsCommand extends Logging { def validateAndParseArgs(args: Array[String]): ReassignPartitionsCommandOptions = { val opts = new ReassignPartitionsCommandOptions(args) - CommandLineUtils.printHelpAndExitIfNeeded(opts, helpText) + CommandLineUtils.maybePrintHelpOrVersion(opts, helpText) // Determine which action we should perform. val validActions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt, opts.cancelOpt, opts.listOpt) val allActions = validActions.filter(opts.options.has _) if (allActions.size != 1) { - CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: %s".format( + CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: %s".format( validActions.map("--" + _.options().get(0)).mkString(", "))) } val action = allActions.head if (!opts.options.has(opts.bootstrapServerOpt)) - CommandLineUtils.printUsageAndDie(opts.parser, "Please specify --bootstrap-server") + CommandLineUtils.printUsageAndExit(opts.parser, "Please specify --bootstrap-server") // Make sure that we have all the required arguments for our action. 
val requiredArgs = Map( @@ -1400,7 +1401,7 @@ object ReassignPartitionsCommand extends Logging { if (!opt.equals(action) && !requiredArgs(action).contains(opt) && !permittedArgs(action).contains(opt)) { - CommandLineUtils.printUsageAndDie(opts.parser, + CommandLineUtils.printUsageAndExit(opts.parser, """Option "%s" can't be used with action "%s"""".format(opt, action)) } }) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 65148b203f8..3d91b6256b4 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -18,7 +18,7 @@ package kafka.admin import java.util -import java.util.{Collections, Properties} +import java.util.{Collections, Optional, Properties} import joptsimple._ import kafka.common.AdminCommandFailedException import kafka.utils._ @@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.{ClusterAuthorizationException, TopicExist import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.utils.Utils import org.apache.kafka.server.log.internals.LogConfig +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import scala.annotation.nowarn import scala.jdk.CollectionConverters._ @@ -605,53 +606,54 @@ object TopicCommand extends Logging { def checkArgs(): Unit = { if (args.isEmpty) - CommandLineUtils.printUsageAndDie(parser, "Create, delete, describe, or change a topic.") + CommandLineUtils.printUsageAndExit(parser, "Create, delete, describe, or change a topic.") - CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to create, delete, describe, or change a topic.") + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to create, delete, describe, or change a topic.") // should have exactly one action val actions = Seq(createOpt, listOpt, alterOpt, describeOpt, deleteOpt).count(options.has) if (actions != 1) - CommandLineUtils.printUsageAndDie(parser, "Command 
must include exactly one action: --list, --describe, --create, --alter or --delete") + CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete") // check required args if (!has(bootstrapServerOpt)) throw new IllegalArgumentException("--bootstrap-server must be specified") if (has(describeOpt) && has(ifExistsOpt)) { if (!has(topicOpt) && !has(topicIdOpt)) - CommandLineUtils.printUsageAndDie(parser, "--topic or --topic-id is required to describe a topic") + CommandLineUtils.printUsageAndExit(parser, "--topic or --topic-id is required to describe a topic") if (has(topicOpt) && has(topicIdOpt)) println("Only topic id will be used when both --topic and --topic-id are specified and topicId is not Uuid.ZERO_UUID") } if (!has(listOpt) && !has(describeOpt)) CommandLineUtils.checkRequiredArgs(parser, options, topicOpt) if (has(alterOpt)) { - CommandLineUtils.checkInvalidArgsSet(parser, options, Set(bootstrapServerOpt, configOpt), Set(alterOpt), - Some(kafkaConfigsCanAlterTopicConfigsViaBootstrapServer)) + val usedOptions = immutable.Set[OptionSpec[_]](bootstrapServerOpt, configOpt) + val invalidOptions = immutable.Set[OptionSpec[_]](alterOpt) + CommandLineUtils.checkInvalidArgsSet(parser, options, usedOptions.asJava, invalidOptions.asJava, Optional.of(kafkaConfigsCanAlterTopicConfigsViaBootstrapServer)) CommandLineUtils.checkRequiredArgs(parser, options, partitionsOpt) } // check invalid args - CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt) ++ Set(bootstrapServerOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt)) - CommandLineUtils.checkInvalidArgs(parser, 
options, replicaAssignmentOpt, allTopicLevelOpts -- Set(createOpt,alterOpt)) - if(options.has(createOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, configOpt, (allTopicLevelOpts -- Set(alterOpt, createOpt)).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, (allTopicLevelOpts -- Set(alterOpt) ++ Set(bootstrapServerOpt)).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, (allTopicLevelOpts -- Set(alterOpt, createOpt)).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, (allTopicLevelOpts -- Set(createOpt)).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, (allTopicLevelOpts -- Set(createOpt,alterOpt)).asJava) + if (options.has(createOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, partitionsOpt, replicationFactorOpt) CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt, - allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt) + (allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt).asJava) CommandLineUtils.checkInvalidArgs(parser, options, reportUnderMinIsrPartitionsOpt, - allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderMinIsrPartitionsOpt + topicsWithOverridesOpt) + (allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderMinIsrPartitionsOpt + topicsWithOverridesOpt).asJava) CommandLineUtils.checkInvalidArgs(parser, options, reportAtMinIsrPartitionsOpt, - allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportAtMinIsrPartitionsOpt + topicsWithOverridesOpt) + (allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - 
reportAtMinIsrPartitionsOpt + topicsWithOverridesOpt).asJava) CommandLineUtils.checkInvalidArgs(parser, options, reportUnavailablePartitionsOpt, - allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt + topicsWithOverridesOpt) + (allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt + topicsWithOverridesOpt).asJava) CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt, - allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts) - CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, allTopicLevelOpts -- Set(listOpt, describeOpt)) + (allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, (allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt)).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, (allTopicLevelOpts -- Set(createOpt)).asJava) + CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, (allTopicLevelOpts -- Set(listOpt, describeOpt)).asJava) } } } diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index 126319503b5..7bc7339d548 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -19,11 +19,12 @@ package kafka.admin import joptsimple.{ArgumentAcceptingOptionSpec, OptionSet} import kafka.server.KafkaConfig -import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging} +import kafka.utils.{Exit, Logging, ToolsUtils} import kafka.utils.Implicits._ import kafka.zk.{ControllerZNode, 
KafkaZkClient, ZkData, ZkSecurityMigratorUtils} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback} import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException.Code @@ -70,7 +71,7 @@ object ZkSecurityMigrator extends Logging { val jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) val opts = new ZkSecurityMigratorOptions(args) - CommandLineUtils.printHelpAndExitIfNeeded(opts, usageMessage) + CommandLineUtils.maybePrintHelpOrVersion(opts, usageMessage) // Must have either SASL or TLS mutual authentication enabled to use this tool. // Instantiate the client config we will use so that we take into account config provided via the CLI option @@ -99,7 +100,7 @@ object ZkSecurityMigrator extends Logging { info("zookeeper.acl option is unsecure") false case _ => - CommandLineUtils.printUsageAndDie(opts.parser, usageMessage) + ToolsUtils.printUsageAndExit(opts.parser, usageMessage) } val zkUrl = opts.options.valueOf(opts.zkUrlOpt) val zkSessionTimeout = opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index a70ce920e8e..2774f13397a 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -35,6 +35,7 @@ import org.apache.kafka.common.requests.ListOffsetsRequest import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer} import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import scala.jdk.CollectionConverters._ @@ -293,7 +294,7 @@ object ConsoleConsumer extends Logging { options = tryParse(parser, args) 
- CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to read data from Kafka topics and outputs it to standard output.") + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from Kafka topics and outputs it to standard output.") var groupIdPassed = true val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt) @@ -302,7 +303,7 @@ object ConsoleConsumer extends Logging { var topicArg: String = _ var includedTopicsArg: String = _ var filterSpec: TopicFilter = _ - val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala) + val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt)) val consumerProps = if (options.has(consumerConfigOpt)) Utils.loadProps(options.valueOf(consumerConfigOpt)) else @@ -315,7 +316,7 @@ object ConsoleConsumer extends Logging { Utils.loadProps(options.valueOf(messageFormatterConfigOpt)) else new Properties() - formatterArgs ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala) + formatterArgs ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt)) val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1 val timeoutMs = if (options.has(timeoutMsOpt)) options.valueOf(timeoutMsOpt).intValue else -1 val bootstrapServer = options.valueOf(bootstrapServerOpt) @@ -341,19 +342,19 @@ object ConsoleConsumer extends Logging { val topicOrFilterArgs = List(topicArg, includedTopicsArg).filterNot(_ == null) // user need to specify value for either --topic or one of the include filters options (--include or --whitelist) if (topicOrFilterArgs.size != 1) - CommandLineUtils.printUsageAndDie(parser, s"Exactly one of --include/--topic is required. " + + CommandLineUtils.printUsageAndExit(parser, s"Exactly one of --include/--topic is required. 
" + s"${if (options.has(whitelistOpt)) "--whitelist is DEPRECATED use --include instead; ignored if --include specified."}") if (partitionArg.isDefined) { if (!options.has(topicOpt)) - CommandLineUtils.printUsageAndDie(parser, "The topic is required when partition is specified.") + CommandLineUtils.printUsageAndExit(parser, "The topic is required when partition is specified.") if (fromBeginning && options.has(offsetOpt)) - CommandLineUtils.printUsageAndDie(parser, "Options from-beginning and offset cannot be specified together.") + CommandLineUtils.printUsageAndExit(parser, "Options from-beginning and offset cannot be specified together.") } else if (options.has(offsetOpt)) - CommandLineUtils.printUsageAndDie(parser, "The partition is required when offset is specified.") + CommandLineUtils.printUsageAndExit(parser, "The partition is required when offset is specified.") def invalidOffset(offset: String): Nothing = - CommandLineUtils.printUsageAndDie(parser, s"The provided offset value '$offset' is incorrect. Valid values are " + + ToolsUtils.printUsageAndExit(parser, s"The provided offset value '$offset' is incorrect. Valid values are " + "'earliest', 'latest', or a non-negative long.") val offsetArg = @@ -385,7 +386,7 @@ object ConsoleConsumer extends Logging { ).flatten if (groupIdsProvided.size > 1) { - CommandLineUtils.printUsageAndDie(parser, "The group ids provided in different places (directly using '--group', " + CommandLineUtils.printUsageAndExit(parser, "The group ids provided in different places (directly using '--group', " + "via '--consumer-property', or via '--consumer.config') do not match. 
" + s"Detected group ids: ${groupIdsProvided.mkString("'", "', '", "'")}") } @@ -403,14 +404,14 @@ object ConsoleConsumer extends Logging { } if (groupIdPassed && partitionArg.isDefined) - CommandLineUtils.printUsageAndDie(parser, "Options group and partition cannot be specified together.") + CommandLineUtils.printUsageAndExit(parser, "Options group and partition cannot be specified together.") def tryParse(parser: OptionParser, args: Array[String]): OptionSet = { try parser.parse(args: _*) catch { case e: OptionException => - CommandLineUtils.printUsageAndDie(parser, e.getMessage) + ToolsUtils.printUsageAndExit(parser, e.getMessage) } } } diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 668a709d60e..245212471d0 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -24,14 +24,13 @@ import java.util.regex.Pattern import joptsimple.{OptionException, OptionParser, OptionSet} import kafka.common.MessageReader import kafka.utils.Implicits._ -import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, ToolsUtils} +import kafka.utils.{Exit, ToolsUtils} import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.KafkaException import org.apache.kafka.common.record.CompressionType import org.apache.kafka.common.utils.Utils - -import scala.jdk.CollectionConverters._ +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} object ConsoleProducer { @@ -260,7 +259,7 @@ object ConsoleProducer { options = tryParse(parser, args) - CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to read data from standard input and publish it to Kafka.") + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from standard input and publish it to 
Kafka.") CommandLineUtils.checkRequiredArgs(parser, options, topicOpt) @@ -280,15 +279,15 @@ object ConsoleProducer { else compressionCodecOptionValue else CompressionType.NONE.name val readerClass = options.valueOf(messageReaderOpt) - val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt).asScala) - val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt).asScala) + val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt)) + val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt)) def tryParse(parser: OptionParser, args: Array[String]): OptionSet = { try parser.parse(args: _*) catch { case e: OptionException => - CommandLineUtils.printUsageAndDie(parser, e.getMessage) + ToolsUtils.printUsageAndExit(parser, e.getMessage) } } } diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index 89428e5663b..56f49456705 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -22,14 +22,14 @@ import java.time.Duration import java.util import java.util.concurrent.atomic.AtomicLong import java.util.{Properties, Random} - import com.typesafe.scalalogging.LazyLogging import joptsimple.OptionException -import kafka.utils.{CommandLineUtils, ToolsUtils} +import kafka.utils.ToolsUtils import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer} import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{Metric, MetricName, TopicPartition} +import org.apache.kafka.server.util.CommandLineUtils import scala.jdk.CollectionConverters._ import scala.collection.mutable @@ -260,13 +260,13 @@ object ConsumerPerformance extends LazyLogging { options = parser.parse(args: _*) catch { case e: 
OptionException => - CommandLineUtils.printUsageAndDie(parser, e.getMessage) + CommandLineUtils.printUsageAndExit(parser, e.getMessage) } if(options.has(numThreadsOpt) || options.has(numFetchersOpt)) println("WARNING: option [threads] and [num-fetch-threads] have been deprecated and will be ignored by the test") - CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps in performance test for the full zookeeper consumer") + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps in performance test for the full zookeeper consumer") CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt) diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index ec0fb7d2e73..09d66cb2197 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -34,6 +34,7 @@ import org.apache.kafka.metadata.MetadataRecordSerde import org.apache.kafka.metadata.bootstrap.BootstrapDirectory import org.apache.kafka.server.log.internals.{CorruptSnapshotException, OffsetIndex, TimeIndex, TransactionIndex} import org.apache.kafka.snapshot.Snapshots +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import scala.jdk.CollectionConverters._ import scala.collection.mutable @@ -46,7 +47,7 @@ object DumpLogSegments { def main(args: Array[String]): Unit = { val opts = new DumpLogSegmentsOptions(args) - CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.") + CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.") opts.checkArgs() val misMatchesForIndexFilesMap = mutable.Map[String, List[(Long, Long)]]() diff --git 
a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index a8fd87cbe87..379b92218c8 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -19,11 +19,12 @@ package kafka.tools import joptsimple._ -import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils} +import kafka.utils.{Exit, IncludeList, ToolsUtils} import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, ListTopicsOptions, OffsetSpec} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse} import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.util.CommandLineUtils import java.util.Properties import java.util.concurrent.ExecutionException @@ -82,7 +83,7 @@ object GetOffsetShell { val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", s"By default, internal topics are included. 
If specified, internal topics are excluded.") if (args.isEmpty) - CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting topic-partition offsets.") + CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.") val options = parser.parse(args : _*) diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala index c0f6d4a5ead..223c459bcc2 100644 --- a/core/src/main/scala/kafka/tools/JmxTool.scala +++ b/core/src/main/scala/kafka/tools/JmxTool.scala @@ -23,13 +23,13 @@ import java.text.SimpleDateFormat import javax.management._ import javax.management.remote._ import javax.rmi.ssl.SslRMIClientSocketFactory - import joptsimple.OptionParser import scala.jdk.CollectionConverters._ import scala.collection.mutable import scala.math._ -import kafka.utils.{CommandLineUtils, Exit, Logging} +import kafka.utils.{Exit, Logging} +import org.apache.kafka.server.util.CommandLineUtils /** @@ -99,7 +99,7 @@ object JmxTool extends Logging { if(args.isEmpty) - CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard output.") + CommandLineUtils.printUsageAndExit(parser, "Dump JMX values to standard output.") val options = parser.parse(args : _*) @@ -207,7 +207,7 @@ object JmxTool extends Logging { } if(numExpectedAttributes.isEmpty) { - CommandLineUtils.printUsageAndDie(parser, s"No matched attributes for the queried objects $queries.") + CommandLineUtils.printUsageAndExit(parser, s"No matched attributes for the queried objects $queries.") } // print csv header diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 75c2d144b02..067e8aee84c 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.CountDownLatch import 
java.util.regex.Pattern import java.util.{Collections, Properties} - import kafka.consumer.BaseConsumerRecord import kafka.metrics.KafkaMetricsGroup import kafka.utils._ @@ -35,6 +34,7 @@ import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, TopicPartition} +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import scala.jdk.CollectionConverters._ import scala.collection.mutable.HashMap @@ -86,7 +86,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { info("Starting mirror maker") try { val opts = new MirrorMakerOptions(args) - CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to continuously copy data between two Kafka clusters.") + CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to continuously copy data between two Kafka clusters.") opts.checkArgs() } catch { case ct: ControlThrowable => throw ct diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala index 836163c85fa..6857e401e80 100644 --- a/core/src/main/scala/kafka/tools/PerfConfig.scala +++ b/core/src/main/scala/kafka/tools/PerfConfig.scala @@ -17,8 +17,7 @@ package kafka.tools -import kafka.utils.CommandDefaultOptions - +import org.apache.kafka.server.util.CommandDefaultOptions class PerfConfig(args: Array[String]) extends CommandDefaultOptions(args) { val numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to send or consume") diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 599384530b7..746cd1410fe 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -29,6 +29,7 @@ import 
org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.AbstractRequest.Builder import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, FetchResponse, ListOffsetsRequest} import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.server.util.CommandLineUtils import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.common.{Node, TopicPartition, Uuid} @@ -114,11 +115,11 @@ object ReplicaVerificationTool extends Logging { val options = parser.parse(args: _*) if (args.isEmpty || options.has(helpOpt)) { - CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.") + CommandLineUtils.printUsageAndExit(parser, "Validate that all replicas for a set of topics have the same data.") } if (options.has(versionOpt)) { - CommandLineUtils.printVersionAndDie() + CommandLineUtils.printVersionAndExit() } CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt) diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala index 32e43a6606d..8986d8c8ff0 100755 --- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala +++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala @@ -23,12 +23,12 @@ import scala.util.matching.Regex import collection.mutable import java.util.Date import java.text.SimpleDateFormat +import kafka.utils.{CoreUtils, Exit, Logging} -import kafka.utils.{CommandLineUtils, CoreUtils, Exit, Logging} import java.io.{BufferedOutputStream, OutputStream} import java.nio.charset.StandardCharsets - import org.apache.kafka.common.internals.Topic +import org.apache.kafka.server.util.CommandLineUtils /** * A utility that merges the state change logs (possibly obtained from different brokers and over multiple days). 
@@ -89,7 +89,7 @@ object StateChangeLogMerger extends Logging { .defaultsTo("9999-12-31 23:59:59,999") if(args.isEmpty) - CommandLineUtils.printUsageAndDie(parser, "A tool for merging the log files from several brokers to reconnstruct a unified history of what happened.") + CommandLineUtils.printUsageAndExit(parser, "A tool for merging the log files from several brokers to reconnstruct a unified history of what happened.") val options = parser.parse(args : _*) diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 99c0f789cd9..1b52b2d2ec0 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -21,7 +21,6 @@ import joptsimple.OptionParser; import joptsimple.OptionSet; import joptsimple.OptionSpec; import joptsimple.OptionSpecBuilder; -import kafka.utils.CommandLineUtils; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions; @@ -39,7 +38,7 @@ import org.apache.kafka.common.requests.ListOffsetsResponse; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; -import scala.collection.JavaConverters; +import org.apache.kafka.server.util.CommandLineUtils; import java.io.IOException; import java.text.ParseException; @@ -282,13 +281,13 @@ public class StreamsResetter { try { options = optionParser.parse(args); if (args.length == 0 || options.has(helpOption)) { - CommandLineUtils.printUsageAndDie(optionParser, USAGE); + CommandLineUtils.printUsageAndExit(optionParser, USAGE); } if (options.has(versionOption)) { - CommandLineUtils.printVersionAndDie(); + CommandLineUtils.printVersionAndExit(); } } catch (final OptionException e) { - CommandLineUtils.printUsageAndDie(optionParser, e.getMessage()); + 
CommandLineUtils.printUsageAndExit(optionParser, e.getMessage()); } final Set<OptionSpec<?>> allScenarioOptions = new HashSet<>(); @@ -319,7 +318,7 @@ public class StreamsResetter { optionParser, options, option, - JavaConverters.asScalaSetConverter(invalidOptions).asScala()); + invalidOptions); } private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map<Object, Object> consumerConfig, diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 47ba5a747e1..0c31b4187a0 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -24,7 +24,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.raft.{KafkaRaftManager, RaftManager} import kafka.security.CredentialProvider import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, MetaProperties, SimpleApiVersionManager} -import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, Logging, ShutdownableThread} +import kafka.utils.{CoreUtils, Exit, Logging, ShutdownableThread} import org.apache.kafka.common.errors.InvalidConfigurationException import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.metrics.Metrics @@ -39,6 +39,7 @@ import org.apache.kafka.raft.errors.NotLeaderException import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, RaftConfig} import org.apache.kafka.server.common.serialization.RecordSerde import org.apache.kafka.server.fault.ProcessExitingFaultHandler +import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.kafka.snapshot.SnapshotReader import scala.jdk.CollectionConverters._ @@ -435,7 +436,7 @@ object TestRaftServer extends Logging { def main(args: Array[String]): Unit = { val opts = new TestRaftServerOptions(args) try { - CommandLineUtils.printHelpAndExitIfNeeded(opts, + 
CommandLineUtils.maybePrintHelpOrVersion(opts, "Standalone raft server for performance testing") val configFile = opts.options.valueOf(opts.configOpt) @@ -460,7 +461,7 @@ object TestRaftServer extends Logging { Exit.exit(0) } catch { case e: OptionException => - CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage) + CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage) case e: Throwable => fatal("Exiting raft server due to fatal exception", e) Exit.exit(1) diff --git a/core/src/main/scala/kafka/utils/CommandDefaultOptions.scala b/core/src/main/scala/kafka/utils/CommandDefaultOptions.scala deleted file mode 100644 index 2cdb408b4bb..00000000000 --- a/core/src/main/scala/kafka/utils/CommandDefaultOptions.scala +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.utils - -import joptsimple.{OptionParser, OptionSet} - -abstract class CommandDefaultOptions(val args: Array[String], allowCommandOptionAbbreviation: Boolean = false) { - val parser = new OptionParser(allowCommandOptionAbbreviation) - val helpOpt = parser.accepts("help", "Print usage information.").forHelp() - val versionOpt = parser.accepts("version", "Display Kafka version.").forHelp() - var options: OptionSet = _ -} diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala deleted file mode 100644 index e9dcee0eb18..00000000000 --- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package kafka.utils - -import java.util.Properties - -import joptsimple.{OptionParser, OptionSet, OptionSpec} - -import scala.collection.Set - -/** - * Helper functions for dealing with command line utilities - */ -object CommandLineUtils extends Logging { - /** - * Check if there are no options or `--help` option from command line - * - * @param commandOpts Acceptable options for a command - * @return true on matching the help check condition - */ - def isPrintHelpNeeded(commandOpts: CommandDefaultOptions): Boolean = { - commandOpts.args.isEmpty || commandOpts.options.has(commandOpts.helpOpt) - } - - def isPrintVersionNeeded(commandOpts: CommandDefaultOptions): Boolean = { - commandOpts.options.has(commandOpts.versionOpt) - } - - /** - * Check and print help message if there is no options or `--help` option - * from command line, if `--version` is specified on the command line - * print version information and exit. - * NOTE: The function name is not strictly speaking correct anymore - * as it also checks whether the version needs to be printed, but - * refactoring this would have meant changing all command line tools - * and unnecessarily increased the blast radius of this change. 
- * - * @param commandOpts Acceptable options for a command - * @param message Message to display on successful check - */ - def printHelpAndExitIfNeeded(commandOpts: CommandDefaultOptions, message: String): Unit = { - if (isPrintHelpNeeded(commandOpts)) - printUsageAndDie(commandOpts.parser, message) - if (isPrintVersionNeeded(commandOpts)) - printVersionAndDie() - } - - /** - * Check that all the listed options are present - */ - def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*): Unit = { - for (arg <- required) { - if (!options.has(arg)) - printUsageAndDie(parser, "Missing required argument \"" + arg + "\"") - } - } - - /** - * Check that none of the listed options are present - */ - def checkInvalidArgs(parser: OptionParser, options: OptionSet, usedOption: OptionSpec[_], invalidOptions: Set[OptionSpec[_]]): Unit = { - if (options.has(usedOption)) { - for (arg <- invalidOptions) { - if (options.has(arg)) - printUsageAndDie(parser, "Option \"" + usedOption + "\" can't be used with option \"" + arg + "\"") - } - } - } - - /** - * Check that none of the listed options are present with the combination of used options - */ - def checkInvalidArgsSet(parser: OptionParser, options: OptionSet, usedOptions: Set[OptionSpec[_]], invalidOptions: Set[OptionSpec[_]], - trailingAdditionalMessage: Option[String] = None): Unit = { - if (usedOptions.count(options.has) == usedOptions.size) { - for (arg <- invalidOptions) { - if (options.has(arg)) - printUsageAndDie(parser, "Option combination \"" + usedOptions.mkString(",") + "\" can't be used with option \"" + arg + "\"" + trailingAdditionalMessage.getOrElse("")) - } - } - } - - /** - * Print usage and exit - */ - def printUsageAndDie(parser: OptionParser, message: String): Nothing = { - System.err.println(message) - parser.printHelpOn(System.err) - Exit.exit(1, Some(message)) - } - - def printVersionAndDie(): Nothing = { - System.out.println(VersionInfo.getVersionString) - Exit.exit(0) - } 
- - /** - * Parse key-value pairs in the form key=value - * value may contain equals sign - */ - def parseKeyValueArgs(args: Iterable[String], acceptMissingValue: Boolean = true): Properties = { - val splits = args.map(_.split("=", 2)).filterNot(_.isEmpty) - - val props = new Properties - for (a <- splits) { - if (a.length == 1 || (a.length == 2 && a(1).isEmpty)) { - if (acceptMissingValue) props.put(a(0), "") - else throw new IllegalArgumentException(s"Missing value for key ${a(0)}") - } - else props.put(a(0), a(1)) - } - props - } - - /** - * Merge the options into {@code props} for key {@code key}, with the following precedence, from high to low: - * 1) if {@code spec} is specified on {@code options} explicitly, use the value; - * 2) if {@code props} already has {@code key} set, keep it; - * 3) otherwise, use the default value of {@code spec}. - * A {@code null} value means to remove {@code key} from the {@code props}. - */ - def maybeMergeOptions[V](props: Properties, key: String, options: OptionSet, spec: OptionSpec[V]): Unit = { - if (options.has(spec) || !props.containsKey(key)) { - val value = options.valueOf(spec) - if (value == null) - props.remove(key) - else - props.put(key, value.toString) - } - } -} diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala index 056545cb031..10586317f65 100644 --- a/core/src/main/scala/kafka/utils/ToolsUtils.scala +++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala @@ -18,6 +18,7 @@ package kafka.utils import joptsimple.OptionParser import org.apache.kafka.common.{Metric, MetricName} +import org.apache.kafka.server.util.CommandLineUtils import scala.collection.mutable @@ -32,8 +33,8 @@ object ToolsUtils { org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null } val isValid = !validHostPort.isEmpty && validHostPort.size == hostPorts.length - if(!isValid) - CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like 
host1:9091,host2:9092\n ") + if (!isValid) + CommandLineUtils.printUsageAndExit(parser, "Please provide valid host:port like host1:9091,host2:9092\n ") } /** @@ -64,4 +65,18 @@ object ToolsUtils { println(s"%-${maxLengthOfDisplayName}s : $specifier".format(metricName, value)) } } + + /** + * This is a simple wrapper around `CommandLineUtils.printUsageAndExit`. + * It is needed for tools migration (KAFKA-14525), as there is no Java equivalent for return type `Nothing`. + * Can be removed once [[kafka.admin.ConsumerGroupCommand]], [[kafka.tools.ConsoleConsumer]] + * and [[kafka.tools.ConsoleProducer]] are migrated. + * + * @param parser Command line options parser. + * @param message Error message. + */ + def printUsageAndExit(parser: OptionParser, message: String): Nothing = { + CommandLineUtils.printUsageAndExit(parser, message) + throw new AssertionError("printUsageAndExit should not return, but it did.") + } } diff --git a/core/src/test/scala/kafka/tools/LogCompactionTester.scala b/core/src/test/scala/kafka/tools/LogCompactionTester.scala index 1b3b9f5b18f..141a68396dd 100755 --- a/core/src/test/scala/kafka/tools/LogCompactionTester.scala +++ b/core/src/test/scala/kafka/tools/LogCompactionTester.scala @@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeserializer} import org.apache.kafka.common.utils.{AbstractIterator, Utils} +import org.apache.kafka.server.util.CommandLineUtils import scala.jdk.CollectionConverters._ @@ -98,7 +99,7 @@ object LogCompactionTester { val options = parser.parse(args: _*) if (args.isEmpty) - CommandLineUtils.printUsageAndDie(parser, "A tool to test log compaction. Valid options are: ") + CommandLineUtils.printUsageAndExit(parser, "A tool to test log compaction. 
Valid options are: ") CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, numMessagesOpt) diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index 4b930374e38..7cc7d5254e4 100755 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -31,6 +31,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.server.log.internals.{LogConfig, LogDirFailureChannel} import org.apache.kafka.server.util.{KafkaScheduler, Scheduler} +import org.apache.kafka.server.util.CommandLineUtils import scala.math._ diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala index 507c3ff0973..edb75620eeb 100644 --- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala +++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala @@ -26,6 +26,7 @@ import joptsimple._ import kafka.server.{DelayedOperation, DelayedOperationPurgatory} import kafka.utils._ import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.util.CommandLineUtils import scala.math._ import scala.jdk.CollectionConverters._ diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala index 13fc262ec4e..98d54629f20 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala @@ -216,7 +216,7 @@ class ReassignPartitionsCommandArgsTest { @Test def shouldPrintHelpTextIfHelpArg(): Unit = { val args: Array[String]= Array("--help") - // note, this is not actually a failed case, it's just we share the same `printUsageAndDie` method when wrong arg received + // note, this is 
not actually a failed case, it's just we share the same `printUsageAndExit` method when wrong arg received shouldFailWith(ReassignPartitionsCommand.helpText, args) } diff --git a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala deleted file mode 100644 index 8b528ed179b..00000000000 --- a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala +++ /dev/null @@ -1,223 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.utils - -import java.util.Properties - -import joptsimple.{OptionParser, OptionSpec} -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test - -class CommandLineUtilsTest { - - - @Test - def testParseEmptyArg(): Unit = { - val argArray = Array("my.empty.property=") - - assertThrows(classOf[java.lang.IllegalArgumentException], () => CommandLineUtils.parseKeyValueArgs(argArray, acceptMissingValue = false)) - } - - @Test - def testParseEmptyArgWithNoDelimiter(): Unit = { - val argArray = Array("my.empty.property") - - assertThrows(classOf[java.lang.IllegalArgumentException], () => CommandLineUtils.parseKeyValueArgs(argArray, acceptMissingValue = false)) - } - - @Test - def testParseEmptyArgAsValid(): Unit = { - val argArray = Array("my.empty.property=", "my.empty.property1") - val props = CommandLineUtils.parseKeyValueArgs(argArray) - - assertEquals(props.getProperty("my.empty.property"), "", "Value of a key with missing value should be an empty string") - assertEquals(props.getProperty("my.empty.property1"), "", "Value of a key with missing value with no delimiter should be an empty string") - } - - @Test - def testParseSingleArg(): Unit = { - val argArray = Array("my.property=value") - val props = CommandLineUtils.parseKeyValueArgs(argArray) - - assertEquals(props.getProperty("my.property"), "value", "Value of a single property should be 'value' ") - } - - @Test - def testParseArgs(): Unit = { - val argArray = Array("first.property=first","second.property=second") - val props = CommandLineUtils.parseKeyValueArgs(argArray) - - assertEquals(props.getProperty("first.property"), "first", "Value of first property should be 'first'") - assertEquals(props.getProperty("second.property"), "second", "Value of second property should be 'second'") - } - - @Test - def testParseArgsWithMultipleDelimiters(): Unit = { - val argArray = Array("first.property==first", "second.property=second=", "third.property=thi=rd") - val props = 
CommandLineUtils.parseKeyValueArgs(argArray) - - assertEquals(props.getProperty("first.property"), "=first", "Value of first property should be '=first'") - assertEquals(props.getProperty("second.property"), "second=", "Value of second property should be 'second='") - assertEquals(props.getProperty("third.property"), "thi=rd", "Value of second property should be 'thi=rd'") - } - - val props = new Properties() - val parser = new OptionParser(false) - var stringOpt : OptionSpec[String] = _ - var intOpt : OptionSpec[java.lang.Integer] = _ - var stringOptOptionalArg : OptionSpec[String] = _ - var intOptOptionalArg : OptionSpec[java.lang.Integer] = _ - var stringOptOptionalArgNoDefault : OptionSpec[String] = _ - var intOptOptionalArgNoDefault : OptionSpec[java.lang.Integer] = _ - - def setUpOptions(): Unit = { - stringOpt = parser.accepts("str") - .withRequiredArg - .ofType(classOf[String]) - .defaultsTo("default-string") - intOpt = parser.accepts("int") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(100) - stringOptOptionalArg = parser.accepts("str-opt") - .withOptionalArg - .ofType(classOf[String]) - .defaultsTo("default-string-2") - intOptOptionalArg = parser.accepts("int-opt") - .withOptionalArg - .ofType(classOf[java.lang.Integer]) - .defaultsTo(200) - stringOptOptionalArgNoDefault = parser.accepts("str-opt-nodef") - .withOptionalArg - .ofType(classOf[String]) - intOptOptionalArgNoDefault = parser.accepts("int-opt-nodef") - .withOptionalArg - .ofType(classOf[java.lang.Integer]) - } - - @Test - def testMaybeMergeOptionsOverwriteExisting(): Unit = { - setUpOptions() - - props.put("skey", "existing-string") - props.put("ikey", "300") - props.put("sokey", "existing-string-2") - props.put("iokey", "400") - props.put("sondkey", "existing-string-3") - props.put("iondkey", "500") - - val options = parser.parse( - "--str", "some-string", - "--int", "600", - "--str-opt", "some-string-2", - "--int-opt", "700", - "--str-opt-nodef", "some-string-3", - 
"--int-opt-nodef", "800" - ) - - CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt) - CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt) - CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg) - CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg) - CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault) - CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault) - - assertEquals("some-string", props.get("skey")) - assertEquals("600", props.get("ikey")) - assertEquals("some-string-2", props.get("sokey")) - assertEquals("700", props.get("iokey")) - assertEquals("some-string-3", props.get("sondkey")) - assertEquals("800", props.get("iondkey")) - } - - @Test - def testMaybeMergeOptionsDefaultOverwriteExisting(): Unit = { - setUpOptions() - - props.put("sokey", "existing-string") - props.put("iokey", "300") - props.put("sondkey", "existing-string-2") - props.put("iondkey", "400") - - val options = parser.parse( - "--str-opt", - "--int-opt", - "--str-opt-nodef", - "--int-opt-nodef" - ) - - CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg) - CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg) - CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault) - CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault) - - assertEquals("default-string-2", props.get("sokey")) - assertEquals("200", props.get("iokey")) - assertNull(props.get("sondkey")) - assertNull(props.get("iondkey")) - } - - @Test - def testMaybeMergeOptionsDefaultValueIfNotExist(): Unit = { - setUpOptions() - - val options = parser.parse() - - CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt) - CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt) - CommandLineUtils.maybeMergeOptions(props, "sokey", 
options, stringOptOptionalArg) - CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg) - CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault) - CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault) - - assertEquals("default-string", props.get("skey")) - assertEquals("100", props.get("ikey")) - assertEquals("default-string-2", props.get("sokey")) - assertEquals("200", props.get("iokey")) - assertNull(props.get("sondkey")) - assertNull(props.get("iondkey")) - } - - @Test - def testMaybeMergeOptionsNotOverwriteExisting(): Unit = { - setUpOptions() - - props.put("skey", "existing-string") - props.put("ikey", "300") - props.put("sokey", "existing-string-2") - props.put("iokey", "400") - props.put("sondkey", "existing-string-3") - props.put("iondkey", "500") - - val options = parser.parse() - - CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt) - CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt) - CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg) - CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg) - CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault) - CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault) - - assertEquals("existing-string", props.get("skey")) - assertEquals("300", props.get("ikey")) - assertEquals("existing-string-2", props.get("sokey")) - assertEquals("400", props.get("iokey")) - assertEquals("existing-string-3", props.get("sondkey")) - assertEquals("500", props.get("iondkey")) - } -} diff --git a/server-common/src/main/java/org/apache/kafka/server/util/CommandDefaultOptions.java b/server-common/src/main/java/org/apache/kafka/server/util/CommandDefaultOptions.java new file mode 100644 index 00000000000..10bbd0becdd --- /dev/null +++ 
b/server-common/src/main/java/org/apache/kafka/server/util/CommandDefaultOptions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import joptsimple.AbstractOptionSpec; +import joptsimple.OptionParser; +import joptsimple.OptionSet; + +public abstract class CommandDefaultOptions { + public final String[] args; + public final OptionParser parser; + public final AbstractOptionSpec<Void> helpOpt; + public final AbstractOptionSpec<Void> versionOpt; + public OptionSet options; + + public CommandDefaultOptions(String[] args) { + this(args, false); + } + + public CommandDefaultOptions(String[] args, boolean allowCommandOptionAbbreviation) { + this.args = args; + this.parser = new OptionParser(allowCommandOptionAbbreviation); + this.helpOpt = parser.accepts("help", "Print usage information.").forHelp(); + this.versionOpt = parser.accepts("version", "Display Kafka version.").forHelp(); + this.options = null; + } +} diff --git a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java new file mode 100644 index 00000000000..0f904127dcd --- /dev/null +++ 
b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +/** + * Helper functions for dealing with command line utilities. + */ +public class CommandLineUtils { + /** + * Check if there are no options or `--help` option from command line. + * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ + public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) { + return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt); + } + + /** + * Check if there is `--version` option from command line. 
+ * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ + public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) { + return commandOpts.options.has(commandOpts.versionOpt); + } + + /** + * Check and print help message if there is no options or `--help` option + * from command line, if `--version` is specified on the command line + * print version information and exit. + * + * @param commandOpts Acceptable options for a command + * @param message Message to display on successful check + */ + public static void maybePrintHelpOrVersion(CommandDefaultOptions commandOpts, String message) { + if (isPrintHelpNeeded(commandOpts)) { + printUsageAndExit(commandOpts.parser, message); + } + if (isPrintVersionNeeded(commandOpts)) { + printVersionAndExit(); + } + } + + /** + * Check that all the listed options are present. + */ + public static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec<?>... requiredList) { + for (OptionSpec<?> arg : requiredList) { + if (!options.has(arg)) { + printUsageAndExit(parser, String.format("Missing required argument \"%s\"", arg)); + } + } + } + + /** + * Check that none of the listed options are present. + */ + public static void checkInvalidArgs(OptionParser parser, + OptionSet options, + OptionSpec<?> usedOption, + OptionSpec<?>... invalidOptions) { + if (options.has(usedOption)) { + for (OptionSpec<?> arg : invalidOptions) { + if (options.has(arg)) { + printUsageAndExit(parser, String.format("Option \"%s\" can't be used with option \"%s\"", usedOption, arg)); + } + } + } + } + + /** + * Check that none of the listed options are present. 
+ */ + public static void checkInvalidArgs(OptionParser parser, + OptionSet options, + OptionSpec<?> usedOption, + Set<OptionSpec<?>> invalidOptions) { + OptionSpec<?>[] array = new OptionSpec<?>[invalidOptions.size()]; + invalidOptions.toArray(array); + checkInvalidArgs(parser, options, usedOption, array); + } + + /** + * Check that none of the listed options are present with the combination of used options. + */ + public static void checkInvalidArgsSet(OptionParser parser, + OptionSet options, + Set<OptionSpec<?>> usedOptions, + Set<OptionSpec<?>> invalidOptions, + Optional<String> trailingAdditionalMessage) { + if (usedOptions.stream().filter(options::has).count() == usedOptions.size()) { + for (OptionSpec<?> arg : invalidOptions) { + if (options.has(arg)) { + printUsageAndExit(parser, String.format("Option combination \"%s\" can't be used with option \"%s\"%s", + usedOptions, arg, trailingAdditionalMessage.orElse(""))); + } + } + } + } + + public static void printUsageAndExit(OptionParser parser, String message) { + System.err.println(message); + try { + parser.printHelpOn(System.err); + } catch (IOException e) { + throw new RuntimeException(e); + } + Exit.exit(1, message); + } + + public static void printVersionAndExit() { + System.out.println(AppInfoParser.getVersion()); + Exit.exit(0); + } + + /** + * Parse key-value pairs in the form key=value. + * Value may contain equals sign. + */ + public static Properties parseKeyValueArgs(List<String> args) { + return parseKeyValueArgs(args, true); + } + + /** + * Parse key-value pairs in the form key=value. + * Value may contain equals sign. 
+ */ + public static Properties parseKeyValueArgs(List<String> args, boolean acceptMissingValue) { + Properties props = new Properties(); + List<String[]> splits = new ArrayList<>(); + args.forEach(arg -> { + String[] split = arg.split("=", 2); + if (split.length > 0) { + splits.add(split); + } + }); + splits.forEach(split -> { + if (split.length == 1 || (split.length == 2 && (split[1] == null || split[1].isEmpty()))) { + if (acceptMissingValue) { + props.put(split[0], ""); + } else { + throw new IllegalArgumentException(String.format("Missing value for key %s", split[0])); + } + } else { + props.put(split[0], split[1]); + } + }); + return props; + } + + /** + * Merge the options into {@code props} for key {@code key}, with the following precedence, from high to low: + * 1) if {@code spec} is specified on {@code options} explicitly, use the value; + * 2) if {@code props} already has {@code key} set, keep it; + * 3) otherwise, use the default value of {@code spec}. + * A {@code null} value means to remove {@code key} from the {@code props}. + */ + public static <T> void maybeMergeOptions(Properties props, String key, OptionSet options, OptionSpec<T> spec) { + if (options.has(spec) || !props.containsKey(key)) { + T value = options.valueOf(spec); + if (value == null) { + props.remove(key); + } else { + props.put(key, value.toString()); + } + } + } +} diff --git a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java new file mode 100644 index 00000000000..4d12122e3ae --- /dev/null +++ b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class CommandLineUtilsTest { + @Test + public void testParseEmptyArg() { + List<String> argArray = Arrays.asList("my.empty.property="); + + assertThrows(IllegalArgumentException.class, () -> CommandLineUtils.parseKeyValueArgs(argArray, false)); + } + + @Test + public void testParseEmptyArgWithNoDelimiter() { + List<String> argArray = Arrays.asList("my.empty.property"); + + assertThrows(IllegalArgumentException.class, () -> CommandLineUtils.parseKeyValueArgs(argArray, false)); + } + + @Test + public void testParseEmptyArgAsValid() { + List<String> argArray = Arrays.asList("my.empty.property=", "my.empty.property1"); + Properties props = CommandLineUtils.parseKeyValueArgs(argArray); + + assertEquals(props.getProperty("my.empty.property"), "", "Value of a key with missing value should be an empty string"); + assertEquals(props.getProperty("my.empty.property1"), "", "Value of a key with missing value with no delimiter should be an empty string"); + 
} + + @Test + public void testParseSingleArg() { + List<String> argArray = Arrays.asList("my.property=value"); + Properties props = CommandLineUtils.parseKeyValueArgs(argArray); + + assertEquals(props.getProperty("my.property"), "value", "Value of a single property should be 'value'"); + } + + @Test + public void testParseArgs() { + List<String> argArray = Arrays.asList("first.property=first", "second.property=second"); + Properties props = CommandLineUtils.parseKeyValueArgs(argArray); + + assertEquals(props.getProperty("first.property"), "first", "Value of first property should be 'first'"); + assertEquals(props.getProperty("second.property"), "second", "Value of second property should be 'second'"); + } + + @Test + public void testParseArgsWithMultipleDelimiters() { + List<String> argArray = Arrays.asList("first.property==first", "second.property=second=", "third.property=thi=rd"); + Properties props = CommandLineUtils.parseKeyValueArgs(argArray); + + assertEquals(props.getProperty("first.property"), "=first", "Value of first property should be '=first'"); + assertEquals(props.getProperty("second.property"), "second=", "Value of second property should be 'second='"); + assertEquals(props.getProperty("third.property"), "thi=rd", "Value of third property should be 'thi=rd'"); + } + + Properties props = new Properties(); + OptionParser parser = new OptionParser(false); + OptionSpec<String> stringOpt; + OptionSpec<Integer> intOpt; + OptionSpec<String> stringOptOptionalArg; + OptionSpec<Integer> intOptOptionalArg; + OptionSpec<String> stringOptOptionalArgNoDefault; + OptionSpec<Integer> intOptOptionalArgNoDefault; + + private void setUpOptions() { + stringOpt = parser.accepts("str") + .withRequiredArg() + .ofType(String.class) + .defaultsTo("default-string"); + intOpt = parser.accepts("int") + .withRequiredArg() + .ofType(Integer.class) + .defaultsTo(100); + stringOptOptionalArg = parser.accepts("str-opt") + .withOptionalArg() + .ofType(String.class) + 
.defaultsTo("default-string-2"); + intOptOptionalArg = parser.accepts("int-opt") + .withOptionalArg() + .ofType(Integer.class) + .defaultsTo(200); + stringOptOptionalArgNoDefault = parser.accepts("str-opt-nodef") + .withOptionalArg() + .ofType(String.class); + intOptOptionalArgNoDefault = parser.accepts("int-opt-nodef") + .withOptionalArg() + .ofType(Integer.class); + } + + @Test + public void testMaybeMergeOptionsOverwriteExisting() { + setUpOptions(); + + props.put("skey", "existing-string"); + props.put("ikey", "300"); + props.put("sokey", "existing-string-2"); + props.put("iokey", "400"); + props.put("sondkey", "existing-string-3"); + props.put("iondkey", "500"); + + OptionSet options = parser.parse( + "--str", "some-string", + "--int", "600", + "--str-opt", "some-string-2", + "--int-opt", "700", + "--str-opt-nodef", "some-string-3", + "--int-opt-nodef", "800" + ); + + CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt); + CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt); + CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg); + CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg); + CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault); + CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault); + + assertEquals("some-string", props.get("skey")); + assertEquals("600", props.get("ikey")); + assertEquals("some-string-2", props.get("sokey")); + assertEquals("700", props.get("iokey")); + assertEquals("some-string-3", props.get("sondkey")); + assertEquals("800", props.get("iondkey")); + } + + @Test + public void testMaybeMergeOptionsDefaultOverwriteExisting() { + setUpOptions(); + + props.put("sokey", "existing-string"); + props.put("iokey", "300"); + props.put("sondkey", "existing-string-2"); + props.put("iondkey", "400"); + + OptionSet options = parser.parse( + "--str-opt", + "--int-opt", + 
"--str-opt-nodef", + "--int-opt-nodef" + ); + + CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg); + CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg); + CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault); + CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault); + + assertEquals("default-string-2", props.get("sokey")); + assertEquals("200", props.get("iokey")); + assertNull(props.get("sondkey")); + assertNull(props.get("iondkey")); + } + + @Test + public void testMaybeMergeOptionsDefaultValueIfNotExist() { + setUpOptions(); + + OptionSet options = parser.parse(); + + CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt); + CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt); + CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg); + CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg); + CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault); + CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault); + + assertEquals("default-string", props.get("skey")); + assertEquals("100", props.get("ikey")); + assertEquals("default-string-2", props.get("sokey")); + assertEquals("200", props.get("iokey")); + assertNull(props.get("sondkey")); + assertNull(props.get("iondkey")); + } + + @Test + public void testMaybeMergeOptionsNotOverwriteExisting() { + setUpOptions(); + + props.put("skey", "existing-string"); + props.put("ikey", "300"); + props.put("sokey", "existing-string-2"); + props.put("iokey", "400"); + props.put("sondkey", "existing-string-3"); + props.put("iondkey", "500"); + + OptionSet options = parser.parse(); + + CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt); + CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt); + 
CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg); + CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg); + CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault); + CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault); + + assertEquals("existing-string", props.get("skey")); + assertEquals("300", props.get("ikey")); + assertEquals("existing-string-2", props.get("sokey")); + assertEquals("400", props.get("iokey")); + assertEquals("existing-string-3", props.get("sondkey")); + assertEquals("500", props.get("iondkey")); + } +}