dajac commented on a change in pull request #9628:
URL: https://github.com/apache/kafka/pull/9628#discussion_r536134783



##########
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##########
@@ -864,11 +885,21 @@ object ConfigCommand extends Config {
         }
       }
 
+      if (hasEntityName && entityTypeVals.contains(ConfigType.Ip)) {
+        Seq(entityName, 
ip).filter(options.has(_)).map(options.valueOf(_)).foreach { ipAddress =>
+          if (!Utils.validHostPattern(ipAddress))
+            throw new IllegalArgumentException(s"The entity name for 
${entityTypeVals.head} must be a valid IP, but it is: $ipAddress")
+        }
+      }
+
       if (options.has(describeOpt) && 
entityTypeVals.contains(BrokerLoggerConfigType) && !hasEntityName)
         throw new IllegalArgumentException(s"an entity name must be specified 
with --describe of ${entityTypeVals.mkString(",")}")
 
       if (options.has(alterOpt)) {
-        if (entityTypeVals.contains(ConfigType.User) || 
entityTypeVals.contains(ConfigType.Client) || 
entityTypeVals.contains(ConfigType.Broker)) {
+        if (entityTypeVals.contains(ConfigType.User) ||
+            entityTypeVals.contains(ConfigType.Client) ||
+            entityTypeVals.contains(ConfigType.Broker) ||
+            entityTypeVals.contains(ConfigType.Ip)) {
           if (!hasEntityName && !hasEntityDefault)
             throw new IllegalArgumentException("an entity-name or default 
entity must be specified with --alter of users, clients or brokers")

Review comment:
       nit: Could we add ip in the list as well?

##########
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##########
@@ -778,6 +793,10 @@ object ConfigCommand extends Config {
     val brokerLogger = parser.accepts("broker-logger", "The broker's ID for 
its logger config.")
       .withRequiredArg
       .ofType(classOf[String])
+    val ipDefaults = parser.accepts("ip-defaults", "The config defaults for 
all IPs.")
+    val ip = parser.accepts("ip", "The IP address.")

Review comment:
       It would be good to add these two flags in the KIP as well for 
completeness.

##########
File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
##########
@@ -472,6 +512,134 @@ class ConfigCommandTest extends ZooKeeperTestHarness with 
Logging {
     EasyMock.reset(alterResult, describeResult)
   }
 
+  @Test
+  def shouldNotAlterNonQuotaIpConfigsUsingBootstrapServer(): Unit = {
+    // when using --bootstrap-server, it should be illegal to alter anything 
that is not a connection quota
+    // for ip entities
+    val node = new Node(1, "localhost", 9092)
+    val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node)
+
+    def verifyCommand(entityType: String, alterOpts: String*): Unit = {
+      val opts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
+        "--entity-type", entityType, "--entity-name", "admin",
+        "--alter") ++ alterOpts)
+      val e = intercept[IllegalArgumentException] {
+        ConfigCommand.alterConfig(mockAdminClient, opts)
+      }
+      assertTrue(s"Unexpected exception: $e", 
e.getMessage.contains("some_config"))
+    }
+
+    verifyCommand("ips", "--add-config", 
"connection_creation_rate=10000,some_config=10")
+    verifyCommand("ips", "--add-config", "some_config=10")
+    verifyCommand("ips", "--delete-config", 
"connection_creation_rate=10000,some_config=10")
+    verifyCommand("ips", "--delete-config", "some_config=10")
+  }
+
+  @Test
+  def testDescribeIpConfigs(): Unit = {
+    def testShouldDescribeIpConfig(ip: Option[String]): Unit = {
+      val entityType = ClientQuotaEntity.IP
+      val (ipArgs, filterComponent) = ip match {
+        case Some(null) => (Array("--entity-default"), 
ClientQuotaFilterComponent.ofDefaultEntity(entityType))
+        case Some(addr) => (Array("--entity-name", addr), 
ClientQuotaFilterComponent.ofEntity(entityType, addr))
+        case None => (Array.empty[String], 
ClientQuotaFilterComponent.ofEntityType(entityType))
+      }
+
+      val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
+        "--describe", "--entity-type", "ips") ++ ipArgs)
+
+      val expectedFilter = 
ClientQuotaFilter.containsOnly(List(filterComponent).asJava)
+
+      var describedConfigs = false
+      val describeFuture = new KafkaFutureImpl[util.Map[ClientQuotaEntity, 
util.Map[String, java.lang.Double]]]
+      describeFuture.complete(Map.empty[ClientQuotaEntity, util.Map[String, 
java.lang.Double]].asJava)
+      val describeResult: DescribeClientQuotasResult = 
EasyMock.createNiceMock(classOf[DescribeClientQuotasResult])
+      EasyMock.expect(describeResult.entities()).andReturn(describeFuture)
+
+      val node = new Node(1, "localhost", 9092)
+      val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node) {
+        override def describeClientQuotas(filter: ClientQuotaFilter, options: 
DescribeClientQuotasOptions): DescribeClientQuotasResult = {
+          assertTrue(filter.strict)
+          assertEquals(expectedFilter, filter)
+          describedConfigs = true
+          describeResult
+        }
+      }

Review comment:
       Have you considered using a real mock instead of doing this? Is there a 
reason not to? It seems to me that it would be more appropriate.

##########
File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
##########
@@ -472,6 +512,134 @@ class ConfigCommandTest extends ZooKeeperTestHarness with 
Logging {
     EasyMock.reset(alterResult, describeResult)
   }
 
+  @Test
+  def shouldNotAlterNonQuotaIpConfigsUsingBootstrapServer(): Unit = {
+    // when using --bootstrap-server, it should be illegal to alter anything 
that is not a connection quota
+    // for ip entities
+    val node = new Node(1, "localhost", 9092)
+    val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node)
+
+    def verifyCommand(entityType: String, alterOpts: String*): Unit = {
+      val opts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
+        "--entity-type", entityType, "--entity-name", "admin",
+        "--alter") ++ alterOpts)
+      val e = intercept[IllegalArgumentException] {
+        ConfigCommand.alterConfig(mockAdminClient, opts)
+      }
+      assertTrue(s"Unexpected exception: $e", 
e.getMessage.contains("some_config"))
+    }
+
+    verifyCommand("ips", "--add-config", 
"connection_creation_rate=10000,some_config=10")
+    verifyCommand("ips", "--add-config", "some_config=10")
+    verifyCommand("ips", "--delete-config", 
"connection_creation_rate=10000,some_config=10")
+    verifyCommand("ips", "--delete-config", "some_config=10")
+  }
+
+  @Test
+  def testDescribeIpConfigs(): Unit = {
+    def testShouldDescribeIpConfig(ip: Option[String]): Unit = {

Review comment:
       nit: I feel like this inner method is quite large and may merit being 
extracted. It feels a bit weird when the inner method is way larger than the 
actual body of the method. I guess that it is a matter of taste but I believe 
that the code would be more readable. I leave this up to you.

##########
File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
##########
@@ -472,6 +512,134 @@ class ConfigCommandTest extends ZooKeeperTestHarness with 
Logging {
     EasyMock.reset(alterResult, describeResult)
   }
 
+  @Test
+  def shouldNotAlterNonQuotaIpConfigsUsingBootstrapServer(): Unit = {
+    // when using --bootstrap-server, it should be illegal to alter anything 
that is not a connection quota
+    // for ip entities
+    val node = new Node(1, "localhost", 9092)
+    val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node)
+
+    def verifyCommand(entityType: String, alterOpts: String*): Unit = {
+      val opts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
+        "--entity-type", entityType, "--entity-name", "admin",

Review comment:
       Actually, I believe that the command should fail due to `admin`, which is 
not a valid IP. It may be better to use a valid IP address here as you want to 
test the configs.

##########
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##########
@@ -864,11 +885,21 @@ object ConfigCommand extends Config {
         }
       }
 
+      if (hasEntityName && entityTypeVals.contains(ConfigType.Ip)) {
+        Seq(entityName, 
ip).filter(options.has(_)).map(options.valueOf(_)).foreach { ipAddress =>
+          if (!Utils.validHostPattern(ipAddress))

Review comment:
       I think that we should validate that `ipAddress` is actually an IP. 
`validHostPattern` only ensures that it is a valid host name.

##########
File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
##########
@@ -398,19 +418,39 @@ class ConfigCommandTest extends ZooKeeperTestHarness with 
Logging {
     ConfigCommand.alterConfigWithZk(null, createOpts, new 
TestAdminZkClient(zkClient))
   }
 
-  def testShouldAddClientConfig(user: Option[String], clientId: 
Option[String]): Unit = {
-    def toValues(entityName: Option[String], entityType: String, command: 
String):
-        (Array[String], Option[String], Option[ClientQuotaFilterComponent]) = {
-      entityName match {
-        case Some(null) =>
-          (Array("--entity-type", command, "--entity-default"), Some(null),
-            Some(ClientQuotaFilterComponent.ofDefaultEntity(entityType)))
-        case Some(name) =>
-          (Array("--entity-type", command, "--entity-name", name), Some(name),
-            Some(ClientQuotaFilterComponent.ofEntity(entityType, name)))
-        case None => (Array.empty, None, None)
+  @Test
+  def shouldAddIpConfigsUsingZookeeper(): Unit = {
+    val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+      "--entity-name", "1.2.3.4",
+      "--entity-type", "ips",
+      "--alter",
+      "--add-config", "a=b,c=d"))
+
+    class TestAdminZkClient(zkClient: KafkaZkClient) extends 
AdminZkClient(zkClient) {
+      override def changeIpConfig(ip: String, configChange: Properties): Unit 
= {
+        assertEquals("1.2.3.4", ip)
+        assertEquals("b", configChange.get("a"))
+        assertEquals("d", configChange.get("c"))
       }
     }
+
+    ConfigCommand.alterConfigWithZk(null, createOpts, new 
TestAdminZkClient(zkClient))
+  }
+
+  private def toValues(entityName: Option[String], entityType: String, 
command: String):
+  (Array[String], Option[String], Option[ClientQuotaFilterComponent]) = {

Review comment:
       nit: Indentation seems wrong here.

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -920,32 +954,51 @@ class AdminManager(val config: KafkaConfig,
         !name.isDefined || !strict
     }
 
-    def fromProps(props: Map[String, String]): Map[String, Double] = {
-      props.map { case (key, value) =>
-        val doubleValue = try value.toDouble catch {
-          case _: NumberFormatException =>
-            throw new IllegalStateException(s"Unexpected client quota 
configuration value: $key -> $value")
-        }
-        key -> doubleValue
-      }
-    }
-
-    (userEntries ++ clientIdEntries ++ bothEntries).map { case ((u, c), p) =>
+    (userEntries ++ clientIdEntries ++ bothEntries).flatMap { case ((u, c), p) 
=>
       val quotaProps = p.asScala.filter { case (key, _) => 
QuotaConfigs.isQuotaConfig(key) }
       if (quotaProps.nonEmpty && matches(userComponent, u) && 
matches(clientIdComponent, c))
         Some(userClientIdToEntity(u, c) -> fromProps(quotaProps))
       else
         None
-    }.flatten.toMap
+    }.toMap
+  }
+
+  def handleDescribeIpQuotas(ipComponent: Option[ClientQuotaFilterComponent], 
strict: Boolean): Map[ClientQuotaEntity, Map[String, Double]] = {
+    val ip = ipComponent.flatMap(c => toOption(c.`match`))
+    val exactIp = wantExact(ipComponent)
+    val allIps = ipComponent.exists(_.`match` == null) || (ipComponent.isEmpty 
&& !strict)
+    val ipEntries = if (exactIp)
+      Map(Some(ip.get) -> adminZkClient.fetchEntityConfig(ConfigType.Ip, 
sanitized(ip)))
+    else if (allIps)
+      adminZkClient.fetchAllEntityConfigs(ConfigType.Ip).map { case (name, 
props) =>
+        Some(desanitizeEntityName(name)) -> props
+      }
+    else
+      Map.empty
+
+    def ipToQuotaEntity(ip: Option[String]): ClientQuotaEntity = {
+      new ClientQuotaEntity(ip.map(ipName => ClientQuotaEntity.IP -> 
ipName).toMap.asJava)
+    }
+
+    ipEntries.flatMap { case (ip, props) =>
+      val ipQuotaProps = props.asScala.filter { case (key, _) => 
DynamicConfig.Ip.names.contains(key) }
+      if (ipQuotaProps.nonEmpty)
+        Some(ipToQuotaEntity(ip) -> fromProps(ipQuotaProps))
+      else
+        None
+    }
   }
 
   def alterClientQuotas(entries: Seq[ClientQuotaAlteration], validateOnly: 
Boolean): Map[ClientQuotaEntity, ApiError] = {
     def alterEntityQuotas(entity: ClientQuotaEntity, ops: 
Iterable[ClientQuotaAlteration.Op]): Unit = {
-      val (path, configType, configKeys) = 
entityToSanitizedUserClientId(entity) match {
-        case (Some(user), Some(clientId)) => (user + "/clients/" + clientId, 
ConfigType.User, DynamicConfig.User.configKeys)
-        case (Some(user), None) => (user, ConfigType.User, 
DynamicConfig.User.configKeys)
-        case (None, Some(clientId)) => (clientId, ConfigType.Client, 
DynamicConfig.Client.configKeys)
-        case _ => throw new InvalidRequestException("Invalid empty client 
quota entity")
+      val (path, configType, configKeys) = parseAndSanitizeQuotaEntity(entity) 
match {
+        case (Some(user), Some(clientId), None) => (user + "/clients/" + 
clientId, ConfigType.User, DynamicConfig.User.configKeys)
+        case (Some(user), None, None) => (user, ConfigType.User, 
DynamicConfig.User.configKeys)
+        case (None, Some(clientId), None) => (clientId, ConfigType.Client, 
DynamicConfig.Client.configKeys)
+        case (None, None, Some(ip)) => (ip, ConfigType.Ip, 
DynamicConfig.Ip.configKeys)
+        case (_, _, Some(_)) => throw new InvalidRequestException(s"Invalid 
quota entity combination, " +

Review comment:
       nit: Shouldn't we use `(Some(_), Some(_), Some(_))` here in order to be 
really accurate? As it is, the first two could be anything. That works because 
it is after all the others but I would prefer to be explicit if possible.

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -920,32 +954,51 @@ class AdminManager(val config: KafkaConfig,
         !name.isDefined || !strict
     }
 
-    def fromProps(props: Map[String, String]): Map[String, Double] = {
-      props.map { case (key, value) =>
-        val doubleValue = try value.toDouble catch {
-          case _: NumberFormatException =>
-            throw new IllegalStateException(s"Unexpected client quota 
configuration value: $key -> $value")
-        }
-        key -> doubleValue
-      }
-    }
-
-    (userEntries ++ clientIdEntries ++ bothEntries).map { case ((u, c), p) =>
+    (userEntries ++ clientIdEntries ++ bothEntries).flatMap { case ((u, c), p) 
=>
       val quotaProps = p.asScala.filter { case (key, _) => 
QuotaConfigs.isQuotaConfig(key) }
       if (quotaProps.nonEmpty && matches(userComponent, u) && 
matches(clientIdComponent, c))
         Some(userClientIdToEntity(u, c) -> fromProps(quotaProps))
       else
         None
-    }.flatten.toMap
+    }.toMap
+  }
+
+  def handleDescribeIpQuotas(ipComponent: Option[ClientQuotaFilterComponent], 
strict: Boolean): Map[ClientQuotaEntity, Map[String, Double]] = {
+    val ip = ipComponent.flatMap(c => toOption(c.`match`))
+    val exactIp = wantExact(ipComponent)
+    val allIps = ipComponent.exists(_.`match` == null) || (ipComponent.isEmpty 
&& !strict)
+    val ipEntries = if (exactIp)
+      Map(Some(ip.get) -> adminZkClient.fetchEntityConfig(ConfigType.Ip, 
sanitized(ip)))
+    else if (allIps)
+      adminZkClient.fetchAllEntityConfigs(ConfigType.Ip).map { case (name, 
props) =>
+        Some(desanitizeEntityName(name)) -> props
+      }
+    else
+      Map.empty
+
+    def ipToQuotaEntity(ip: Option[String]): ClientQuotaEntity = {
+      new ClientQuotaEntity(ip.map(ipName => ClientQuotaEntity.IP -> 
ipName).toMap.asJava)
+    }
+
+    ipEntries.flatMap { case (ip, props) =>
+      val ipQuotaProps = props.asScala.filter { case (key, _) => 
DynamicConfig.Ip.names.contains(key) }
+      if (ipQuotaProps.nonEmpty)
+        Some(ipToQuotaEntity(ip) -> fromProps(ipQuotaProps))
+      else
+        None
+    }
   }
 
   def alterClientQuotas(entries: Seq[ClientQuotaAlteration], validateOnly: 
Boolean): Map[ClientQuotaEntity, ApiError] = {
     def alterEntityQuotas(entity: ClientQuotaEntity, ops: 
Iterable[ClientQuotaAlteration.Op]): Unit = {
-      val (path, configType, configKeys) = 
entityToSanitizedUserClientId(entity) match {
-        case (Some(user), Some(clientId)) => (user + "/clients/" + clientId, 
ConfigType.User, DynamicConfig.User.configKeys)
-        case (Some(user), None) => (user, ConfigType.User, 
DynamicConfig.User.configKeys)
-        case (None, Some(clientId)) => (clientId, ConfigType.Client, 
DynamicConfig.Client.configKeys)
-        case _ => throw new InvalidRequestException("Invalid empty client 
quota entity")
+      val (path, configType, configKeys) = parseAndSanitizeQuotaEntity(entity) 
match {

Review comment:
       It seems that we should validate the provided `ip` somewhere here. I 
just tried with the following command:
   ```
   ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter 
--entity-type ips --entity-name ip --add-config  connection_creation_rate=10
   ```
   The command completes but then the broker fails while applying the 
configuration:
   ```
   [2020-12-04 15:31:01,951] ERROR error processing change notification 
{"version":2,"entity_path":"ips/ip"} from 
/config/changes/config_change_0000000001 
(kafka.common.ZkNodeChangeNotificationListener)
   java.lang.IllegalArgumentException: Unable to resolve address ip
        at 
kafka.server.IpConfigHandler.processConfigChanges(ConfigHandler.scala:199)
        at 
kafka.server.DynamicConfigManager$ConfigChangedNotificationHandler$.processEntityConfigChangeVersion2(DynamicConfigManager.scala:151)
        at 
kafka.server.DynamicConfigManager$ConfigChangedNotificationHandler$.$anonfun$processNotification$1(DynamicConfigManager.scala:105)
        at 
kafka.server.DynamicConfigManager$ConfigChangedNotificationHandler$.processNotification(DynamicConfigManager.scala:96)
        at 
kafka.common.ZkNodeChangeNotificationListener.$anonfun$processNotification$1(ZkNodeChangeNotificationListener.scala:106)
        at 
kafka.common.ZkNodeChangeNotificationListener.processNotification(ZkNodeChangeNotificationListener.scala:106)
        at 
kafka.common.ZkNodeChangeNotificationListener.$anonfun$processNotifications$2(ZkNodeChangeNotificationListener.scala:90)
        at 
kafka.common.ZkNodeChangeNotificationListener.$anonfun$processNotifications$2$adapted(ZkNodeChangeNotificationListener.scala:87)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
        at 
kafka.common.ZkNodeChangeNotificationListener.kafka$common$ZkNodeChangeNotificationListener$$processNotifications(ZkNodeChangeNotificationListener.scala:87)
        at 
kafka.common.ZkNodeChangeNotificationListener$ChangeNotification.process(ZkNodeChangeNotificationListener.scala:120)
        at 
kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:146)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
   ```
   
   It would be good to cover this in the tests, both for the command and the 
API.
   
   This is also true with:
   ```
   ./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --ip ip 
--add-config connection_creation_rate=2
   ```

##########
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##########
@@ -864,11 +885,21 @@ object ConfigCommand extends Config {
         }
       }
 
+      if (hasEntityName && entityTypeVals.contains(ConfigType.Ip)) {
+        Seq(entityName, 
ip).filter(options.has(_)).map(options.valueOf(_)).foreach { ipAddress =>

Review comment:
       nit: We can probably remove `(_)` in `filter` and `map`. 

##########
File path: core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
##########
@@ -236,13 +310,22 @@ class ClientQuotasRequestTest extends BaseRequestTest {
     (Some("user-3"), None, 58.58),
     (Some(null), None, 59.59),
     (None, Some("client-id-2"), 60.60)
-  ).map { case (u, c, v) => (toEntity(u, c), v) }
+  ).map { case (u, c, v) => (toClientEntity(u, c), v) }
+
+  private val matchIpEntities = List(
+    (Some("1.2.3.4"), 10.0),
+    (Some("2.3.4.5"), 20.0)
+  ).map { case (ip, quota) => (toIpEntity(ip), quota)}
 
   private def setupDescribeClientQuotasMatchTest() = {
-    val result = alterClientQuotas(matchEntities.map { case (e, v) =>
-      (e -> Map((RequestPercentageProp, Some(v))))
-    }.toMap, validateOnly = false)
-    matchEntities.foreach(e => result.get(e._1).get.get(10, TimeUnit.SECONDS))
+    val userClientQuotas = matchUserClientEntities.map { case (e, v) =>
+      e -> Map((RequestPercentageProp, Some(v)))
+    }.toMap
+    val ipQuotas = matchIpEntities.map { case (e, v) =>
+      e -> Map((IpConnectionRateProp, Some(v)))
+    }.toMap
+    val result = alterClientQuotas(userClientQuotas ++ ipQuotas, validateOnly 
= false)
+    (matchUserClientEntities ++ matchIpEntities).foreach(e => 
result(e._1).get(10, TimeUnit.SECONDS))
 
     // Allow time for watch callbacks to be triggered.
     Thread.sleep(500)

Review comment:
       This is not related to your PR but I wonder if it would be possible to 
replace this sleep by `retry` or `waitUntilTrue`. Thoughts?

##########
File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
##########
@@ -472,6 +512,134 @@ class ConfigCommandTest extends ZooKeeperTestHarness with 
Logging {
     EasyMock.reset(alterResult, describeResult)
   }
 
+  @Test
+  def shouldNotAlterNonQuotaIpConfigsUsingBootstrapServer(): Unit = {
+    // when using --bootstrap-server, it should be illegal to alter anything 
that is not a connection quota
+    // for ip entities
+    val node = new Node(1, "localhost", 9092)
+    val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node)
+
+    def verifyCommand(entityType: String, alterOpts: String*): Unit = {
+      val opts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
+        "--entity-type", entityType, "--entity-name", "admin",
+        "--alter") ++ alterOpts)
+      val e = intercept[IllegalArgumentException] {
+        ConfigCommand.alterConfig(mockAdminClient, opts)
+      }
+      assertTrue(s"Unexpected exception: $e", 
e.getMessage.contains("some_config"))
+    }
+
+    verifyCommand("ips", "--add-config", 
"connection_creation_rate=10000,some_config=10")
+    verifyCommand("ips", "--add-config", "some_config=10")
+    verifyCommand("ips", "--delete-config", 
"connection_creation_rate=10000,some_config=10")
+    verifyCommand("ips", "--delete-config", "some_config=10")
+  }
+
+  @Test
+  def testDescribeIpConfigs(): Unit = {
+    def testShouldDescribeIpConfig(ip: Option[String]): Unit = {
+      val entityType = ClientQuotaEntity.IP
+      val (ipArgs, filterComponent) = ip match {
+        case Some(null) => (Array("--entity-default"), 
ClientQuotaFilterComponent.ofDefaultEntity(entityType))
+        case Some(addr) => (Array("--entity-name", addr), 
ClientQuotaFilterComponent.ofEntity(entityType, addr))
+        case None => (Array.empty[String], 
ClientQuotaFilterComponent.ofEntityType(entityType))
+      }
+
+      val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
+        "--describe", "--entity-type", "ips") ++ ipArgs)
+
+      val expectedFilter = 
ClientQuotaFilter.containsOnly(List(filterComponent).asJava)
+
+      var describedConfigs = false
+      val describeFuture = new KafkaFutureImpl[util.Map[ClientQuotaEntity, 
util.Map[String, java.lang.Double]]]
+      describeFuture.complete(Map.empty[ClientQuotaEntity, util.Map[String, 
java.lang.Double]].asJava)
+      val describeResult: DescribeClientQuotasResult = 
EasyMock.createNiceMock(classOf[DescribeClientQuotasResult])
+      EasyMock.expect(describeResult.entities()).andReturn(describeFuture)
+
+      val node = new Node(1, "localhost", 9092)
+      val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node) {
+        override def describeClientQuotas(filter: ClientQuotaFilter, options: 
DescribeClientQuotasOptions): DescribeClientQuotasResult = {
+          assertTrue(filter.strict)
+          assertEquals(expectedFilter, filter)
+          describedConfigs = true
+          describeResult
+        }
+      }
+      EasyMock.replay(describeResult)
+      ConfigCommand.describeConfig(mockAdminClient, describeOpts)
+      assertTrue(describedConfigs)
+      EasyMock.reset(describeResult)
+    }
+
+    testShouldDescribeIpConfig(Some("1.2.3.4"))
+    testShouldDescribeIpConfig(Some(null))
+    testShouldDescribeIpConfig(None)
+  }
+
+  @Test
+  def testAlterIpConfig(): Unit = {
+    def testShouldAlterIpConfig(ip: Option[String], remove: Boolean): Unit = {
+      val (ipArgs, ipEntity, ipComponent) = toValues(ip, ClientQuotaEntity.IP, 
"ips")
+      val alterOpts = if (remove)
+        Array("--delete-config", "connection_creation_rate")
+      else
+        Array("--add-config", "connection_creation_rate=100")
+      val createOpts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
+        "--alter") ++ ipArgs ++ alterOpts)
+
+      // Explicitly populate a HashMap to ensure nulls are recorded properly.
+      val entityMap = new java.util.HashMap[String, String]
+      ipEntity.foreach(u => entityMap.put(ClientQuotaEntity.IP, u))
+      val entity = new ClientQuotaEntity(entityMap)
+
+      var describedConfigs = false
+      val describeFuture = new KafkaFutureImpl[util.Map[ClientQuotaEntity, 
util.Map[String, java.lang.Double]]]
+      describeFuture.complete(Map(entity -> Map("connection_creation_rate" -> 
Double.box(50.0)).asJava).asJava)
+      val describeResult: DescribeClientQuotasResult = 
EasyMock.createNiceMock(classOf[DescribeClientQuotasResult])
+      EasyMock.expect(describeResult.entities()).andReturn(describeFuture)
+
+      var alteredConfigs = false
+      val alterFuture = new KafkaFutureImpl[Void]
+      alterFuture.complete(null)
+      val alterResult: AlterClientQuotasResult = 
EasyMock.createNiceMock(classOf[AlterClientQuotasResult])
+      EasyMock.expect(alterResult.all()).andReturn(alterFuture)
+
+      val node = new Node(1, "localhost", 9092)
+      val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node) {

Review comment:
       ditto

##########
File path: core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
##########
@@ -187,6 +190,59 @@ class ClientQuotasRequestTest extends BaseRequestTest {
     ))
   }
 
+  @Test
+  def testAlterIpQuotasRequest(): Unit = {

Review comment:
       We must add tests with invalid IPs.

##########
File path: 
core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##########
@@ -259,13 +264,19 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
   }
 
   private def updateIpConnectionRate(ip: Option[String], updatedRate: Int): 
Unit = {
-    adminZkClient.changeIpConfig(ip.getOrElse(ConfigEntityName.Default),
-      CoreUtils.propsWith(DynamicConfig.Ip.IpConnectionRateOverrideProp, 
updatedRate.toString))
+    val initialConnectionCount = connectionCount
+    val adminClient = createAdminClient()

Review comment:
       I second this.

##########
File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
##########
@@ -472,6 +512,134 @@ class ConfigCommandTest extends ZooKeeperTestHarness with 
Logging {
     EasyMock.reset(alterResult, describeResult)
   }
 
+  @Test
+  def shouldNotAlterNonQuotaIpConfigsUsingBootstrapServer(): Unit = {
+    // when using --bootstrap-server, it should be illegal to alter anything 
that is not a connection quota
+    // for ip entities
+    val node = new Node(1, "localhost", 9092)
+    val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node)
+
+    def verifyCommand(entityType: String, alterOpts: String*): Unit = {
+      val opts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
+        "--entity-type", entityType, "--entity-name", "admin",
+        "--alter") ++ alterOpts)
+      val e = intercept[IllegalArgumentException] {
+        ConfigCommand.alterConfig(mockAdminClient, opts)
+      }
+      assertTrue(s"Unexpected exception: $e", 
e.getMessage.contains("some_config"))
+    }
+
+    verifyCommand("ips", "--add-config", 
"connection_creation_rate=10000,some_config=10")

Review comment:
       I do agree. It took me a while to grasp it. Moreover, the very same 
`verifyCommand` is defined in other places. I wonder if we could consolidate 
them into one. What do you think?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to