dajac commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r521346721



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -538,14 +542,20 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
                               val recvBufferSize: Int,
                               brokerId: Int,
                               connectionQuotas: ConnectionQuotas,
-                              metricPrefix: String) extends 
AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+                              metricPrefix: String,
+                              time: Time) extends 
AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
   private val nioSelector = NSelector.open()
   val serverChannel = openServerSocket(endPoint.host, endPoint.port)
   private val processors = new ArrayBuffer[Processor]()
   private val processorsStarted = new AtomicBoolean
   private val blockedPercentMeter = 
newMeter(s"${metricPrefix}AcceptorBlockedPercent",
     "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> 
endPoint.listenerName.value))
+  private var currentProcessorIndex = 0
+  private[network] case class DelayedCloseSocket(socket: SocketChannel, 
endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] {
+    override def compare(that: DelayedCloseSocket): Int = endThrottleTimeMs 
compare that.endThrottleTimeMs
+  }
+  private[network] val throttledSockets = new 
mutable.PriorityQueue[DelayedCloseSocket]()

Review comment:
       Out of curiosity, have we considered using a `DelayQueue` instead of 
`PriorityQueue`? I was wondering if that could simplify the logic on our side. I 
am not sure if it is worth doing it though.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -538,14 +542,20 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
                               val recvBufferSize: Int,
                               brokerId: Int,
                               connectionQuotas: ConnectionQuotas,
-                              metricPrefix: String) extends 
AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+                              metricPrefix: String,
+                              time: Time) extends 
AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
   private val nioSelector = NSelector.open()
   val serverChannel = openServerSocket(endPoint.host, endPoint.port)
   private val processors = new ArrayBuffer[Processor]()
   private val processorsStarted = new AtomicBoolean
   private val blockedPercentMeter = 
newMeter(s"${metricPrefix}AcceptorBlockedPercent",
     "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> 
endPoint.listenerName.value))
+  private var currentProcessorIndex = 0
+  private[network] case class DelayedCloseSocket(socket: SocketChannel, 
endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] {

Review comment:
       nit: It is a bit weird to have this case class defined in the middle of 
the block of variables. Would it make sense to move it to another place? Maybe 
after the block of variables?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -647,9 +624,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         }
       }
     } finally {
-      debug("Closing server socket and selector.")
+      debug("Closing server socket, selector, and any throttled sockets.")
       CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
       CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
+      throttledSockets.foreach(throttledSocket => 
closeSocket(throttledSocket.socket))

Review comment:
       nit: We may want to clear `throttledSockets` for completeness. 

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1453,6 +1582,47 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
     }
   }
 
