chia7712 commented on code in PR #16381:
URL: https://github.com/apache/kafka/pull/16381#discussion_r1673154305


##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -354,128 +354,150 @@ object ConfigCommand extends Logging {
     }
   }
 
-  @nowarn("cat=deprecation")
   def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
     val entityTypes = opts.entityTypes
     val entityNames = opts.entityNames
     val entityTypeHead = entityTypes.head
-    val entityNameHead = entityNames.head
     val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala.toMap // no 
need for mutability
     val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new 
ConfigEntry(k, v)) }
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
 
     entityTypeHead match {
       case ConfigType.TOPIC =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
+        alterTopicConfig(adminClient, entityTypeHead, entityNames, 
configsToBeAdded, configsToBeDeleted)
+      case ConfigType.BROKER =>
+        alterBrokerConfig(adminClient, entityTypeHead, entityNames, 
configsToBeAdded, configsToBeDeleted)
+      case BrokerLoggerConfigType =>
+        alterBrokerLoggingConfig(adminClient, entityTypeHead, entityNames, 
configsToBeAdded, configsToBeDeleted)
+      case ConfigType.USER | ConfigType.CLIENT =>
+        alterUserOrClientConfig(adminClient, entityTypes, entityNames, 
configsToBeAddedMap, configsToBeAdded, configsToBeDeleted)
+      case ConfigType.IP =>
+        alterIpConfig(adminClient, entityTypes, entityNames, 
configsToBeAddedMap, configsToBeAdded, configsToBeDeleted)
+      case ConfigType.CLIENT_METRICS =>
+        alterClientMetricsConfig(adminClient, entityTypeHead, entityNames, 
configsToBeAdded, configsToBeDeleted)
+      case _ => throw new IllegalArgumentException(s"Unsupported entity type: 
$entityTypeHead")
+    }
 
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
+    if (entityNames.nonEmpty) {
+      entityNames.foreach(
+        entityName => println(s"Completed updating config for 
${entityTypeHead.dropRight(1)} $entityName.")
+      )
+    } else
+      println(s"Completed updating default config for $entityTypeHead in the 
cluster.")
+  }
 
-        val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
entityNameHead)
-        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))
-          ++ configsToBeDeleted.map { k => new AlterConfigOp(new 
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
-        ).asJavaCollection
-        adminClient.incrementalAlterConfigs(Map(configResource -> 
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+  private def alterTopicConfig(adminClient: Admin, entityTypeHead: String, 
entityNames: List[String], configsToBeAdded: Predef.Map[String, ConfigEntry], 
configsToBeDeleted: Seq[String]) = {
+    entityNames.foreach { entityName =>
+      getOldConfig(adminClient, entityTypeHead, configsToBeDeleted, entityName)
+    }
+    val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+    val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))
+      ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, 
""), AlterConfigOp.OpType.DELETE) }
+      ).asJavaCollection
+    adminClient.incrementalAlterConfigs(entityNames.map(new 
ConfigResource(ConfigResource.Type.TOPIC, _))
+        .map(_ -> alterEntries).toMap.asJava, alterOptions)
+      .all().get(60, TimeUnit.SECONDS)
+  }
 
-      case ConfigType.BROKER =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
+  private def getOldConfig(adminClient: Admin, entityTypeHead: String, 
configsToBeDeleted: Seq[String], entityName: String): Map[String, ConfigEntry] 
= {
+    val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityName, 
includeSynonyms = false, describeAll = false)
+      .map { entry => (entry.name, entry) }.toMap
 
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
+    // fail the command if any of the configs to be deleted does not exist
+    val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
+    if (invalidConfigs.nonEmpty)
+      throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
+    oldConfig
+  }
 
