chia7712 commented on code in PR #16381:
URL: https://github.com/apache/kafka/pull/16381#discussion_r1673154305
##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -354,128 +354,150 @@ object ConfigCommand extends Logging {
}
}
- @nowarn("cat=deprecation")
def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
val entityTypes = opts.entityTypes
val entityNames = opts.entityNames
val entityTypeHead = entityTypes.head
- val entityNameHead = entityNames.head
val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala.toMap // no
need for mutability
val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new
ConfigEntry(k, v)) }
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
entityTypeHead match {
case ConfigType.TOPIC =>
- val oldConfig = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = false, describeAll = false)
- .map { entry => (entry.name, entry) }.toMap
+ alterTopicConfig(adminClient, entityTypeHead, entityNames,
configsToBeAdded, configsToBeDeleted)
+ case ConfigType.BROKER =>
+ alterBrokerConfig(adminClient, entityTypeHead, entityNames,
configsToBeAdded, configsToBeDeleted)
+ case BrokerLoggerConfigType =>
+ alterBrokerLoggingConfig(adminClient, entityTypeHead, entityNames,
configsToBeAdded, configsToBeDeleted)
+ case ConfigType.USER | ConfigType.CLIENT =>
+ alterUserOrClientConfig(adminClient, entityTypes, entityNames,
configsToBeAddedMap, configsToBeAdded, configsToBeDeleted)
+ case ConfigType.IP =>
+ alterIpConfig(adminClient, entityTypes, entityNames,
configsToBeAddedMap, configsToBeAdded, configsToBeDeleted)
+ case ConfigType.CLIENT_METRICS =>
+ alterClientMetricsConfig(adminClient, entityTypeHead, entityNames,
configsToBeAdded, configsToBeDeleted)
+ case _ => throw new IllegalArgumentException(s"Unsupported entity type:
$entityTypeHead")
+ }
- // fail the command if any of the configs to be deleted does not exist
- val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
+ if (entityNames.nonEmpty) {
+ entityNames.foreach(
+ entityName => println(s"Completed updating config for
${entityTypeHead.dropRight(1)} $entityName.")
+ )
+ } else
+ println(s"Completed updating default config for $entityTypeHead in the
cluster.")
+ }
- val configResource = new ConfigResource(ConfigResource.Type.TOPIC,
entityNameHead)
- val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_,
AlterConfigOp.OpType.SET))
- ++ configsToBeDeleted.map { k => new AlterConfigOp(new
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
- ).asJavaCollection
- adminClient.incrementalAlterConfigs(Map(configResource ->
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+ private def alterTopicConfig(adminClient: Admin, entityTypeHead: String,
entityNames: List[String], configsToBeAdded: Predef.Map[String, ConfigEntry],
configsToBeDeleted: Seq[String]) = {
+ entityNames.foreach { entityName =>
+ getOldConfig(adminClient, entityTypeHead, configsToBeDeleted, entityName)
+ }
+ val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+ val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_,
AlterConfigOp.OpType.SET))
+ ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k,
""), AlterConfigOp.OpType.DELETE) }
+ ).asJavaCollection
+ adminClient.incrementalAlterConfigs(entityNames.map(new
ConfigResource(ConfigResource.Type.TOPIC, _))
+ .map(_ -> alterEntries).toMap.asJava, alterOptions)
+ .all().get(60, TimeUnit.SECONDS)
+ }
- case ConfigType.BROKER =>
- val oldConfig = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = false, describeAll = false)
- .map { entry => (entry.name, entry) }.toMap
+ private def getOldConfig(adminClient: Admin, entityTypeHead: String,
configsToBeDeleted: Seq[String], entityName: String): Map[String, ConfigEntry]
= {
+ val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityName,
includeSynonyms = false, describeAll = false)
+ .map { entry => (entry.name, entry) }.toMap
- // fail the command if any of the configs to be deleted does not exist
- val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
+ // fail the command if any of the configs to be deleted does not exist
+ val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
+ if (invalidConfigs.nonEmpty)
+ throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
+ oldConfig
+ }
- val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
- val sensitiveEntries = newEntries.filter(_._2.value == null)
- if (sensitiveEntries.nonEmpty)
- throw new InvalidConfigurationException(s"All sensitive broker
config entries must be specified for --alter, missing entries:
${sensitiveEntries.keySet}")
- val newConfig = new JConfig(newEntries.asJava.values)
+ @nowarn("cat=deprecation")
+ private def alterBrokerConfig(adminClient: Admin, entityTypeHead: String,
entityNames: List[String], configsToBeAdded: Predef.Map[String, ConfigEntry],
configsToBeDeleted: Seq[String]): Unit = {
+ entityNames.foreach { entityName =>
+ val oldConfig = getOldConfig(adminClient, entityTypeHead,
configsToBeDeleted, entityName)
- val configResource = new ConfigResource(ConfigResource.Type.BROKER,
entityNameHead)
- val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- adminClient.alterConfigs(Map(configResource -> newConfig).asJava,
alterOptions).all().get(60, TimeUnit.SECONDS)
+ val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
+ val sensitiveEntries = newEntries.filter(_._2.value == null)
+ if (sensitiveEntries.nonEmpty)
+ throw new InvalidConfigurationException(s"All sensitive broker config
entries must be specified for --alter, missing entries:
${sensitiveEntries.keySet}")
+ val newConfig = new JConfig(newEntries.asJava.values)
- case BrokerLoggerConfigType =>
- val validLoggers = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = true, describeAll = false).map(_.name)
- // fail the command if any of the configured broker loggers do not
exist
- val invalidBrokerLoggers =
configsToBeDeleted.filterNot(validLoggers.contains) ++
configsToBeAdded.keys.filterNot(validLoggers.contains)
- if (invalidBrokerLoggers.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid broker logger(s):
${invalidBrokerLoggers.mkString(",")}")
-
- val configResource = new
ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityNameHead)
- val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- val alterLogLevelEntries = (configsToBeAdded.values.map(new
AlterConfigOp(_, AlterConfigOp.OpType.SET))
- ++ configsToBeDeleted.map { k => new AlterConfigOp(new
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
- ).asJavaCollection
- adminClient.incrementalAlterConfigs(Map(configResource ->
alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+ val configResource = new ConfigResource(ConfigResource.Type.BROKER,
entityName)
+ val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+ adminClient.alterConfigs(Map(configResource -> newConfig).asJava,
alterOptions).all().get(60, TimeUnit.SECONDS)
+ }
+ }
- case ConfigType.USER | ConfigType.CLIENT =>
- val hasQuotaConfigsToAdd =
configsToBeAdded.keys.exists(QuotaConfigs.isClientOrUserQuotaConfig)
- val scramConfigsToAddMap = configsToBeAdded.filter(entry =>
ScramMechanism.isScram(entry._1))
- val unknownConfigsToAdd = configsToBeAdded.keys.filterNot(key =>
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
- val hasQuotaConfigsToDelete =
configsToBeDeleted.exists(QuotaConfigs.isClientOrUserQuotaConfig)
- val scramConfigsToDelete =
configsToBeDeleted.filter(ScramMechanism.isScram)
- val unknownConfigsToDelete = configsToBeDeleted.filterNot(key =>
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
- if (entityTypeHead == ConfigType.CLIENT || entityTypes.size == 2) { //
size==2 for case where users is specified first on the command line, before
clients
- // either just a client or both a user and a client
- if (unknownConfigsToAdd.nonEmpty || scramConfigsToAddMap.nonEmpty)
- throw new IllegalArgumentException(s"Only quota configs can be
added for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config
names: ${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}")
- if (unknownConfigsToDelete.nonEmpty || scramConfigsToDelete.nonEmpty)
- throw new IllegalArgumentException(s"Only quota configs can be
deleted for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config
names: ${unknownConfigsToDelete ++ scramConfigsToDelete}")
- } else { // ConfigType.User
- if (unknownConfigsToAdd.nonEmpty)
- throw new IllegalArgumentException(s"Only quota and SCRAM
credential configs can be added for '${ConfigType.USER}' using
--bootstrap-server. Unexpected config names: $unknownConfigsToAdd")
- if (unknownConfigsToDelete.nonEmpty)
- throw new IllegalArgumentException(s"Only quota and SCRAM
credential configs can be deleted for '${ConfigType.USER}' using
--bootstrap-server. Unexpected config names: $unknownConfigsToDelete")
- if (scramConfigsToAddMap.nonEmpty || scramConfigsToDelete.nonEmpty) {
- if (entityNames.exists(_.isEmpty)) // either --entity-type users
--entity-default or --user-defaults
- throw new IllegalArgumentException("The use of --entity-default
or --user-defaults is not allowed with User SCRAM Credentials using
--bootstrap-server.")
- if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete)
- throw new IllegalArgumentException(s"Cannot alter both quota and
SCRAM credential configs simultaneously for '${ConfigType.USER}' using
--bootstrap-server.")
- }
- }
+ private def alterBrokerLoggingConfig(adminClient: Admin, entityTypeHead:
String, entityNames: List[String], configsToBeAdded: Predef.Map[String,
ConfigEntry], configsToBeDeleted: Seq[String]) = {
+ entityNames.foreach { entityName =>
+ val validLoggers = getResourceConfig(adminClient, entityTypeHead,
entityName, includeSynonyms = true, describeAll = false).map(_.name)
+ // fail the command if any of the configured broker loggers do not exist
+ val invalidBrokerLoggers =
configsToBeDeleted.filterNot(validLoggers.contains) ++
configsToBeAdded.keys.filterNot(validLoggers.contains)
+ if (invalidBrokerLoggers.nonEmpty)
+ throw new InvalidConfigurationException(s"Invalid broker logger(s):
${invalidBrokerLoggers.mkString(",")}")
+ }
+ val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+ val alterLogLevelEntries = (configsToBeAdded.values.map(new
AlterConfigOp(_, AlterConfigOp.OpType.SET))
+ ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k,
""), AlterConfigOp.OpType.DELETE) }
+ ).asJavaCollection
+ adminClient.incrementalAlterConfigs(entityNames.map(new
ConfigResource(ConfigResource.Type.BROKER_LOGGER, _))
+ .map(_ -> alterLogLevelEntries).toMap.asJava,
alterOptions).all().get(60, TimeUnit.SECONDS)
+ }
- if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete) {
- alterQuotaConfigs(adminClient, entityTypes, entityNames,
configsToBeAddedMap, configsToBeDeleted)
- } else {
- // handle altering user SCRAM credential configs
- if (entityNames.size != 1)
- // should never happen, if we get here then it is a bug
- throw new IllegalStateException(s"Altering user SCRAM credentials
should never occur for more zero or multiple users: $entityNames")
- alterUserScramCredentialConfigs(adminClient, entityNames.head,
scramConfigsToAddMap, scramConfigsToDelete)
- }
- case ConfigType.IP =>
- val unknownConfigs = (configsToBeAdded.keys ++
configsToBeDeleted).filterNot(key => DynamicConfig.Ip.names.contains(key))
- if (unknownConfigs.nonEmpty)
- throw new IllegalArgumentException(s"Only connection quota configs
can be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config
names: ${unknownConfigs.mkString(",")}")
- alterQuotaConfigs(adminClient, entityTypes, entityNames,
configsToBeAddedMap, configsToBeDeleted)
- case ConfigType.CLIENT_METRICS =>
- val oldConfig = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = false, describeAll = false)
- .map { entry => (entry.name, entry) }.toMap
-
- // fail the command if any of the configs to be deleted does not exist
- val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
-
- val configResource = new
ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityNameHead)
- val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_,
AlterConfigOp.OpType.SET))
- ++ configsToBeDeleted.map { k => new AlterConfigOp(new
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
- ).asJavaCollection
- adminClient.incrementalAlterConfigs(Map(configResource ->
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
- case _ => throw new IllegalArgumentException(s"Unsupported entity type:
$entityTypeHead")
+ private def alterUserOrClientConfig(adminClient: Admin, entityTypes:
List[String], entityNames: List[String], configsToBeAddedMap:
Predef.Map[String, String], configsToBeAdded: Predef.Map[String, ConfigEntry],
configsToBeDeleted: Seq[String]) = {
+ val entityTypeHead = entityTypes.head
+ val hasQuotaConfigsToAdd =
configsToBeAdded.keys.exists(QuotaConfigs.isClientOrUserQuotaConfig)
+ val scramConfigsToAddMap = configsToBeAdded.filter(entry =>
ScramMechanism.isScram(entry._1))
+ val unknownConfigsToAdd = configsToBeAdded.keys.filterNot(key =>
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
+ val hasQuotaConfigsToDelete =
configsToBeDeleted.exists(QuotaConfigs.isClientOrUserQuotaConfig)
+ val scramConfigsToDelete =
configsToBeDeleted.filter(ScramMechanism.isScram)
+ val unknownConfigsToDelete = configsToBeDeleted.filterNot(key =>
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
+ if (entityTypeHead == ConfigType.CLIENT || entityTypes.size == 2) { //
size==2 for case where users is specified first on the command line, before
clients
+ // either just a client or both a user and a client
+ if (unknownConfigsToAdd.nonEmpty || scramConfigsToAddMap.nonEmpty)
+ throw new IllegalArgumentException(s"Only quota configs can be added
for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config names:
${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}")
+ if (unknownConfigsToDelete.nonEmpty || scramConfigsToDelete.nonEmpty)
+ throw new IllegalArgumentException(s"Only quota configs can be deleted
for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config names:
${unknownConfigsToDelete ++ scramConfigsToDelete}")
+ } else { // ConfigType.User
+ if (unknownConfigsToAdd.nonEmpty)
+ throw new IllegalArgumentException(s"Only quota and SCRAM credential
configs can be added for '${ConfigType.USER}' using --bootstrap-server.
Unexpected config names: $unknownConfigsToAdd")
+ if (unknownConfigsToDelete.nonEmpty)
+ throw new IllegalArgumentException(s"Only quota and SCRAM credential
configs can be deleted for '${ConfigType.USER}' using --bootstrap-server.
Unexpected config names: $unknownConfigsToDelete")
+ if (scramConfigsToAddMap.nonEmpty || scramConfigsToDelete.nonEmpty) {
+ if (entityNames.exists(_.isEmpty)) // either --entity-type users
--entity-default or --user-defaults
+ throw new IllegalArgumentException("The use of --entity-default or
--user-defaults is not allowed with User SCRAM Credentials using
--bootstrap-server.")
+ if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete)
+ throw new IllegalArgumentException(s"Cannot alter both quota and
SCRAM credential configs simultaneously for '${ConfigType.USER}' using
--bootstrap-server.")
+ }
}
- if (entityNameHead.nonEmpty)
- println(s"Completed updating config for ${entityTypeHead.dropRight(1)}
$entityNameHead.")
- else
- println(s"Completed updating default config for $entityTypeHead in the
cluster.")
+ if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete) {
+ alterQuotaConfigs(adminClient, entityTypes, entityNames,
configsToBeAddedMap, configsToBeDeleted)
+ } else {
+ // handle altering user SCRAM credential configs
+ if (entityNames.size != 1)
+ // should never happen, if we get here then it is a bug
+ throw new IllegalStateException(s"Altering user SCRAM credentials
should never occur for more zero or multiple users: $entityNames")
+ alterUserScramCredentialConfigs(adminClient, entityNames.head,
scramConfigsToAddMap, scramConfigsToDelete)
+ }
+ }
+
+ private def alterIpConfig(adminClient: Admin, entityTypes: List[String],
entityNames: List[String], configsToBeAddedMap: Predef.Map[String, String],
configsToBeAdded: Predef.Map[String, ConfigEntry], configsToBeDeleted:
Seq[String]) = {
+ val unknownConfigs = (configsToBeAdded.keys ++
configsToBeDeleted).filterNot(key => DynamicConfig.Ip.names.contains(key))
+ if (unknownConfigs.nonEmpty)
+ throw new IllegalArgumentException(s"Only connection quota configs can
be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config
names: ${unknownConfigs.mkString(",")}")
+ alterQuotaConfigs(adminClient, entityTypes, entityNames,
configsToBeAddedMap, configsToBeDeleted)
+ }
+
+ private def alterClientMetricsConfig(adminClient: Admin, entityTypeHead:
String, entityNames: List[String], configsToBeAdded: Predef.Map[String,
ConfigEntry], configsToBeDeleted: Seq[String]) = {
+ entityNames.foreach { entityName =>
+ getOldConfig(adminClient, entityTypeHead, configsToBeDeleted, entityName)
Review Comment:
BTW, could you add test for `InvalidConfigurationException`?
##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -354,128 +354,150 @@ object ConfigCommand extends Logging {
}
}
- @nowarn("cat=deprecation")
def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
val entityTypes = opts.entityTypes
val entityNames = opts.entityNames
val entityTypeHead = entityTypes.head
- val entityNameHead = entityNames.head
val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala.toMap // no
need for mutability
val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new
ConfigEntry(k, v)) }
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
entityTypeHead match {
case ConfigType.TOPIC =>
- val oldConfig = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = false, describeAll = false)
- .map { entry => (entry.name, entry) }.toMap
+ alterTopicConfig(adminClient, entityTypeHead, entityNames,
configsToBeAdded, configsToBeDeleted)
+ case ConfigType.BROKER =>
+ alterBrokerConfig(adminClient, entityTypeHead, entityNames,
configsToBeAdded, configsToBeDeleted)
+ case BrokerLoggerConfigType =>
+ alterBrokerLoggingConfig(adminClient, entityTypeHead, entityNames,
configsToBeAdded, configsToBeDeleted)
+ case ConfigType.USER | ConfigType.CLIENT =>
+ alterUserOrClientConfig(adminClient, entityTypes, entityNames,
configsToBeAddedMap, configsToBeAdded, configsToBeDeleted)
+ case ConfigType.IP =>
+ alterIpConfig(adminClient, entityTypes, entityNames,
configsToBeAddedMap, configsToBeAdded, configsToBeDeleted)
+ case ConfigType.CLIENT_METRICS =>
+ alterClientMetricsConfig(adminClient, entityTypeHead, entityNames,
configsToBeAdded, configsToBeDeleted)
+ case _ => throw new IllegalArgumentException(s"Unsupported entity type:
$entityTypeHead")
+ }
- // fail the command if any of the configs to be deleted does not exist
- val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
+ if (entityNames.nonEmpty) {
+ entityNames.foreach(
+ entityName => println(s"Completed updating config for
${entityTypeHead.dropRight(1)} $entityName.")
+ )
+ } else
+ println(s"Completed updating default config for $entityTypeHead in the
cluster.")
+ }
- val configResource = new ConfigResource(ConfigResource.Type.TOPIC,
entityNameHead)
- val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_,
AlterConfigOp.OpType.SET))
- ++ configsToBeDeleted.map { k => new AlterConfigOp(new
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
- ).asJavaCollection
- adminClient.incrementalAlterConfigs(Map(configResource ->
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+ private def alterTopicConfig(adminClient: Admin, entityTypeHead: String,
entityNames: List[String], configsToBeAdded: Predef.Map[String, ConfigEntry],
configsToBeDeleted: Seq[String]) = {
+ entityNames.foreach { entityName =>
+ getOldConfig(adminClient, entityTypeHead, configsToBeDeleted, entityName)
+ }
+ val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+ val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_,
AlterConfigOp.OpType.SET))
+ ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k,
""), AlterConfigOp.OpType.DELETE) }
+ ).asJavaCollection
+ adminClient.incrementalAlterConfigs(entityNames.map(new
ConfigResource(ConfigResource.Type.TOPIC, _))
+ .map(_ -> alterEntries).toMap.asJava, alterOptions)
+ .all().get(60, TimeUnit.SECONDS)
+ }
- case ConfigType.BROKER =>
- val oldConfig = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = false, describeAll = false)
- .map { entry => (entry.name, entry) }.toMap
+ private def getOldConfig(adminClient: Admin, entityTypeHead: String,
configsToBeDeleted: Seq[String], entityName: String): Map[String, ConfigEntry]
= {
+ val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityName,
includeSynonyms = false, describeAll = false)
+ .map { entry => (entry.name, entry) }.toMap
- // fail the command if any of the configs to be deleted does not exist
- val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
+ // fail the command if any of the configs to be deleted does not exist
+ val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
+ if (invalidConfigs.nonEmpty)
+ throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
+ oldConfig
+ }
- val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
- val sensitiveEntries = newEntries.filter(_._2.value == null)
- if (sensitiveEntries.nonEmpty)
- throw new InvalidConfigurationException(s"All sensitive broker
config entries must be specified for --alter, missing entries:
${sensitiveEntries.keySet}")
- val newConfig = new JConfig(newEntries.asJava.values)
+ @nowarn("cat=deprecation")
+ private def alterBrokerConfig(adminClient: Admin, entityTypeHead: String,
entityNames: List[String], configsToBeAdded: Predef.Map[String, ConfigEntry],
configsToBeDeleted: Seq[String]): Unit = {
+ entityNames.foreach { entityName =>
+ val oldConfig = getOldConfig(adminClient, entityTypeHead,
configsToBeDeleted, entityName)
- val configResource = new ConfigResource(ConfigResource.Type.BROKER,
entityNameHead)
- val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- adminClient.alterConfigs(Map(configResource -> newConfig).asJava,
alterOptions).all().get(60, TimeUnit.SECONDS)
+ val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
+ val sensitiveEntries = newEntries.filter(_._2.value == null)
+ if (sensitiveEntries.nonEmpty)
+ throw new InvalidConfigurationException(s"All sensitive broker config
entries must be specified for --alter, missing entries:
${sensitiveEntries.keySet}")
+ val newConfig = new JConfig(newEntries.asJava.values)
- case BrokerLoggerConfigType =>
- val validLoggers = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = true, describeAll = false).map(_.name)
- // fail the command if any of the configured broker loggers do not
exist
- val invalidBrokerLoggers =
configsToBeDeleted.filterNot(validLoggers.contains) ++
configsToBeAdded.keys.filterNot(validLoggers.contains)
- if (invalidBrokerLoggers.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid broker logger(s):
${invalidBrokerLoggers.mkString(",")}")
-
- val configResource = new
ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityNameHead)
- val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- val alterLogLevelEntries = (configsToBeAdded.values.map(new
AlterConfigOp(_, AlterConfigOp.OpType.SET))
- ++ configsToBeDeleted.map { k => new AlterConfigOp(new
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
- ).asJavaCollection
- adminClient.incrementalAlterConfigs(Map(configResource ->
alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+ val configResource = new ConfigResource(ConfigResource.Type.BROKER,
entityName)
+ val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+ adminClient.alterConfigs(Map(configResource -> newConfig).asJava,
alterOptions).all().get(60, TimeUnit.SECONDS)
+ }
+ }
- case ConfigType.USER | ConfigType.CLIENT =>
- val hasQuotaConfigsToAdd =
configsToBeAdded.keys.exists(QuotaConfigs.isClientOrUserQuotaConfig)
- val scramConfigsToAddMap = configsToBeAdded.filter(entry =>
ScramMechanism.isScram(entry._1))
- val unknownConfigsToAdd = configsToBeAdded.keys.filterNot(key =>
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
- val hasQuotaConfigsToDelete =
configsToBeDeleted.exists(QuotaConfigs.isClientOrUserQuotaConfig)
- val scramConfigsToDelete =
configsToBeDeleted.filter(ScramMechanism.isScram)
- val unknownConfigsToDelete = configsToBeDeleted.filterNot(key =>
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
- if (entityTypeHead == ConfigType.CLIENT || entityTypes.size == 2) { //
size==2 for case where users is specified first on the command line, before
clients
- // either just a client or both a user and a client
- if (unknownConfigsToAdd.nonEmpty || scramConfigsToAddMap.nonEmpty)
- throw new IllegalArgumentException(s"Only quota configs can be
added for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config
names: ${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}")
- if (unknownConfigsToDelete.nonEmpty || scramConfigsToDelete.nonEmpty)
- throw new IllegalArgumentException(s"Only quota configs can be
deleted for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config
names: ${unknownConfigsToDelete ++ scramConfigsToDelete}")
- } else { // ConfigType.User
- if (unknownConfigsToAdd.nonEmpty)
- throw new IllegalArgumentException(s"Only quota and SCRAM
credential configs can be added for '${ConfigType.USER}' using
--bootstrap-server. Unexpected config names: $unknownConfigsToAdd")
- if (unknownConfigsToDelete.nonEmpty)
- throw new IllegalArgumentException(s"Only quota and SCRAM
credential configs can be deleted for '${ConfigType.USER}' using
--bootstrap-server. Unexpected config names: $unknownConfigsToDelete")
- if (scramConfigsToAddMap.nonEmpty || scramConfigsToDelete.nonEmpty) {
- if (entityNames.exists(_.isEmpty)) // either --entity-type users
--entity-default or --user-defaults
- throw new IllegalArgumentException("The use of --entity-default
or --user-defaults is not allowed with User SCRAM Credentials using
--bootstrap-server.")
- if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete)
- throw new IllegalArgumentException(s"Cannot alter both quota and
SCRAM credential configs simultaneously for '${ConfigType.USER}' using
--bootstrap-server.")
- }
- }
+ private def alterBrokerLoggingConfig(adminClient: Admin, entityTypeHead:
String, entityNames: List[String], configsToBeAdded: Predef.Map[String,
ConfigEntry], configsToBeDeleted: Seq[String]) = {
+ entityNames.foreach { entityName =>
+ val validLoggers = getResourceConfig(adminClient, entityTypeHead,
entityName, includeSynonyms = true, describeAll = false).map(_.name)
+ // fail the command if any of the configured broker loggers do not exist
+ val invalidBrokerLoggers =
configsToBeDeleted.filterNot(validLoggers.contains) ++
configsToBeAdded.keys.filterNot(validLoggers.contains)
+ if (invalidBrokerLoggers.nonEmpty)
+ throw new InvalidConfigurationException(s"Invalid broker logger(s):
${invalidBrokerLoggers.mkString(",")}")
+ }
+ val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+ val alterLogLevelEntries = (configsToBeAdded.values.map(new
AlterConfigOp(_, AlterConfigOp.OpType.SET))
+ ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k,
""), AlterConfigOp.OpType.DELETE) }
+ ).asJavaCollection
+ adminClient.incrementalAlterConfigs(entityNames.map(new
ConfigResource(ConfigResource.Type.BROKER_LOGGER, _))
+ .map(_ -> alterLogLevelEntries).toMap.asJava,
alterOptions).all().get(60, TimeUnit.SECONDS)
+ }
- if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete) {
- alterQuotaConfigs(adminClient, entityTypes, entityNames,
configsToBeAddedMap, configsToBeDeleted)
- } else {
- // handle altering user SCRAM credential configs
- if (entityNames.size != 1)
- // should never happen, if we get here then it is a bug
- throw new IllegalStateException(s"Altering user SCRAM credentials
should never occur for more zero or multiple users: $entityNames")
- alterUserScramCredentialConfigs(adminClient, entityNames.head,
scramConfigsToAddMap, scramConfigsToDelete)
- }
- case ConfigType.IP =>
- val unknownConfigs = (configsToBeAdded.keys ++
configsToBeDeleted).filterNot(key => DynamicConfig.Ip.names.contains(key))
- if (unknownConfigs.nonEmpty)
- throw new IllegalArgumentException(s"Only connection quota configs
can be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config
names: ${unknownConfigs.mkString(",")}")
- alterQuotaConfigs(adminClient, entityTypes, entityNames,
configsToBeAddedMap, configsToBeDeleted)
- case ConfigType.CLIENT_METRICS =>
- val oldConfig = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = false, describeAll = false)
- .map { entry => (entry.name, entry) }.toMap
-
- // fail the command if any of the configs to be deleted does not exist
- val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
-
- val configResource = new
ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityNameHead)
- val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_,
AlterConfigOp.OpType.SET))
- ++ configsToBeDeleted.map { k => new AlterConfigOp(new
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
- ).asJavaCollection
- adminClient.incrementalAlterConfigs(Map(configResource ->
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
- case _ => throw new IllegalArgumentException(s"Unsupported entity type:
$entityTypeHead")
+ private def alterUserOrClientConfig(adminClient: Admin, entityTypes:
List[String], entityNames: List[String], configsToBeAddedMap:
Predef.Map[String, String], configsToBeAdded: Predef.Map[String, ConfigEntry],
configsToBeDeleted: Seq[String]) = {
+ val entityTypeHead = entityTypes.head
+ val hasQuotaConfigsToAdd =
configsToBeAdded.keys.exists(QuotaConfigs.isClientOrUserQuotaConfig)
+ val scramConfigsToAddMap = configsToBeAdded.filter(entry =>
ScramMechanism.isScram(entry._1))
+ val unknownConfigsToAdd = configsToBeAdded.keys.filterNot(key =>
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
+ val hasQuotaConfigsToDelete =
configsToBeDeleted.exists(QuotaConfigs.isClientOrUserQuotaConfig)
+ val scramConfigsToDelete =
configsToBeDeleted.filter(ScramMechanism.isScram)
+ val unknownConfigsToDelete = configsToBeDeleted.filterNot(key =>
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
+ if (entityTypeHead == ConfigType.CLIENT || entityTypes.size == 2) { //
size==2 for case where users is specified first on the command line, before
clients
+ // either just a client or both a user and a client
+ if (unknownConfigsToAdd.nonEmpty || scramConfigsToAddMap.nonEmpty)
+ throw new IllegalArgumentException(s"Only quota configs can be added
for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config names:
${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}")
+ if (unknownConfigsToDelete.nonEmpty || scramConfigsToDelete.nonEmpty)
+ throw new IllegalArgumentException(s"Only quota configs can be deleted
for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config names:
${unknownConfigsToDelete ++ scramConfigsToDelete}")
+ } else { // ConfigType.User
+ if (unknownConfigsToAdd.nonEmpty)
+ throw new IllegalArgumentException(s"Only quota and SCRAM credential
configs can be added for '${ConfigType.USER}' using --bootstrap-server.
Unexpected config names: $unknownConfigsToAdd")
+ if (unknownConfigsToDelete.nonEmpty)
+ throw new IllegalArgumentException(s"Only quota and SCRAM credential
configs can be deleted for '${ConfigType.USER}' using --bootstrap-server.
Unexpected config names: $unknownConfigsToDelete")
+ if (scramConfigsToAddMap.nonEmpty || scramConfigsToDelete.nonEmpty) {
+ if (entityNames.exists(_.isEmpty)) // either --entity-type users
--entity-default or --user-defaults
+ throw new IllegalArgumentException("The use of --entity-default or
--user-defaults is not allowed with User SCRAM Credentials using
--bootstrap-server.")
+ if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete)
+ throw new IllegalArgumentException(s"Cannot alter both quota and
SCRAM credential configs simultaneously for '${ConfigType.USER}' using
--bootstrap-server.")
+ }
}
- if (entityNameHead.nonEmpty)
- println(s"Completed updating config for ${entityTypeHead.dropRight(1)}
$entityNameHead.")
- else
- println(s"Completed updating default config for $entityTypeHead in the
cluster.")
+ if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete) {
+ alterQuotaConfigs(adminClient, entityTypes, entityNames,
configsToBeAddedMap, configsToBeDeleted)
+ } else {
+ // handle altering user SCRAM credential configs
+ if (entityNames.size != 1)
+ // should never happen, if we get here then it is a bug
+ throw new IllegalStateException(s"Altering user SCRAM credentials
should never occur for more zero or multiple users: $entityNames")
+ alterUserScramCredentialConfigs(adminClient, entityNames.head,
scramConfigsToAddMap, scramConfigsToDelete)
+ }
+ }
+
+ private def alterIpConfig(adminClient: Admin, entityTypes: List[String],
entityNames: List[String], configsToBeAddedMap: Predef.Map[String, String],
configsToBeAdded: Predef.Map[String, ConfigEntry], configsToBeDeleted:
Seq[String]) = {
+ val unknownConfigs = (configsToBeAdded.keys ++
configsToBeDeleted).filterNot(key => DynamicConfig.Ip.names.contains(key))
+ if (unknownConfigs.nonEmpty)
+ throw new IllegalArgumentException(s"Only connection quota configs can
be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config
names: ${unknownConfigs.mkString(",")}")
+ alterQuotaConfigs(adminClient, entityTypes, entityNames,
configsToBeAddedMap, configsToBeDeleted)
+ }
+
+ private def alterClientMetricsConfig(adminClient: Admin, entityTypeHead:
String, entityNames: List[String], configsToBeAdded: Predef.Map[String,
ConfigEntry], configsToBeDeleted: Seq[String]) = {
+ entityNames.foreach { entityName =>
+ getOldConfig(adminClient, entityTypeHead, configsToBeDeleted, entityName)
Review Comment:
I guess you want to reuse the validation, right? However, the method name
"get" is a bit weird since it is used in the "foreach". Could you please do a
bit refactor?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]