+  /**
+   * To avoid over-recording listener/broker connection rate, we unrecord a 
listener or broker connection
+   * if the IP gets throttled later.
+   *
+   * @param listenerName listener to unrecord connection
+   * @param timeMs current time in milliseconds
+   */
+  private def unrecordListenerConnection(listenerName: ListenerName, timeMs: 
Long): Unit = {
+    if (!protectedListener(listenerName)) {
+      brokerConnectionRateSensor.record(-1.0, timeMs, false)
+    }
+    maxConnectionsPerListener
+      .get(listenerName)
+      .foreach(_.connectionRateSensor.record(-1.0, timeMs, false))
+  }
+
+  /**
+   * Calculates the delay needed to bring the observed connection creation 
rate to the IP limit.
+   * If the connection would cause an IP quota violation, un-record the 
connection for both IP,
+   * and throw ConnectionThrottledException

Review comment:
       nit: `un-record the connection for both IP, and throw 
ConnectionThrottledException`. It seems that something is missing. Perhaps you 
meant `for both IP and the listener, and throw ...`? We can also add a period at 
the end of the sentence.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): 
Unit = {
     // if there is a connection waiting on the rate throttle delay, we will 
let it wait the original delay even if
     // the rate limit increases, because it is just one connection per 
listener and the code is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs 
for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, 
otherwise
+   * all metric configs will be checked and updated if required.
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default 
if None
+   */
+  def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: 
Option[Int]): Unit = {
+    def isIpConnectionRateMetric(metricName: MetricName) = {
+      metricName.name == ConnectionRateMetricName &&
+      metricName.group == MetricsGroup &&
+      metricName.tags.containsKey(IpMetricTag)
+    }
+
+    def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+      quotaLimit != metric.config.quota.bound
+    }
+    counts.synchronized {
+      ip match {
+        case Some(addr) =>
+          val address = InetAddress.getByName(addr)

Review comment:
       What would happen if `addr` can't be resolved by name? For instance, say 
that we have `10.0.0.0.1` in ZK, or a string which does not represent an IP. It 
may be worth validating the IP in the `IpConfigHandler`. If the IP is not valid, 
we could handle it properly there.

##########
File path: core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
##########
@@ -193,6 +194,64 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     assertEquals(Quota.upperBound(200000),  
quotaManagers.fetch.quota("ANONYMOUS", "overriddenUserClientId"))
   }
 
+  @Test
+  def testIpQuotaInitialization(): Unit = {
+    val server = servers.head
+    val ipOverrideProps = new Properties()
+    ipOverrideProps.put(DynamicConfig.Ip.IpConnectionRateOverrideProp, "10")
+    val ipDefaultProps = new Properties()
+    ipDefaultProps.put(DynamicConfig.Ip.IpConnectionRateOverrideProp, "20")
+    server.shutdown()
+
+    adminZkClient.changeIpConfig(ConfigEntityName.Default, ipDefaultProps)
+    adminZkClient.changeIpConfig("1.2.3.4", ipOverrideProps)
+
+    // Remove config change znodes to force quota initialization only through 
loading of ip quotas
+    zkClient.getChildren(ConfigEntityChangeNotificationZNode.path).foreach { p 
=> zkClient.deletePath(ConfigEntityChangeNotificationZNode.path + "/" + p) }

Review comment:
       nit: Could we break this long line?

##########
File path: core/src/main/scala/kafka/server/ConfigHandler.scala
##########
@@ -185,6 +186,19 @@ class UserConfigHandler(private val quotaManagers: 
QuotaManagers, val credential
   }
 }
 
+class IpConfigHandler(private val connectionQuotas: ConnectionQuotas) extends 
ConfigHandler with Logging {
+
+  def processConfigChanges(ip: String, config: Properties): Unit = {
+    val ipConnectionRateQuota = 
Option(config.getProperty(DynamicConfig.Ip.IpConnectionRateOverrideProp)).map(_.toInt)
+    val updatedIp =
+      if (ip != ConfigEntityName.Default)
+        Some(ip)

Review comment:
       As mentioned in a previous comment, it may be worth validating the IP 
here.

##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -880,6 +872,72 @@ class SocketServerTest {
     }
   }
 
+  @Test
+  def testConnectionRatePerIp(): Unit = {
+    val overrideProps = TestUtils.createBrokerConfig(0, 
TestUtils.MockZkConnect, port = 0)
+    overrideProps.remove(KafkaConfig.MaxConnectionsPerIpProp)
+    overrideProps.put(KafkaConfig.NumQuotaSamplesProp, String.valueOf(2))
+    val connectionRate = 5
+    val overrideServer = new 
SocketServer(KafkaConfig.fromProps(overrideProps), new Metrics(), Time.SYSTEM, 
credentialProvider)
+    overrideServer.connectionQuotas.updateIpConnectionRateQuota(None, 
Some(connectionRate))
+    try {
+      overrideServer.startup()
+      // make the maximum allowable number of connections
+      (0 until connectionRate).map(_ => connect(overrideServer))
+      // now try one more (should get throttled)
+      var conn = connect(overrideServer)

Review comment:
       I wonder if this could be a source of flakiness. If I understand this 
correctly, it works because all the connections are within the same sample 
interval (1s). Am I understanding it right? That's probably true most of the 
time but could be false sometimes, especially on overloaded Jenkins machines. 
Could we mock the time to avoid this?

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -477,6 +568,99 @@ class ConnectionQuotasTest {
     assertTrue("Expected BlockedPercentMeter metric for EXTERNAL listener to 
be recorded", blockedPercentMeters("EXTERNAL").count() > 0)
   }
 
+  @Test
+  def testIpConnectionRateUpdate(): Unit = {
+    val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits)
+    connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+    connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName)
+    connectionQuotas.addListener(config, listeners("ADMIN").listenerName)
+    connectionQuotas.addListener(config, listeners("REPLICATION").listenerName)
+    val defaultIpRate = 50
+    val defaultOverrideRate = 20
+    val overrideIpRate = 30
+    val externalListener = listeners("EXTERNAL")
+    val adminListener = listeners("ADMIN")
+    // set a non-unlimited default quota so that we create ip rate 
sensors/metrics
+    connectionQuotas.updateIpConnectionRateQuota(None, Some(defaultIpRate))
+    connectionQuotas.inc(externalListener.listenerName, 
externalListener.defaultIp, blockedPercentMeters("EXTERNAL"))
+    connectionQuotas.inc(adminListener.listenerName, adminListener.defaultIp, 
blockedPercentMeters("ADMIN"))
+
+    // both IPs should have the default rate
+    verifyIpConnectionQuota(externalListener.defaultIp, defaultIpRate)
+    verifyIpConnectionQuota(adminListener.defaultIp, defaultIpRate)
+
+    // external listener should have its in-memory quota and metric config 
updated
+    
connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress),
 Some(overrideIpRate))
+    verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate)
+
+    // update default
+    connectionQuotas.updateIpConnectionRateQuota(None, 
Some(defaultOverrideRate))
+
+    // external listener IP should not have its quota updated to the new 
default
+    verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate)
+    // admin listener IP should have its quota updated with to the new default
+    verifyIpConnectionQuota(adminListener.defaultIp, defaultOverrideRate)
+
+    // remove default connection rate quota
+    connectionQuotas.updateIpConnectionRateQuota(None, None)
+    verifyIpConnectionQuota(adminListener.defaultIp, 
DynamicConfig.Ip.DefaultConnectionCreationRate)