-        val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
-        val sensitiveEntries = newEntries.filter(_._2.value == null)
-        if (sensitiveEntries.nonEmpty)
-          throw new InvalidConfigurationException(s"All sensitive broker 
config entries must be specified for --alter, missing entries: 
${sensitiveEntries.keySet}")
-        val newConfig = new JConfig(newEntries.asJava.values)
+  @nowarn("cat=deprecation")
+  private def alterBrokerConfig(adminClient: Admin, entityTypeHead: String, 
entityNames: List[String], configsToBeAdded: Predef.Map[String, ConfigEntry], 
configsToBeDeleted: Seq[String]): Unit = {
+    entityNames.foreach { entityName =>
+      val oldConfig = getOldConfig(adminClient, entityTypeHead, 
configsToBeDeleted, entityName)
 
-        val configResource = new ConfigResource(ConfigResource.Type.BROKER, 
entityNameHead)
-        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        adminClient.alterConfigs(Map(configResource -> newConfig).asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS)
+      val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
+      val sensitiveEntries = newEntries.filter(_._2.value == null)
+      if (sensitiveEntries.nonEmpty)
+        throw new InvalidConfigurationException(s"All sensitive broker config 
entries must be specified for --alter, missing entries: 
${sensitiveEntries.keySet}")
+      val newConfig = new JConfig(newEntries.asJava.values)
 
-      case BrokerLoggerConfigType =>
-        val validLoggers = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = true, describeAll = false).map(_.name)
-        // fail the command if any of the configured broker loggers do not 
exist
-        val invalidBrokerLoggers = 
configsToBeDeleted.filterNot(validLoggers.contains) ++ 
configsToBeAdded.keys.filterNot(validLoggers.contains)
-        if (invalidBrokerLoggers.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid broker logger(s): 
${invalidBrokerLoggers.mkString(",")}")
-
-        val configResource = new 
ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityNameHead)
-        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        val alterLogLevelEntries = (configsToBeAdded.values.map(new 
AlterConfigOp(_, AlterConfigOp.OpType.SET))
-          ++ configsToBeDeleted.map { k => new AlterConfigOp(new 
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
-        ).asJavaCollection
-        adminClient.incrementalAlterConfigs(Map(configResource -> 
alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+      val configResource = new ConfigResource(ConfigResource.Type.BROKER, 
entityName)
+      val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+      adminClient.alterConfigs(Map(configResource -> newConfig).asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS)
+    }
+  }
 
-      case ConfigType.USER | ConfigType.CLIENT =>
-        val hasQuotaConfigsToAdd = 
configsToBeAdded.keys.exists(QuotaConfigs.isClientOrUserQuotaConfig)
-        val scramConfigsToAddMap = configsToBeAdded.filter(entry => 
ScramMechanism.isScram(entry._1))
-        val unknownConfigsToAdd = configsToBeAdded.keys.filterNot(key => 
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
-        val hasQuotaConfigsToDelete = 
configsToBeDeleted.exists(QuotaConfigs.isClientOrUserQuotaConfig)
-        val scramConfigsToDelete = 
configsToBeDeleted.filter(ScramMechanism.isScram)
-        val unknownConfigsToDelete = configsToBeDeleted.filterNot(key => 
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
-        if (entityTypeHead == ConfigType.CLIENT || entityTypes.size == 2) { // 
size==2 for case where users is specified first on the command line, before 
clients
-          // either just a client or both a user and a client
-          if (unknownConfigsToAdd.nonEmpty || scramConfigsToAddMap.nonEmpty)
-            throw new IllegalArgumentException(s"Only quota configs can be 
added for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config 
names: ${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}")
-          if (unknownConfigsToDelete.nonEmpty || scramConfigsToDelete.nonEmpty)
-            throw new IllegalArgumentException(s"Only quota configs can be 
deleted for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config 
names: ${unknownConfigsToDelete ++ scramConfigsToDelete}")
-        } else { // ConfigType.User
-          if (unknownConfigsToAdd.nonEmpty)
-            throw new IllegalArgumentException(s"Only quota and SCRAM 
credential configs can be added for '${ConfigType.USER}' using 
--bootstrap-server. Unexpected config names: $unknownConfigsToAdd")
-          if (unknownConfigsToDelete.nonEmpty)
-            throw new IllegalArgumentException(s"Only quota and SCRAM 
credential configs can be deleted for '${ConfigType.USER}' using 
--bootstrap-server. Unexpected config names: $unknownConfigsToDelete")
-          if (scramConfigsToAddMap.nonEmpty || scramConfigsToDelete.nonEmpty) {
-            if (entityNames.exists(_.isEmpty)) // either --entity-type users 
--entity-default or --user-defaults
-              throw new IllegalArgumentException("The use of --entity-default 
or --user-defaults is not allowed with User SCRAM Credentials using 
--bootstrap-server.")
-            if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete)
-              throw new IllegalArgumentException(s"Cannot alter both quota and 
SCRAM credential configs simultaneously for '${ConfigType.USER}' using 
--bootstrap-server.")
-          }
-        }
+  private def alterBrokerLoggingConfig(adminClient: Admin, entityTypeHead: 
String, entityNames: List[String], configsToBeAdded: Predef.Map[String, 
ConfigEntry], configsToBeDeleted: Seq[String]) = {
+    entityNames.foreach { entityName =>
+      val validLoggers = getResourceConfig(adminClient, entityTypeHead, 
entityName, includeSynonyms = true, describeAll = false).map(_.name)
+      // fail the command if any of the configured broker loggers do not exist
+      val invalidBrokerLoggers = 
configsToBeDeleted.filterNot(validLoggers.contains) ++ 
configsToBeAdded.keys.filterNot(validLoggers.contains)
+      if (invalidBrokerLoggers.nonEmpty)
+        throw new InvalidConfigurationException(s"Invalid broker logger(s): 
${invalidBrokerLoggers.mkString(",")}")
+    }
+    val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+    val alterLogLevelEntries = (configsToBeAdded.values.map(new 
AlterConfigOp(_, AlterConfigOp.OpType.SET))
+      ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, 
""), AlterConfigOp.OpType.DELETE) }
+      ).asJavaCollection
+    adminClient.incrementalAlterConfigs(entityNames.map(new 
ConfigResource(ConfigResource.Type.BROKER_LOGGER, _))
+      .map(_ -> alterLogLevelEntries).toMap.asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS)
+  }
 
-        if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete) {
-          alterQuotaConfigs(adminClient, entityTypes, entityNames, 
configsToBeAddedMap, configsToBeDeleted)
-        } else {
-          // handle altering user SCRAM credential configs
-          if (entityNames.size != 1)
-            // should never happen, if we get here then it is a bug
-            throw new IllegalStateException(s"Altering user SCRAM credentials 
should never occur for more zero or multiple users: $entityNames")
-          alterUserScramCredentialConfigs(adminClient, entityNames.head, 
scramConfigsToAddMap, scramConfigsToDelete)
-        }
-      case ConfigType.IP =>
-        val unknownConfigs = (configsToBeAdded.keys ++ 
configsToBeDeleted).filterNot(key => DynamicConfig.Ip.names.contains(key))
-        if (unknownConfigs.nonEmpty)
-          throw new IllegalArgumentException(s"Only connection quota configs 
can be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config 
names: ${unknownConfigs.mkString(",")}")
-        alterQuotaConfigs(adminClient, entityTypes, entityNames, 
configsToBeAddedMap, configsToBeDeleted)
-      case ConfigType.CLIENT_METRICS =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
-
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
-
-        val configResource = new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityNameHead)
-        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))
-          ++ configsToBeDeleted.map { k => new AlterConfigOp(new 
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
-          ).asJavaCollection
-        adminClient.incrementalAlterConfigs(Map(configResource -> 
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
-      case _ => throw new IllegalArgumentException(s"Unsupported entity type: 
$entityTypeHead")
+  private def alterUserOrClientConfig(adminClient: Admin, entityTypes: 
List[String], entityNames: List[String], configsToBeAddedMap: 
Predef.Map[String, String], configsToBeAdded: Predef.Map[String, ConfigEntry], 
configsToBeDeleted: Seq[String]) = {
+    val entityTypeHead = entityTypes.head
+    val hasQuotaConfigsToAdd = 
configsToBeAdded.keys.exists(QuotaConfigs.isClientOrUserQuotaConfig)
+    val scramConfigsToAddMap = configsToBeAdded.filter(entry => 
ScramMechanism.isScram(entry._1))
+    val unknownConfigsToAdd = configsToBeAdded.keys.filterNot(key => 
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
+    val hasQuotaConfigsToDelete = 
configsToBeDeleted.exists(QuotaConfigs.isClientOrUserQuotaConfig)
+    val scramConfigsToDelete = 
configsToBeDeleted.filter(ScramMechanism.isScram)
+    val unknownConfigsToDelete = configsToBeDeleted.filterNot(key => 
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
+    if (entityTypeHead == ConfigType.CLIENT || entityTypes.size == 2) { // 
size==2 for case where users is specified first on the command line, before 
clients
+      // either just a client or both a user and a client
+      if (unknownConfigsToAdd.nonEmpty || scramConfigsToAddMap.nonEmpty)
+        throw new IllegalArgumentException(s"Only quota configs can be added 
for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config names: 
${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}")
+      if (unknownConfigsToDelete.nonEmpty || scramConfigsToDelete.nonEmpty)
+        throw new IllegalArgumentException(s"Only quota configs can be deleted 
for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config names: 
${unknownConfigsToDelete ++ scramConfigsToDelete}")
+    } else { // ConfigType.User
+      if (unknownConfigsToAdd.nonEmpty)
+        throw new IllegalArgumentException(s"Only quota and SCRAM credential 
configs can be added for '${ConfigType.USER}' using --bootstrap-server. 
Unexpected config names: $unknownConfigsToAdd")
+      if (unknownConfigsToDelete.nonEmpty)
+        throw new IllegalArgumentException(s"Only quota and SCRAM credential 
configs can be deleted for '${ConfigType.USER}' using --bootstrap-server. 
Unexpected config names: $unknownConfigsToDelete")
+      if (scramConfigsToAddMap.nonEmpty || scramConfigsToDelete.nonEmpty) {
+        if (entityNames.exists(_.isEmpty)) // either --entity-type users 
--entity-default or --user-defaults
+          throw new IllegalArgumentException("The use of --entity-default or 
--user-defaults is not allowed with User SCRAM Credentials using 
--bootstrap-server.")
+        if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete)
+          throw new IllegalArgumentException(s"Cannot alter both quota and 
SCRAM credential configs simultaneously for '${ConfigType.USER}' using 
--bootstrap-server.")
+      }
     }
 
-    if (entityNameHead.nonEmpty)
-      println(s"Completed updating config for ${entityTypeHead.dropRight(1)} 
$entityNameHead.")
-    else
-      println(s"Completed updating default config for $entityTypeHead in the 
cluster.")
+    if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete) {
+      alterQuotaConfigs(adminClient, entityTypes, entityNames, 
configsToBeAddedMap, configsToBeDeleted)
+    } else {
+      // handle altering user SCRAM credential configs
+      if (entityNames.size != 1)
+        // should never happen, if we get here then it is a bug
+        throw new IllegalStateException(s"Altering user SCRAM credentials 
should never occur for more zero or multiple users: $entityNames")
+      alterUserScramCredentialConfigs(adminClient, entityNames.head, 
scramConfigsToAddMap, scramConfigsToDelete)
+    }
+  }
+
+  private def alterIpConfig(adminClient: Admin, entityTypes: List[String], 
entityNames: List[String], configsToBeAddedMap: Predef.Map[String, String], 
configsToBeAdded: Predef.Map[String, ConfigEntry], configsToBeDeleted: 
Seq[String]) = {
+    val unknownConfigs = (configsToBeAdded.keys ++ 
configsToBeDeleted).filterNot(key => DynamicConfig.Ip.names.contains(key))
+    if (unknownConfigs.nonEmpty)
+      throw new IllegalArgumentException(s"Only connection quota configs can 
be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config 
names: ${unknownConfigs.mkString(",")}")
+    alterQuotaConfigs(adminClient, entityTypes, entityNames, 
configsToBeAddedMap, configsToBeDeleted)
+  }
+
+  private def alterClientMetricsConfig(adminClient: Admin, entityTypeHead: 
String, entityNames: List[String], configsToBeAdded: Predef.Map[String, 
ConfigEntry], configsToBeDeleted: Seq[String]) = {
+    entityNames.foreach { entityName =>
+      getOldConfig(adminClient, entityTypeHead, configsToBeDeleted, entityName)

Review Comment:
   BTW, could you add test for `InvalidConfigurationException`?



##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -354,128 +354,150 @@ object ConfigCommand extends Logging {
     }
   }
 
-  @nowarn("cat=deprecation")
   def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
     val entityTypes = opts.entityTypes
     val entityNames = opts.entityNames
     val entityTypeHead = entityTypes.head
-    val entityNameHead = entityNames.head
     val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala.toMap // no 
need for mutability
     val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new 
ConfigEntry(k, v)) }
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
 
     entityTypeHead match {
       case ConfigType.TOPIC =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
+        alterTopicConfig(adminClient, entityTypeHead, entityNames, 
configsToBeAdded, configsToBeDeleted)
+      case ConfigType.BROKER =>
+        alterBrokerConfig(adminClient, entityTypeHead, entityNames, 
configsToBeAdded, configsToBeDeleted)
+      case BrokerLoggerConfigType =>
+        alterBrokerLoggingConfig(adminClient, entityTypeHead, entityNames, 
configsToBeAdded, configsToBeDeleted)
+      case ConfigType.USER | ConfigType.CLIENT =>
+        alterUserOrClientConfig(adminClient, entityTypes, entityNames, 
configsToBeAddedMap, configsToBeAdded, configsToBeDeleted)
+      case ConfigType.IP =>
+        alterIpConfig(adminClient, entityTypes, entityNames, 
configsToBeAddedMap, configsToBeAdded, configsToBeDeleted)
+      case ConfigType.CLIENT_METRICS =>
+        alterClientMetricsConfig(adminClient, entityTypeHead, entityNames, 
configsToBeAdded, configsToBeDeleted)
+      case _ => throw new IllegalArgumentException(s"Unsupported entity type: 
$entityTypeHead")
+    }
 
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
+    if (entityNames.nonEmpty) {
+      entityNames.foreach(
+        entityName => println(s"Completed updating config for 
${entityTypeHead.dropRight(1)} $entityName.")
+      )
+    } else
+      println(s"Completed updating default config for $entityTypeHead in the 
cluster.")
+  }
 
-        val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
entityNameHead)
-        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))
-          ++ configsToBeDeleted.map { k => new AlterConfigOp(new 
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
-        ).asJavaCollection
-        adminClient.incrementalAlterConfigs(Map(configResource -> 
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+  private def alterTopicConfig(adminClient: Admin, entityTypeHead: String, 
entityNames: List[String], configsToBeAdded: Predef.Map[String, ConfigEntry], 
configsToBeDeleted: Seq[String]) = {
+    entityNames.foreach { entityName =>
+      getOldConfig(adminClient, entityTypeHead, configsToBeDeleted, entityName)
+    }
+    val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+    val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))
+      ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, 
""), AlterConfigOp.OpType.DELETE) }
+      ).asJavaCollection
+    adminClient.incrementalAlterConfigs(entityNames.map(new 
ConfigResource(ConfigResource.Type.TOPIC, _))
+        .map(_ -> alterEntries).toMap.asJava, alterOptions)
+      .all().get(60, TimeUnit.SECONDS)
+  }
 
-      case ConfigType.BROKER =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
+  private def getOldConfig(adminClient: Admin, entityTypeHead: String, 
configsToBeDeleted: Seq[String], entityName: String): Map[String, ConfigEntry] 
= {
+    val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityName, 
includeSynonyms = false, describeAll = false)
+      .map { entry => (entry.name, entry) }.toMap
 
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
+    // fail the command if any of the configs to be deleted does not exist
+    val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
+    if (invalidConfigs.nonEmpty)
+      throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
+    oldConfig
+  }
 
-        val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
-        val sensitiveEntries = newEntries.filter(_._2.value == null)
-        if (sensitiveEntries.nonEmpty)
-          throw new InvalidConfigurationException(s"All sensitive broker 
config entries must be specified for --alter, missing entries: 
${sensitiveEntries.keySet}")
-        val newConfig = new JConfig(newEntries.asJava.values)
+  @nowarn("cat=deprecation")
+  private def alterBrokerConfig(adminClient: Admin, entityTypeHead: String, 
entityNames: List[String], configsToBeAdded: Predef.Map[String, ConfigEntry], 
configsToBeDeleted: Seq[String]): Unit = {
+    entityNames.foreach { entityName =>
+      val oldConfig = getOldConfig(adminClient, entityTypeHead, 
configsToBeDeleted, entityName)
 
-        val configResource = new ConfigResource(ConfigResource.Type.BROKER, 
entityNameHead)
-        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        adminClient.alterConfigs(Map(configResource -> newConfig).asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS)
+      val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
+      val sensitiveEntries = newEntries.filter(_._2.value == null)
+      if (sensitiveEntries.nonEmpty)
+        throw new InvalidConfigurationException(s"All sensitive broker config 
entries must be specified for --alter, missing entries: 
${sensitiveEntries.keySet}")
+      val newConfig = new JConfig(newEntries.asJava.values)
 
-      case BrokerLoggerConfigType =>
-        val validLoggers = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = true, describeAll = false).map(_.name)
-        // fail the command if any of the configured broker loggers do not 
exist
-        val invalidBrokerLoggers = 
configsToBeDeleted.filterNot(validLoggers.contains) ++ 
configsToBeAdded.keys.filterNot(validLoggers.contains)
-        if (invalidBrokerLoggers.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid broker logger(s): 
${invalidBrokerLoggers.mkString(",")}")
-
-        val configResource = new 
ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityNameHead)
-        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        val alterLogLevelEntries = (configsToBeAdded.values.map(new 
AlterConfigOp(_, AlterConfigOp.OpType.SET))
-          ++ configsToBeDeleted.map { k => new AlterConfigOp(new 
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
-        ).asJavaCollection
-        adminClient.incrementalAlterConfigs(Map(configResource -> 
alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+      val configResource = new ConfigResource(ConfigResource.Type.BROKER, 
entityName)
+      val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+      adminClient.alterConfigs(Map(configResource -> newConfig).asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS)
+    }
+  }
 
-      case ConfigType.USER | ConfigType.CLIENT =>
-        val hasQuotaConfigsToAdd = 
configsToBeAdded.keys.exists(QuotaConfigs.isClientOrUserQuotaConfig)
-        val scramConfigsToAddMap = configsToBeAdded.filter(entry => 
ScramMechanism.isScram(entry._1))
-        val unknownConfigsToAdd = configsToBeAdded.keys.filterNot(key => 
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
-        val hasQuotaConfigsToDelete = 
configsToBeDeleted.exists(QuotaConfigs.isClientOrUserQuotaConfig)
-        val scramConfigsToDelete = 
configsToBeDeleted.filter(ScramMechanism.isScram)
-        val unknownConfigsToDelete = configsToBeDeleted.filterNot(key => 
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
-        if (entityTypeHead == ConfigType.CLIENT || entityTypes.size == 2) { // 
size==2 for case where users is specified first on the command line, before 
clients
-          // either just a client or both a user and a client
-          if (unknownConfigsToAdd.nonEmpty || scramConfigsToAddMap.nonEmpty)
-            throw new IllegalArgumentException(s"Only quota configs can be 
added for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config 
names: ${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}")
-          if (unknownConfigsToDelete.nonEmpty || scramConfigsToDelete.nonEmpty)
-            throw new IllegalArgumentException(s"Only quota configs can be 
deleted for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config 
names: ${unknownConfigsToDelete ++ scramConfigsToDelete}")
-        } else { // ConfigType.User
-          if (unknownConfigsToAdd.nonEmpty)
-            throw new IllegalArgumentException(s"Only quota and SCRAM 
credential configs can be added for '${ConfigType.USER}' using 
--bootstrap-server. Unexpected config names: $unknownConfigsToAdd")
-          if (unknownConfigsToDelete.nonEmpty)
-            throw new IllegalArgumentException(s"Only quota and SCRAM 
credential configs can be deleted for '${ConfigType.USER}' using 
--bootstrap-server. Unexpected config names: $unknownConfigsToDelete")
-          if (scramConfigsToAddMap.nonEmpty || scramConfigsToDelete.nonEmpty) {
-            if (entityNames.exists(_.isEmpty)) // either --entity-type users 
--entity-default or --user-defaults
-              throw new IllegalArgumentException("The use of --entity-default 
or --user-defaults is not allowed with User SCRAM Credentials using 
--bootstrap-server.")
-            if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete)
-              throw new IllegalArgumentException(s"Cannot alter both quota and 
SCRAM credential configs simultaneously for '${ConfigType.USER}' using 
--bootstrap-server.")
-          }
-        }
+  private def alterBrokerLoggingConfig(adminClient: Admin, entityTypeHead: 
String, entityNames: List[String], configsToBeAdded: Predef.Map[String, 
ConfigEntry], configsToBeDeleted: Seq[String]) = {
+    entityNames.foreach { entityName =>
+      val validLoggers = getResourceConfig(adminClient, entityTypeHead, 
entityName, includeSynonyms = true, describeAll = false).map(_.name)
+      // fail the command if any of the configured broker loggers do not exist
+      val invalidBrokerLoggers = 
configsToBeDeleted.filterNot(validLoggers.contains) ++ 
configsToBeAdded.keys.filterNot(validLoggers.contains)
+      if (invalidBrokerLoggers.nonEmpty)
+        throw new InvalidConfigurationException(s"Invalid broker logger(s): 
${invalidBrokerLoggers.mkString(",")}")
+    }
+    val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+    val alterLogLevelEntries = (configsToBeAdded.values.map(new 
AlterConfigOp(_, AlterConfigOp.OpType.SET))
+      ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, 
""), AlterConfigOp.OpType.DELETE) }
+      ).asJavaCollection
+    adminClient.incrementalAlterConfigs(entityNames.map(new 
ConfigResource(ConfigResource.Type.BROKER_LOGGER, _))
+      .map(_ -> alterLogLevelEntries).toMap.asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS)
+  }
 
-        if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete) {
-          alterQuotaConfigs(adminClient, entityTypes, entityNames, 
configsToBeAddedMap, configsToBeDeleted)
-        } else {
-          // handle altering user SCRAM credential configs
-          if (entityNames.size != 1)
-            // should never happen, if we get here then it is a bug
-            throw new IllegalStateException(s"Altering user SCRAM credentials 
should never occur for more zero or multiple users: $entityNames")
-          alterUserScramCredentialConfigs(adminClient, entityNames.head, 
scramConfigsToAddMap, scramConfigsToDelete)
-        }
-      case ConfigType.IP =>
-        val unknownConfigs = (configsToBeAdded.keys ++ 
configsToBeDeleted).filterNot(key => DynamicConfig.Ip.names.contains(key))
-        if (unknownConfigs.nonEmpty)
-          throw new IllegalArgumentException(s"Only connection quota configs 
can be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config 
names: ${unknownConfigs.mkString(",")}")
-        alterQuotaConfigs(adminClient, entityTypes, entityNames, 
configsToBeAddedMap, configsToBeDeleted)
-      case ConfigType.CLIENT_METRICS =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
-
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
-
-        val configResource = new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityNameHead)
-        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))
-          ++ configsToBeDeleted.map { k => new AlterConfigOp(new 
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
-          ).asJavaCollection
-        adminClient.incrementalAlterConfigs(Map(configResource -> 
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
-      case _ => throw new IllegalArgumentException(s"Unsupported entity type: 
$entityTypeHead")
+  private def alterUserOrClientConfig(adminClient: Admin, entityTypes: 
List[String], entityNames: List[String], configsToBeAddedMap: 
Predef.Map[String, String], configsToBeAdded: Predef.Map[String, ConfigEntry], 
configsToBeDeleted: Seq[String]) = {
+    val entityTypeHead = entityTypes.head
+    val hasQuotaConfigsToAdd = 
configsToBeAdded.keys.exists(QuotaConfigs.isClientOrUserQuotaConfig)
+    val scramConfigsToAddMap = configsToBeAdded.filter(entry => 
ScramMechanism.isScram(entry._1))
+    val unknownConfigsToAdd = configsToBeAdded.keys.filterNot(key => 
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
+    val hasQuotaConfigsToDelete = 
configsToBeDeleted.exists(QuotaConfigs.isClientOrUserQuotaConfig)
+    val scramConfigsToDelete = 
configsToBeDeleted.filter(ScramMechanism.isScram)
+    val unknownConfigsToDelete = configsToBeDeleted.filterNot(key => 
ScramMechanism.isScram(key) || QuotaConfigs.isClientOrUserQuotaConfig(key))
+    if (entityTypeHead == ConfigType.CLIENT || entityTypes.size == 2) { // 
size==2 for case where users is specified first on the command line, before 
clients
+      // either just a client or both a user and a client
+      if (unknownConfigsToAdd.nonEmpty || scramConfigsToAddMap.nonEmpty)
+        throw new IllegalArgumentException(s"Only quota configs can be added 
for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config names: 
${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}")
+      if (unknownConfigsToDelete.nonEmpty || scramConfigsToDelete.nonEmpty)
+        throw new IllegalArgumentException(s"Only quota configs can be deleted 
for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config names: 
${unknownConfigsToDelete ++ scramConfigsToDelete}")
+    } else { // ConfigType.User
+      if (unknownConfigsToAdd.nonEmpty)
+        throw new IllegalArgumentException(s"Only quota and SCRAM credential 
configs can be added for '${ConfigType.USER}' using --bootstrap-server. 
Unexpected config names: $unknownConfigsToAdd")
+      if (unknownConfigsToDelete.nonEmpty)
+        throw new IllegalArgumentException(s"Only quota and SCRAM credential 
configs can be deleted for '${ConfigType.USER}' using --bootstrap-server. 
Unexpected config names: $unknownConfigsToDelete")
+      if (scramConfigsToAddMap.nonEmpty || scramConfigsToDelete.nonEmpty) {
+        if (entityNames.exists(_.isEmpty)) // either --entity-type users 
--entity-default or --user-defaults
+          throw new IllegalArgumentException("The use of --entity-default or 
--user-defaults is not allowed with User SCRAM Credentials using 
--bootstrap-server.")
+        if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete)
+          throw new IllegalArgumentException(s"Cannot alter both quota and 
SCRAM credential configs simultaneously for '${ConfigType.USER}' using 
--bootstrap-server.")
+      }
     }
 
-    if (entityNameHead.nonEmpty)
-      println(s"Completed updating config for ${entityTypeHead.dropRight(1)} 
$entityNameHead.")
-    else
-      println(s"Completed updating default config for $entityTypeHead in the 
cluster.")
+    if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete) {
+      alterQuotaConfigs(adminClient, entityTypes, entityNames, 
configsToBeAddedMap, configsToBeDeleted)
+    } else {
+      // handle altering user SCRAM credential configs
+      if (entityNames.size != 1)
+        // should never happen, if we get here then it is a bug
+        throw new IllegalStateException(s"Altering user SCRAM credentials 
should never occur for more zero or multiple users: $entityNames")
+      alterUserScramCredentialConfigs(adminClient, entityNames.head, 
scramConfigsToAddMap, scramConfigsToDelete)
+    }
+  }
+
+  private def alterIpConfig(adminClient: Admin, entityTypes: List[String], 
entityNames: List[String], configsToBeAddedMap: Predef.Map[String, String], 
configsToBeAdded: Predef.Map[String, ConfigEntry], configsToBeDeleted: 
Seq[String]) = {
+    val unknownConfigs = (configsToBeAdded.keys ++ 
configsToBeDeleted).filterNot(key => DynamicConfig.Ip.names.contains(key))
+    if (unknownConfigs.nonEmpty)
+      throw new IllegalArgumentException(s"Only connection quota configs can 
be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config 
names: ${unknownConfigs.mkString(",")}")
+    alterQuotaConfigs(adminClient, entityTypes, entityNames, 
configsToBeAddedMap, configsToBeDeleted)
+  }
+
+  private def alterClientMetricsConfig(adminClient: Admin, entityTypeHead: 
String, entityNames: List[String], configsToBeAdded: Predef.Map[String, 
ConfigEntry], configsToBeDeleted: Seq[String]) = {
+    entityNames.foreach { entityName =>
+      getOldConfig(adminClient, entityTypeHead, configsToBeDeleted, entityName)

Review Comment:
   I guess you want to reuse the validation, right? However, the method name 
"get" is a bit weird since it is used in the "foreach". Could you please do a 
bit refactor?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to