dajac commented on a change in pull request #9628: URL: https://github.com/apache/kafka/pull/9628#discussion_r536134783
########## File path: core/src/main/scala/kafka/admin/ConfigCommand.scala ########## @@ -864,11 +885,21 @@ object ConfigCommand extends Config { } } + if (hasEntityName && entityTypeVals.contains(ConfigType.Ip)) { + Seq(entityName, ip).filter(options.has(_)).map(options.valueOf(_)).foreach { ipAddress => + if (!Utils.validHostPattern(ipAddress)) + throw new IllegalArgumentException(s"The entity name for ${entityTypeVals.head} must be a valid IP, but it is: $ipAddress") + } + } + if (options.has(describeOpt) && entityTypeVals.contains(BrokerLoggerConfigType) && !hasEntityName) throw new IllegalArgumentException(s"an entity name must be specified with --describe of ${entityTypeVals.mkString(",")}") if (options.has(alterOpt)) { - if (entityTypeVals.contains(ConfigType.User) || entityTypeVals.contains(ConfigType.Client) || entityTypeVals.contains(ConfigType.Broker)) { + if (entityTypeVals.contains(ConfigType.User) || + entityTypeVals.contains(ConfigType.Client) || + entityTypeVals.contains(ConfigType.Broker) || + entityTypeVals.contains(ConfigType.Ip)) { if (!hasEntityName && !hasEntityDefault) throw new IllegalArgumentException("an entity-name or default entity must be specified with --alter of users, clients or brokers") Review comment: nit: Could we add ip in the list as well? ########## File path: core/src/main/scala/kafka/admin/ConfigCommand.scala ########## @@ -778,6 +793,10 @@ object ConfigCommand extends Config { val brokerLogger = parser.accepts("broker-logger", "The broker's ID for its logger config.") .withRequiredArg .ofType(classOf[String]) + val ipDefaults = parser.accepts("ip-defaults", "The config defaults for all IPs.") + val ip = parser.accepts("ip", "The IP address.") Review comment: It would be good to add these two flags in the KIP as well for completeness. ########## File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala ########## @@ -472,6 +512,134 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { EasyMock.reset(alterResult, describeResult) } + @Test + def shouldNotAlterNonQuotaIpConfigsUsingBootstrapServer(): Unit = { + // when using --bootstrap-server, it should be illegal to alter anything that is not a connection quota + // for ip entities + val node = new Node(1, "localhost", 9092) + val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) + + def verifyCommand(entityType: String, alterOpts: String*): Unit = { + val opts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", + "--entity-type", entityType, "--entity-name", "admin", + "--alter") ++ alterOpts) + val e = intercept[IllegalArgumentException] { + ConfigCommand.alterConfig(mockAdminClient, opts) + } + assertTrue(s"Unexpected exception: $e", e.getMessage.contains("some_config")) + } + + verifyCommand("ips", "--add-config", "connection_creation_rate=10000,some_config=10") + verifyCommand("ips", "--add-config", "some_config=10") + verifyCommand("ips", "--delete-config", "connection_creation_rate=10000,some_config=10") + verifyCommand("ips", "--delete-config", "some_config=10") + } + + @Test + def testDescribeIpConfigs(): Unit = { + def testShouldDescribeIpConfig(ip: Option[String]): Unit = { + val entityType = ClientQuotaEntity.IP + val (ipArgs, filterComponent) = ip match { + case Some(null) => (Array("--entity-default"), ClientQuotaFilterComponent.ofDefaultEntity(entityType)) + case Some(addr) => (Array("--entity-name", addr), ClientQuotaFilterComponent.ofEntity(entityType, addr)) + case None => (Array.empty[String], ClientQuotaFilterComponent.ofEntityType(entityType)) + } + + val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", + "--describe", "--entity-type", "ips") ++ ipArgs) + + val expectedFilter = ClientQuotaFilter.containsOnly(List(filterComponent).asJava) + + var describedConfigs = false + val describeFuture = new KafkaFutureImpl[util.Map[ClientQuotaEntity, util.Map[String, java.lang.Double]]] + describeFuture.complete(Map.empty[ClientQuotaEntity, util.Map[String, java.lang.Double]].asJava) + val describeResult: DescribeClientQuotasResult = EasyMock.createNiceMock(classOf[DescribeClientQuotasResult]) + EasyMock.expect(describeResult.entities()).andReturn(describeFuture) + + val node = new Node(1, "localhost", 9092) + val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { + override def describeClientQuotas(filter: ClientQuotaFilter, options: DescribeClientQuotasOptions): DescribeClientQuotasResult = { + assertTrue(filter.strict) + assertEquals(expectedFilter, filter) + describedConfigs = true + describeResult + } + } Review comment: Have you considered using a real mock instead of doing this? Is there a reason not to? It seems to be that it would be more appropriate. ########## File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala ########## @@ -472,6 +512,134 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { EasyMock.reset(alterResult, describeResult) } + @Test + def shouldNotAlterNonQuotaIpConfigsUsingBootstrapServer(): Unit = { + // when using --bootstrap-server, it should be illegal to alter anything that is not a connection quota + // for ip entities + val node = new Node(1, "localhost", 9092) + val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) + + def verifyCommand(entityType: String, alterOpts: String*): Unit = { + val opts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", + "--entity-type", entityType, "--entity-name", "admin", + "--alter") ++ alterOpts) + val e = intercept[IllegalArgumentException] { + ConfigCommand.alterConfig(mockAdminClient, opts) + } + assertTrue(s"Unexpected exception: $e", e.getMessage.contains("some_config")) + } + + verifyCommand("ips", "--add-config", "connection_creation_rate=10000,some_config=10") + verifyCommand("ips", "--add-config", "some_config=10") + verifyCommand("ips", "--delete-config", "connection_creation_rate=10000,some_config=10") + verifyCommand("ips", "--delete-config", "some_config=10") + } + + @Test + def testDescribeIpConfigs(): Unit = { + def testShouldDescribeIpConfig(ip: Option[String]): Unit = { Review comment: nit: I feel like this inner method is quite large and may merit to be extracted. It feels a bit weird when the inner method is way larger than the actual body of the method. I guess that it is a matter of taste but I believe that the code would be more readable. I leave this up to you. ########## File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala ########## @@ -472,6 +512,134 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { EasyMock.reset(alterResult, describeResult) } + @Test + def shouldNotAlterNonQuotaIpConfigsUsingBootstrapServer(): Unit = { + // when using --bootstrap-server, it should be illegal to alter anything that is not a connection quota + // for ip entities + val node = new Node(1, "localhost", 9092) + val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) + + def verifyCommand(entityType: String, alterOpts: String*): Unit = { + val opts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", + "--entity-type", entityType, "--entity-name", "admin", Review comment: Actually, I believe that the command should fail due to `admin` which is not valid IP. It may be better to use a valid IP address here as you want to tests the configs. ########## File path: core/src/main/scala/kafka/admin/ConfigCommand.scala ########## @@ -864,11 +885,21 @@ object ConfigCommand extends Config { } } + if (hasEntityName && entityTypeVals.contains(ConfigType.Ip)) { + Seq(entityName, ip).filter(options.has(_)).map(options.valueOf(_)).foreach { ipAddress => + if (!Utils.validHostPattern(ipAddress)) Review comment: I think that we should validate that `ipAddress` is actually an IP. `validHostPattern` only ensure that it is a valid host name. ########## File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala ########## @@ -398,19 +418,39 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { ConfigCommand.alterConfigWithZk(null, createOpts, new TestAdminZkClient(zkClient)) } - def testShouldAddClientConfig(user: Option[String], clientId: Option[String]): Unit = { - def toValues(entityName: Option[String], entityType: String, command: String): - (Array[String], Option[String], Option[ClientQuotaFilterComponent]) = { - entityName match { - case Some(null) => - (Array("--entity-type", command, "--entity-default"), Some(null), - Some(ClientQuotaFilterComponent.ofDefaultEntity(entityType))) - case Some(name) => - (Array("--entity-type", command, "--entity-name", name), Some(name), - Some(ClientQuotaFilterComponent.ofEntity(entityType, name))) - case None => (Array.empty, None, None) + @Test + def shouldAddIpConfigsUsingZookeeper(): Unit = { + val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, + "--entity-name", "1.2.3.4", + "--entity-type", "ips", + "--alter", + "--add-config", "a=b,c=d")) + + class TestAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) { + override def changeIpConfig(ip: String, configChange: Properties): Unit = { + assertEquals("1.2.3.4", ip) + assertEquals("b", configChange.get("a")) + assertEquals("d", configChange.get("c")) } } + + ConfigCommand.alterConfigWithZk(null, createOpts, new TestAdminZkClient(zkClient)) + } + + private def toValues(entityName: Option[String], entityType: String, command: String): + (Array[String], Option[String], Option[ClientQuotaFilterComponent]) = { Review comment: nit: Indentation seems wrong here. ########## File path: core/src/main/scala/kafka/server/AdminManager.scala ########## @@ -920,32 +954,51 @@ class AdminManager(val config: KafkaConfig, !name.isDefined || !strict } - def fromProps(props: Map[String, String]): Map[String, Double] = { - props.map { case (key, value) => - val doubleValue = try value.toDouble catch { - case _: NumberFormatException => - throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value") - } - key -> doubleValue - } - } - - (userEntries ++ clientIdEntries ++ bothEntries).map { case ((u, c), p) => + (userEntries ++ clientIdEntries ++ bothEntries).flatMap { case ((u, c), p) => val quotaProps = p.asScala.filter { case (key, _) => QuotaConfigs.isQuotaConfig(key) } if (quotaProps.nonEmpty && matches(userComponent, u) && matches(clientIdComponent, c)) Some(userClientIdToEntity(u, c) -> fromProps(quotaProps)) else None - }.flatten.toMap + }.toMap + } + + def handleDescribeIpQuotas(ipComponent: Option[ClientQuotaFilterComponent], strict: Boolean): Map[ClientQuotaEntity, Map[String, Double]] = { + val ip = ipComponent.flatMap(c => toOption(c.`match`)) + val exactIp = wantExact(ipComponent) + val allIps = ipComponent.exists(_.`match` == null) || (ipComponent.isEmpty && !strict) + val ipEntries = if (exactIp) + Map(Some(ip.get) -> adminZkClient.fetchEntityConfig(ConfigType.Ip, sanitized(ip))) + else if (allIps) + adminZkClient.fetchAllEntityConfigs(ConfigType.Ip).map { case (name, props) => + Some(desanitizeEntityName(name)) -> props + } + else + Map.empty + + def ipToQuotaEntity(ip: Option[String]): ClientQuotaEntity = { + new ClientQuotaEntity(ip.map(ipName => ClientQuotaEntity.IP -> ipName).toMap.asJava) + } + + ipEntries.flatMap { case (ip, props) => + val ipQuotaProps = props.asScala.filter { case (key, _) => DynamicConfig.Ip.names.contains(key) } + if (ipQuotaProps.nonEmpty) + Some(ipToQuotaEntity(ip) -> fromProps(ipQuotaProps)) + else + None + } } def alterClientQuotas(entries: Seq[ClientQuotaAlteration], validateOnly: Boolean): Map[ClientQuotaEntity, ApiError] = { def alterEntityQuotas(entity: ClientQuotaEntity, ops: Iterable[ClientQuotaAlteration.Op]): Unit = { - val (path, configType, configKeys) = entityToSanitizedUserClientId(entity) match { - case (Some(user), Some(clientId)) => (user + "/clients/" + clientId, ConfigType.User, DynamicConfig.User.configKeys) - case (Some(user), None) => (user, ConfigType.User, DynamicConfig.User.configKeys) - case (None, Some(clientId)) => (clientId, ConfigType.Client, DynamicConfig.Client.configKeys) - case _ => throw new InvalidRequestException("Invalid empty client quota entity") + val (path, configType, configKeys) = parseAndSanitizeQuotaEntity(entity) match { + case (Some(user), Some(clientId), None) => (user + "/clients/" + clientId, ConfigType.User, DynamicConfig.User.configKeys) + case (Some(user), None, None) => (user, ConfigType.User, DynamicConfig.User.configKeys) + case (None, Some(clientId), None) => (clientId, ConfigType.Client, DynamicConfig.Client.configKeys) + case (None, None, Some(ip)) => (ip, ConfigType.Ip, DynamicConfig.Ip.configKeys) + case (_, _, Some(_)) => throw new InvalidRequestException(s"Invalid quota entity combination, " + Review comment: nit: Shouldn't we use `(Some(_), Some(_), Some(_))` here in order to be really accurate? As it is, the first two could be anything. That works because it is after all the others but I would prefer to be explicit if possible. ########## File path: core/src/main/scala/kafka/server/AdminManager.scala ########## @@ -920,32 +954,51 @@ class AdminManager(val config: KafkaConfig, !name.isDefined || !strict } - def fromProps(props: Map[String, String]): Map[String, Double] = { - props.map { case (key, value) => - val doubleValue = try value.toDouble catch { - case _: NumberFormatException => - throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value") - } - key -> doubleValue - } - } - - (userEntries ++ clientIdEntries ++ bothEntries).map { case ((u, c), p) => + (userEntries ++ clientIdEntries ++ bothEntries).flatMap { case ((u, c), p) => val quotaProps = p.asScala.filter { case (key, _) => QuotaConfigs.isQuotaConfig(key) } if (quotaProps.nonEmpty && matches(userComponent, u) && matches(clientIdComponent, c)) Some(userClientIdToEntity(u, c) -> fromProps(quotaProps)) else None - }.flatten.toMap + }.toMap + } + + def handleDescribeIpQuotas(ipComponent: Option[ClientQuotaFilterComponent], strict: Boolean): Map[ClientQuotaEntity, Map[String, Double]] = { + val ip = ipComponent.flatMap(c => toOption(c.`match`)) + val exactIp = wantExact(ipComponent) + val allIps = ipComponent.exists(_.`match` == null) || (ipComponent.isEmpty && !strict) + val ipEntries = if (exactIp) + Map(Some(ip.get) -> adminZkClient.fetchEntityConfig(ConfigType.Ip, sanitized(ip))) + else if (allIps) + adminZkClient.fetchAllEntityConfigs(ConfigType.Ip).map { case (name, props) => + Some(desanitizeEntityName(name)) -> props + } + else + Map.empty + + def ipToQuotaEntity(ip: Option[String]): ClientQuotaEntity = { + new ClientQuotaEntity(ip.map(ipName => ClientQuotaEntity.IP -> ipName).toMap.asJava) + } + + ipEntries.flatMap { case (ip, props) => + val ipQuotaProps = props.asScala.filter { case (key, _) => DynamicConfig.Ip.names.contains(key) } + if (ipQuotaProps.nonEmpty) + Some(ipToQuotaEntity(ip) -> fromProps(ipQuotaProps)) + else + None + } } def alterClientQuotas(entries: Seq[ClientQuotaAlteration], validateOnly: Boolean): Map[ClientQuotaEntity, ApiError] = { def alterEntityQuotas(entity: ClientQuotaEntity, ops: Iterable[ClientQuotaAlteration.Op]): Unit = { - val (path, configType, configKeys) = entityToSanitizedUserClientId(entity) match { - case (Some(user), Some(clientId)) => (user + "/clients/" + clientId, ConfigType.User, DynamicConfig.User.configKeys) - case (Some(user), None) => (user, ConfigType.User, DynamicConfig.User.configKeys) - case (None, Some(clientId)) => (clientId, ConfigType.Client, DynamicConfig.Client.configKeys) - case _ => throw new InvalidRequestException("Invalid empty client quota entity") + val (path, configType, configKeys) = parseAndSanitizeQuotaEntity(entity) match { Review comment: It seems that we should validate the provided `ip` somewhere here. I just tried with the following command: ``` ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type ips --entity-name ip --add-config connection_creation_rate=10 ``` The command completes but then the broker fails while applying the configuration: ``` [2020-12-04 15:31:01,951] ERROR error processing change notification {"version":2,"entity_path":"ips/ip"} from /config/changes/config_change_0000000001 (kafka.common.ZkNodeChangeNotificationListener) java.lang.IllegalArgumentException: Unable to resolve address ip at kafka.server.IpConfigHandler.processConfigChanges(ConfigHandler.scala:199) at kafka.server.DynamicConfigManager$ConfigChangedNotificationHandler$.processEntityConfigChangeVersion2(DynamicConfigManager.scala:151) at kafka.server.DynamicConfigManager$ConfigChangedNotificationHandler$.$anonfun$processNotification$1(DynamicConfigManager.scala:105) at kafka.server.DynamicConfigManager$ConfigChangedNotificationHandler$.processNotification(DynamicConfigManager.scala:96) at kafka.common.ZkNodeChangeNotificationListener.$anonfun$processNotification$1(ZkNodeChangeNotificationListener.scala:106) at kafka.common.ZkNodeChangeNotificationListener.processNotification(ZkNodeChangeNotificationListener.scala:106) at kafka.common.ZkNodeChangeNotificationListener.$anonfun$processNotifications$2(ZkNodeChangeNotificationListener.scala:90) at kafka.common.ZkNodeChangeNotificationListener.$anonfun$processNotifications$2$adapted(ZkNodeChangeNotificationListener.scala:87) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561) at scala.collection.AbstractIterable.foreach(Iterable.scala:919) at kafka.common.ZkNodeChangeNotificationListener.kafka$common$ZkNodeChangeNotificationListener$$processNotifications(ZkNodeChangeNotificationListener.scala:87) at kafka.common.ZkNodeChangeNotificationListener$ChangeNotification.process(ZkNodeChangeNotificationListener.scala:120) at kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:146) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) ``` It would be good to cover this in the tests, both for the command and the api. This is also true with: ``` ./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --ip ip --add-config connection_creation_rate=2 ``` ########## File path: core/src/main/scala/kafka/admin/ConfigCommand.scala ########## @@ -864,11 +885,21 @@ object ConfigCommand extends Config { } } + if (hasEntityName && entityTypeVals.contains(ConfigType.Ip)) { + Seq(entityName, ip).filter(options.has(_)).map(options.valueOf(_)).foreach { ipAddress => Review comment: nit: We can probably remove `(_)` in `filter` and `map`. ########## File path: core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala ########## @@ -236,13 +310,22 @@ class ClientQuotasRequestTest extends BaseRequestTest { (Some("user-3"), None, 58.58), (Some(null), None, 59.59), (None, Some("client-id-2"), 60.60) - ).map { case (u, c, v) => (toEntity(u, c), v) } + ).map { case (u, c, v) => (toClientEntity(u, c), v) } + + private val matchIpEntities = List( + (Some("1.2.3.4"), 10.0), + (Some("2.3.4.5"), 20.0) + ).map { case (ip, quota) => (toIpEntity(ip), quota)} private def setupDescribeClientQuotasMatchTest() = { - val result = alterClientQuotas(matchEntities.map { case (e, v) => - (e -> Map((RequestPercentageProp, Some(v)))) - }.toMap, validateOnly = false) - matchEntities.foreach(e => result.get(e._1).get.get(10, TimeUnit.SECONDS)) + val userClientQuotas = matchUserClientEntities.map { case (e, v) => + e -> Map((RequestPercentageProp, Some(v))) + }.toMap + val ipQuotas = matchIpEntities.map { case (e, v) => + e -> Map((IpConnectionRateProp, Some(v))) + }.toMap + val result = alterClientQuotas(userClientQuotas ++ ipQuotas, validateOnly = false) + (matchUserClientEntities ++ matchIpEntities).foreach(e => result(e._1).get(10, TimeUnit.SECONDS)) // Allow time for watch callbacks to be triggered. Thread.sleep(500) Review comment: This is not related to your PR but I wonder if it would be possible to replace this sleep by `retry` or `waitUntilTrue`. Thoughts? ########## File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala ########## @@ -472,6 +512,134 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { EasyMock.reset(alterResult, describeResult) } + @Test + def shouldNotAlterNonQuotaIpConfigsUsingBootstrapServer(): Unit = { + // when using --bootstrap-server, it should be illegal to alter anything that is not a connection quota + // for ip entities + val node = new Node(1, "localhost", 9092) + val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) + + def verifyCommand(entityType: String, alterOpts: String*): Unit = { + val opts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", + "--entity-type", entityType, "--entity-name", "admin", + "--alter") ++ alterOpts) + val e = intercept[IllegalArgumentException] { + ConfigCommand.alterConfig(mockAdminClient, opts) + } + assertTrue(s"Unexpected exception: $e", e.getMessage.contains("some_config")) + } + + verifyCommand("ips", "--add-config", "connection_creation_rate=10000,some_config=10") + verifyCommand("ips", "--add-config", "some_config=10") + verifyCommand("ips", "--delete-config", "connection_creation_rate=10000,some_config=10") + verifyCommand("ips", "--delete-config", "some_config=10") + } + + @Test + def testDescribeIpConfigs(): Unit = { + def testShouldDescribeIpConfig(ip: Option[String]): Unit = { + val entityType = ClientQuotaEntity.IP + val (ipArgs, filterComponent) = ip match { + case Some(null) => (Array("--entity-default"), ClientQuotaFilterComponent.ofDefaultEntity(entityType)) + case Some(addr) => (Array("--entity-name", addr), ClientQuotaFilterComponent.ofEntity(entityType, addr)) + case None => (Array.empty[String], ClientQuotaFilterComponent.ofEntityType(entityType)) + } + + val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", + "--describe", "--entity-type", "ips") ++ ipArgs) + + val expectedFilter = ClientQuotaFilter.containsOnly(List(filterComponent).asJava) + + var describedConfigs = false + val describeFuture = new KafkaFutureImpl[util.Map[ClientQuotaEntity, util.Map[String, java.lang.Double]]] + describeFuture.complete(Map.empty[ClientQuotaEntity, util.Map[String, java.lang.Double]].asJava) + val describeResult: DescribeClientQuotasResult = EasyMock.createNiceMock(classOf[DescribeClientQuotasResult]) + EasyMock.expect(describeResult.entities()).andReturn(describeFuture) + + val node = new Node(1, "localhost", 9092) + val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { + override def describeClientQuotas(filter: ClientQuotaFilter, options: DescribeClientQuotasOptions): DescribeClientQuotasResult = { + assertTrue(filter.strict) + assertEquals(expectedFilter, filter) + describedConfigs = true + describeResult + } + } + EasyMock.replay(describeResult) + ConfigCommand.describeConfig(mockAdminClient, describeOpts) + assertTrue(describedConfigs) + EasyMock.reset(describeResult) + } + + testShouldDescribeIpConfig(Some("1.2.3.4")) + testShouldDescribeIpConfig(Some(null)) + testShouldDescribeIpConfig(None) + } + + @Test + def testAlterIpConfig(): Unit = { + def testShouldAlterIpConfig(ip: Option[String], remove: Boolean): Unit = { + val (ipArgs, ipEntity, ipComponent) = toValues(ip, ClientQuotaEntity.IP, "ips") + val alterOpts = if (remove) + Array("--delete-config", "connection_creation_rate") + else + Array("--add-config", "connection_creation_rate=100") + val createOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", + "--alter") ++ ipArgs ++ alterOpts) + + // Explicitly populate a HashMap to ensure nulls are recorded properly. + val entityMap = new java.util.HashMap[String, String] + ipEntity.foreach(u => entityMap.put(ClientQuotaEntity.IP, u)) + val entity = new ClientQuotaEntity(entityMap) + + var describedConfigs = false + val describeFuture = new KafkaFutureImpl[util.Map[ClientQuotaEntity, util.Map[String, java.lang.Double]]] + describeFuture.complete(Map(entity -> Map("connection_creation_rate" -> Double.box(50.0)).asJava).asJava) + val describeResult: DescribeClientQuotasResult = EasyMock.createNiceMock(classOf[DescribeClientQuotasResult]) + EasyMock.expect(describeResult.entities()).andReturn(describeFuture) + + var alteredConfigs = false + val alterFuture = new KafkaFutureImpl[Void] + alterFuture.complete(null) + val alterResult: AlterClientQuotasResult = EasyMock.createNiceMock(classOf[AlterClientQuotasResult]) + EasyMock.expect(alterResult.all()).andReturn(alterFuture) + + val node = new Node(1, "localhost", 9092) + val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { Review comment: ditto ########## File path: core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala ########## @@ -187,6 +190,59 @@ class ClientQuotasRequestTest extends BaseRequestTest { )) } + @Test + def testAlterIpQuotasRequest(): Unit = { Review comment: We must add tests with invalid IPs. ########## File path: core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala ########## @@ -259,13 +264,19 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { } private def updateIpConnectionRate(ip: Option[String], updatedRate: Int): Unit = { - adminZkClient.changeIpConfig(ip.getOrElse(ConfigEntityName.Default), - CoreUtils.propsWith(DynamicConfig.Ip.IpConnectionRateOverrideProp, updatedRate.toString)) + val initialConnectionCount = connectionCount + val adminClient = createAdminClient() Review comment: I second this. ########## File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala ########## @@ -472,6 +512,134 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { EasyMock.reset(alterResult, describeResult) } + @Test + def shouldNotAlterNonQuotaIpConfigsUsingBootstrapServer(): Unit = { + // when using --bootstrap-server, it should be illegal to alter anything that is not a connection quota + // for ip entities + val node = new Node(1, "localhost", 9092) + val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) + + def verifyCommand(entityType: String, alterOpts: String*): Unit = { + val opts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", + "--entity-type", entityType, "--entity-name", "admin", + "--alter") ++ alterOpts) + val e = intercept[IllegalArgumentException] { + ConfigCommand.alterConfig(mockAdminClient, opts) + } + assertTrue(s"Unexpected exception: $e", e.getMessage.contains("some_config")) + } + + verifyCommand("ips", "--add-config", "connection_creation_rate=10000,some_config=10") Review comment: I do agree. It took me a while to grasp it. Moreover, the very same `verifyCommand` is defined in other places. I wonder if we could consolidate them into one. What do you think? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org