Review comment:
       Could we verify here that the `externalListener` still has the correct 
quota?

##########
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##########
@@ -127,6 +127,22 @@ object DynamicConfig {
     def validate(props: Properties) = DynamicConfig.validate(userConfigs, 
props, customPropsAllowed = false)
   }
 
+  object Ip {
+    val IpConnectionRateOverrideProp = "connection_creation_rate"
+    val UnlimitedConnectionCreationRate = Int.MaxValue
+    val DefaultConnectionCreationRate = UnlimitedConnectionCreationRate
+    val IpOverrideDoc = "A long representing the upper bound of connections 
accepted for the specified IP."

Review comment:
       Here we say `long`, but the config defined below is actually an `INT`.

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -477,6 +568,99 @@ class ConnectionQuotasTest {
     assertTrue("Expected BlockedPercentMeter metric for EXTERNAL listener to 
be recorded", blockedPercentMeters("EXTERNAL").count() > 0)
   }
 
+  @Test
+  def testIpConnectionRateUpdate(): Unit = {
+    val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits)
+    connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+    connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName)
+    connectionQuotas.addListener(config, listeners("ADMIN").listenerName)
+    connectionQuotas.addListener(config, listeners("REPLICATION").listenerName)

Review comment:
       It seems that `REPLICATION` is never used in the test. We may remove it.

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -477,6 +568,99 @@ class ConnectionQuotasTest {
     assertTrue("Expected BlockedPercentMeter metric for EXTERNAL listener to 
be recorded", blockedPercentMeters("EXTERNAL").count() > 0)
   }
 
+  @Test
+  def testIpConnectionRateUpdate(): Unit = {
+    val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits)
+    connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+    connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName)
+    connectionQuotas.addListener(config, listeners("ADMIN").listenerName)
+    connectionQuotas.addListener(config, listeners("REPLICATION").listenerName)
+    val defaultIpRate = 50
+    val defaultOverrideRate = 20
+    val overrideIpRate = 30
+    val externalListener = listeners("EXTERNAL")
+    val adminListener = listeners("ADMIN")
+    // set a non-unlimited default quota so that we create ip rate 
sensors/metrics
+    connectionQuotas.updateIpConnectionRateQuota(None, Some(defaultIpRate))
+    connectionQuotas.inc(externalListener.listenerName, 
externalListener.defaultIp, blockedPercentMeters("EXTERNAL"))
+    connectionQuotas.inc(adminListener.listenerName, adminListener.defaultIp, 
blockedPercentMeters("ADMIN"))
+
+    // both IPs should have the default rate
+    verifyIpConnectionQuota(externalListener.defaultIp, defaultIpRate)
+    verifyIpConnectionQuota(adminListener.defaultIp, defaultIpRate)
+
+    // external listener should have its in-memory quota and metric config 
updated
+    
connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress),
 Some(overrideIpRate))
+    verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate)
+
+    // update default
+    connectionQuotas.updateIpConnectionRateQuota(None, 
Some(defaultOverrideRate))
+
+    // external listener IP should not have its quota updated to the new 
default
+    verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate)
+    // admin listener IP should have its quota updated with to the new default
+    verifyIpConnectionQuota(adminListener.defaultIp, defaultOverrideRate)
+
+    // remove default connection rate quota
+    connectionQuotas.updateIpConnectionRateQuota(None, None)
+    verifyIpConnectionQuota(adminListener.defaultIp, 
DynamicConfig.Ip.DefaultConnectionCreationRate)
+
+    // remove override for external listener IP
+    
connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress),
 None)
+    verifyIpConnectionQuota(externalListener.defaultIp, 
DynamicConfig.Ip.DefaultConnectionCreationRate)
+  }
+
+  @Test
+  def testIpConnectionRateQuotaUpdate(): Unit = {
+    val ipConnectionRateLimit = 20
+    val props = brokerPropsWithDefaultConnectionLimits
+    val config = KafkaConfig.fromProps(props)
+    connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+    addListenersAndVerify(config, connectionQuotas)
+    val externalListener = listeners("EXTERNAL")
+    
connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress),
 Some(ipConnectionRateLimit))
+    // create connections with the rate > ip quota
+    val connectionRate = 40
+    assertThrows[ConnectionThrottledException] {
+      acceptConnections(connectionQuotas, externalListener, connectionRate)
+    }
+    assertEquals(s"Number of connections on $externalListener:",
+      ipConnectionRateLimit, connectionQuotas.get(externalListener.defaultIp))
+
+    // increase ip quota, we should accept connections up to the new quota 
limit
+    val updatedRateLimit = 30
+    
connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress),
 Some(updatedRateLimit))
+    assertThrows[ConnectionThrottledException] {
+      acceptConnections(connectionQuotas, externalListener, connectionRate)
+    }
+    assertEquals(s"Number of connections on $externalListener:",
+      updatedRateLimit, connectionQuotas.get(externalListener.defaultIp))
+
+    // remove IP quota, all connections should get accepted
+    
connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress),
 None)
+    acceptConnections(connectionQuotas, externalListener, connectionRate)
+    assertEquals(s"Number of connections on $externalListener:",
+      connectionRate + updatedRateLimit, 
connectionQuotas.get(externalListener.defaultIp))
+
+    // create connections on a different IP,
+    val adminListener = listeners("ADMIN")
+    acceptConnections(connectionQuotas, adminListener, connectionRate)
+    assertEquals(s"Number of connections on $adminListener:",
+      connectionRate, connectionQuotas.get(adminListener.defaultIp))
+
+    // set a default IP quota, verify that quota gets propagated
+    connectionQuotas.updateIpConnectionRateQuota(None, 
Some(ipConnectionRateLimit))
+    assertThrows[ConnectionThrottledException] {
+      acceptConnections(connectionQuotas, adminListener, connectionRate)
+    }
+    assertEquals(s"Number of connections on $adminListener:",
+      connectionRate + ipConnectionRateLimit, 
connectionQuotas.get(adminListener.defaultIp))
+
+

Review comment:
       nit: One of these two empty lines could be removed.

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -633,18 +837,27 @@ class ConnectionQuotasTest {
                                 listenerName: ListenerName,
                                 address: InetAddress,
                                 numConnections: Long,
-                                timeIntervalMs: Long) : Unit = {
+                                timeIntervalMs: Long,
+                                expectIpThrottle: Boolean): Boolean = {
     var nextSendTime = System.currentTimeMillis + timeIntervalMs
+    var ipThrottled = false
     for (_ <- 0L until numConnections) {
       // this method may block if broker-wide or listener limit on the number 
of connections is reached
-      connectionQuotas.inc(listenerName, address, 
blockedPercentMeters(listenerName.value))
-
+      try {
+        connectionQuotas.inc(listenerName, address, 
blockedPercentMeters(listenerName.value))
+      } catch {
+        case e: ConnectionThrottledException =>
+          if (!expectIpThrottle)
+            throw e
+          ipThrottled = true

Review comment:
       When we hit this line, should we just stop trying more connections? 
`ipThrottled` will remain true regardless.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): 
Unit = {
     // if there is a connection waiting on the rate throttle delay, we will 
let it wait the original delay even if
     // the rate limit increases, because it is just one connection per 
listener and the code is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs 
for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, 
otherwise
+   * all metric configs will be checked and updated if required.
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default 
if None
+   */
+  def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: 
Option[Int]): Unit = {
+    def isIpConnectionRateMetric(metricName: MetricName) = {
+      metricName.name == ConnectionRateMetricName &&
+      metricName.group == MetricsGroup &&
+      metricName.tags.containsKey(IpMetricTag)
+    }
+
+    def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+      quotaLimit != metric.config.quota.bound
+    }
+    counts.synchronized {

Review comment:
       I do wonder if synchronizing on `counts` is a good idea here. When a 
single IP is updated, that is not an issue. However, when all the metrics are 
updated, that could take some time depending on the number of metrics. Holding 
the lock in this case means that we can't accept any new connections until the 
update is completed. Am I getting this right?

##########
File path: core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
##########
@@ -193,6 +194,64 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     assertEquals(Quota.upperBound(200000),  
quotaManagers.fetch.quota("ANONYMOUS", "overriddenUserClientId"))
   }
 
+  @Test
+  def testIpQuotaInitialization(): Unit = {
+    val server = servers.head
+    val ipOverrideProps = new Properties()
+    ipOverrideProps.put(DynamicConfig.Ip.IpConnectionRateOverrideProp, "10")
+    val ipDefaultProps = new Properties()
+    ipDefaultProps.put(DynamicConfig.Ip.IpConnectionRateOverrideProp, "20")
+    server.shutdown()
+
+    adminZkClient.changeIpConfig(ConfigEntityName.Default, ipDefaultProps)
+    adminZkClient.changeIpConfig("1.2.3.4", ipOverrideProps)
+
+    // Remove config change znodes to force quota initialization only through 
loading of ip quotas
+    zkClient.getChildren(ConfigEntityChangeNotificationZNode.path).foreach { p 
=> zkClient.deletePath(ConfigEntityChangeNotificationZNode.path + "/" + p) }
+    server.startup()
+
+    val connectionQuotas = server.socketServer.connectionQuotas
+    assertEquals(10L, 
connectionQuotas.connectionRateForIp(InetAddress.getByName("1.2.3.4")))
+    assertEquals(20L, 
connectionQuotas.connectionRateForIp(InetAddress.getByName("2.4.6.8")))
+  }
+
+  @Test
+  def testIpQuotaConfigChange(): Unit = {
+    val ipOverrideProps = new Properties()
+    ipOverrideProps.put(DynamicConfig.Ip.IpConnectionRateOverrideProp, "10")
+    val ipDefaultProps = new Properties()
+    ipDefaultProps.put(DynamicConfig.Ip.IpConnectionRateOverrideProp, "20")
+
+    val overriddenIp = InetAddress.getByName("1.2.3.4")
+    val defaultQuotaIp = InetAddress.getByName("2.3.4.5")
+    adminZkClient.changeIpConfig(ConfigEntityName.Default, ipDefaultProps)
+    adminZkClient.changeIpConfig(overriddenIp.getHostAddress, ipOverrideProps)
+
+    val connectionQuotas = servers.head.socketServer.connectionQuotas
+
+    TestUtils.retry(10000) {

Review comment:
       This block is somewhat repeated multiple times in the test. How about 
defining a `verifyConnectionQuota` helper which verifies the quota of a given 
IP?

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -477,6 +568,99 @@ class ConnectionQuotasTest {
     assertTrue("Expected BlockedPercentMeter metric for EXTERNAL listener to 
be recorded", blockedPercentMeters("EXTERNAL").count() > 0)
   }
 
+  @Test
+  def testIpConnectionRateUpdate(): Unit = {
+    val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits)
+    connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+    connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName)
+    connectionQuotas.addListener(config, listeners("ADMIN").listenerName)
+    connectionQuotas.addListener(config, listeners("REPLICATION").listenerName)
+    val defaultIpRate = 50
+    val defaultOverrideRate = 20
+    val overrideIpRate = 30
+    val externalListener = listeners("EXTERNAL")
+    val adminListener = listeners("ADMIN")
+    // set a non-unlimited default quota so that we create ip rate 
sensors/metrics
+    connectionQuotas.updateIpConnectionRateQuota(None, Some(defaultIpRate))
+    connectionQuotas.inc(externalListener.listenerName, 
externalListener.defaultIp, blockedPercentMeters("EXTERNAL"))
+    connectionQuotas.inc(adminListener.listenerName, adminListener.defaultIp, 
blockedPercentMeters("ADMIN"))
+
+    // both IPs should have the default rate
+    verifyIpConnectionQuota(externalListener.defaultIp, defaultIpRate)
+    verifyIpConnectionQuota(adminListener.defaultIp, defaultIpRate)
+
+    // external listener should have its in-memory quota and metric config 
updated
+    
connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress),
 Some(overrideIpRate))
+    verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate)
+
+    // update default
+    connectionQuotas.updateIpConnectionRateQuota(None, 
Some(defaultOverrideRate))
+
+    // external listener IP should not have its quota updated to the new 
default
+    verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate)
+    // admin listener IP should have its quota updated with to the new default
+    verifyIpConnectionQuota(adminListener.defaultIp, defaultOverrideRate)
+
+    // remove default connection rate quota
+    connectionQuotas.updateIpConnectionRateQuota(None, None)
+    verifyIpConnectionQuota(adminListener.defaultIp, 
DynamicConfig.Ip.DefaultConnectionCreationRate)
+
+    // remove override for external listener IP
+    
connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress),
 None)
+    verifyIpConnectionQuota(externalListener.defaultIp, 
DynamicConfig.Ip.DefaultConnectionCreationRate)
+  }
+
+  @Test
+  def testIpConnectionRateQuotaUpdate(): Unit = {

Review comment:
       We may find better names for this test and/or the previous one. What is 
the difference between `testIpConnectionRateUpdate` and 
`testIpConnectionRateQuotaUpdate`?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1453,6 +1582,47 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
     }
   }
 
+  /**
+   * To avoid over-recording listener/broker connection rate, we unrecord a 
listener or broker connection
+   * if the IP gets throttled later.
+   *
+   * @param listenerName listener to unrecord connection
+   * @param timeMs current time in milliseconds
+   */
+  private def unrecordListenerConnection(listenerName: ListenerName, timeMs: 
Long): Unit = {
+    if (!protectedListener(listenerName)) {
+      brokerConnectionRateSensor.record(-1.0, timeMs, false)
+    }
+    maxConnectionsPerListener
+      .get(listenerName)
+      .foreach(_.connectionRateSensor.record(-1.0, timeMs, false))
+  }
+
+  /**
+   * Calculates the delay needed to bring the observed connection creation 
rate to the IP limit.
+   * If the connection would cause an IP quota violation, un-record the 
connection for both IP,
+   * and throw ConnectionThrottledException
+   *
+   * @param listenerName listener to unrecord connection if throttled
+   * @param address ip address to record connection
+   */
+  private def recordIpConnectionMaybeThrottle(listenerName: ListenerName, 
address: InetAddress): Unit = {
+    val connectionRateQuota = connectionRateForIp(address)
+    val quotaEnabled = connectionRateQuota != 
DynamicConfig.Ip.UnlimitedConnectionCreationRate
+    if (quotaEnabled) {
+      val sensor = getOrCreateConnectionRateQuotaSensor(connectionRateQuota, 
IpQuotaEntity(address))
+      val timeMs = time.milliseconds
+      val throttleMs = recordAndGetThrottleTimeMs(sensor, timeMs)
+      if (throttleMs > 0) {
+        trace(s"Throttling $address for $throttleMs ms")
+        // unrecord the connection since we won't accept the connection
+        sensor.record(-1.0, timeMs, false)
+        unrecordListenerConnection(listenerName, timeMs)

Review comment:
       It seems that we don't have any unit tests which exercise this 
un-recording logic. I wonder if we could add a unit test that verifies that the 
listener rate does not change when a connection is rejected due to violating 
the per-IP quota, or something along these lines.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to