dajac commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r521346721
########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -538,14 +542,20 @@ private[kafka] class Acceptor(val endPoint: EndPoint, val recvBufferSize: Int, brokerId: Int, connectionQuotas: ConnectionQuotas, - metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { + metricPrefix: String, + time: Time) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { private val nioSelector = NSelector.open() val serverChannel = openServerSocket(endPoint.host, endPoint.port) private val processors = new ArrayBuffer[Processor]() private val processorsStarted = new AtomicBoolean private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent", "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value)) + private var currentProcessorIndex = 0 + private[network] case class DelayedCloseSocket(socket: SocketChannel, endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] { + override def compare(that: DelayedCloseSocket): Int = endThrottleTimeMs compare that.endThrottleTimeMs + } + private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]() Review comment: Out of curiosity, have we considered using a `DelayQueue` instead of `PriorityQueue`? I was wondering if that could reduce the logic on our side. I am not sure if it is worth doing it though. 
########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -538,14 +542,20 @@ private[kafka] class Acceptor(val endPoint: EndPoint, val recvBufferSize: Int, brokerId: Int, connectionQuotas: ConnectionQuotas, - metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { + metricPrefix: String, + time: Time) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { private val nioSelector = NSelector.open() val serverChannel = openServerSocket(endPoint.host, endPoint.port) private val processors = new ArrayBuffer[Processor]() private val processorsStarted = new AtomicBoolean private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent", "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value)) + private var currentProcessorIndex = 0 + private[network] case class DelayedCloseSocket(socket: SocketChannel, endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] { Review comment: nit: It is a bit weird to have this case class defined in the middle of the block of variables. Would it make sense to move it to another place? Maybe after the block of variables? ########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -647,9 +624,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint, } } } finally { - debug("Closing server socket and selector.") + debug("Closing server socket, selector, and any throttled sockets.") CoreUtils.swallow(serverChannel.close(), this, Level.ERROR) CoreUtils.swallow(nioSelector.close(), this, Level.ERROR) + throttledSockets.foreach(throttledSocket => closeSocket(throttledSocket.socket)) Review comment: nit: We may want to clear `throttledSockets` for completeness. 
########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1453,6 +1582,47 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend } } + /** + * To avoid over-recording listener/broker connection rate, we unrecord a listener or broker connection + * if the IP gets throttled later. + * + * @param listenerName listener to unrecord connection + * @param timeMs current time in milliseconds + */ + private def unrecordListenerConnection(listenerName: ListenerName, timeMs: Long): Unit = { + if (!protectedListener(listenerName)) { + brokerConnectionRateSensor.record(-1.0, timeMs, false) + } + maxConnectionsPerListener + .get(listenerName) + .foreach(_.connectionRateSensor.record(-1.0, timeMs, false)) + } + + /** + * Calculates the delay needed to bring the observed connection creation rate to the IP limit. + * If the connection would cause an IP quota violation, un-record the connection for both IP, + * and throw ConnectionThrottledException Review comment: nit: `un-record the connection for both IP, and throw ConnectionThrottledException`. It seems that something is missing. Perhaps, you meant `for both IP and the listener, and throw ...`? We can also add a dot at the end of the sentence. ########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way - updateConnectionRateQuota(maxConnectionRate) + updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. 
+ * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { + def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) + } + + def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound + } + counts.synchronized { + ip match { + case Some(addr) => + val address = InetAddress.getByName(addr) Review comment: What would happen if `addr` can't be resolved by name? For instance, say that we have `10.0.0.0.1` in ZK or a string which does not represent an IP. It may be worth verifying the IP in the `IpConfigHandler`. If the ip is not valid, we could handle it properly. 
########## File path: core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala ########## @@ -193,6 +194,64 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { assertEquals(Quota.upperBound(200000), quotaManagers.fetch.quota("ANONYMOUS", "overriddenUserClientId")) } + @Test + def testIpQuotaInitialization(): Unit = { + val server = servers.head + val ipOverrideProps = new Properties() + ipOverrideProps.put(DynamicConfig.Ip.IpConnectionRateOverrideProp, "10") + val ipDefaultProps = new Properties() + ipDefaultProps.put(DynamicConfig.Ip.IpConnectionRateOverrideProp, "20") + server.shutdown() + + adminZkClient.changeIpConfig(ConfigEntityName.Default, ipDefaultProps) + adminZkClient.changeIpConfig("1.2.3.4", ipOverrideProps) + + // Remove config change znodes to force quota initialization only through loading of ip quotas + zkClient.getChildren(ConfigEntityChangeNotificationZNode.path).foreach { p => zkClient.deletePath(ConfigEntityChangeNotificationZNode.path + "/" + p) } Review comment: nit: Could we break this long line? ########## File path: core/src/main/scala/kafka/server/ConfigHandler.scala ########## @@ -185,6 +186,19 @@ class UserConfigHandler(private val quotaManagers: QuotaManagers, val credential } } +class IpConfigHandler(private val connectionQuotas: ConnectionQuotas) extends ConfigHandler with Logging { + + def processConfigChanges(ip: String, config: Properties): Unit = { + val ipConnectionRateQuota = Option(config.getProperty(DynamicConfig.Ip.IpConnectionRateOverrideProp)).map(_.toInt) + val updatedIp = + if (ip != ConfigEntityName.Default) + Some(ip) Review comment: As mentioned in a previous comment, it may be worth validating the IP here. 
########## File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala ########## @@ -880,6 +872,72 @@ class SocketServerTest { } } + @Test + def testConnectionRatePerIp(): Unit = { + val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) + overrideProps.remove(KafkaConfig.MaxConnectionsPerIpProp) + overrideProps.put(KafkaConfig.NumQuotaSamplesProp, String.valueOf(2)) + val connectionRate = 5 + val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), new Metrics(), Time.SYSTEM, credentialProvider) + overrideServer.connectionQuotas.updateIpConnectionRateQuota(None, Some(connectionRate)) + try { + overrideServer.startup() + // make the maximum allowable number of connections + (0 until connectionRate).map(_ => connect(overrideServer)) + // now try one more (should get throttled) + var conn = connect(overrideServer) Review comment: I wonder if this could be a source of flakiness. If I understand this correctly, it works because all the connections are within the same sample interval (1s). Am I understanding it right? That's probably true most of the time but could be false sometimes, especially on overloaded Jenkins machines. Could we mock the time to avoid this? 
########## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ########## @@ -477,6 +568,99 @@ class ConnectionQuotasTest { assertTrue("Expected BlockedPercentMeter metric for EXTERNAL listener to be recorded", blockedPercentMeters("EXTERNAL").count() > 0) } + @Test + def testIpConnectionRateUpdate(): Unit = { + val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits) + connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName) + connectionQuotas.addListener(config, listeners("ADMIN").listenerName) + connectionQuotas.addListener(config, listeners("REPLICATION").listenerName) + val defaultIpRate = 50 + val defaultOverrideRate = 20 + val overrideIpRate = 30 + val externalListener = listeners("EXTERNAL") + val adminListener = listeners("ADMIN") + // set a non-unlimited default quota so that we create ip rate sensors/metrics + connectionQuotas.updateIpConnectionRateQuota(None, Some(defaultIpRate)) + connectionQuotas.inc(externalListener.listenerName, externalListener.defaultIp, blockedPercentMeters("EXTERNAL")) + connectionQuotas.inc(adminListener.listenerName, adminListener.defaultIp, blockedPercentMeters("ADMIN")) + + // both IPs should have the default rate + verifyIpConnectionQuota(externalListener.defaultIp, defaultIpRate) + verifyIpConnectionQuota(adminListener.defaultIp, defaultIpRate) + + // external listener should have its in-memory quota and metric config updated + connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress), Some(overrideIpRate)) + verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate) + + // update default + connectionQuotas.updateIpConnectionRateQuota(None, Some(defaultOverrideRate)) + + // external listener IP should not have its quota updated to the new default + verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate) + // admin listener IP should have 
its quota updated with to the new default + verifyIpConnectionQuota(adminListener.defaultIp, defaultOverrideRate) + + // remove default connection rate quota + connectionQuotas.updateIpConnectionRateQuota(None, None) + verifyIpConnectionQuota(adminListener.defaultIp, DynamicConfig.Ip.DefaultConnectionCreationRate) Review comment: Could we verify here that the `externalListener` still has the correct quota? ########## File path: core/src/main/scala/kafka/server/DynamicConfig.scala ########## @@ -127,6 +127,22 @@ object DynamicConfig { def validate(props: Properties) = DynamicConfig.validate(userConfigs, props, customPropsAllowed = false) } + object Ip { + val IpConnectionRateOverrideProp = "connection_creation_rate" + val UnlimitedConnectionCreationRate = Int.MaxValue + val DefaultConnectionCreationRate = UnlimitedConnectionCreationRate + val IpOverrideDoc = "A long representing the upper bound of connections accepted for the specified IP." Review comment: Here we say `long` but it is actually an `INT` that is defined below. ########## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ########## @@ -477,6 +568,99 @@ class ConnectionQuotasTest { assertTrue("Expected BlockedPercentMeter metric for EXTERNAL listener to be recorded", blockedPercentMeters("EXTERNAL").count() > 0) } + @Test + def testIpConnectionRateUpdate(): Unit = { + val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits) + connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName) + connectionQuotas.addListener(config, listeners("ADMIN").listenerName) + connectionQuotas.addListener(config, listeners("REPLICATION").listenerName) Review comment: It seems that `REPLICATION` is never used in the test. We may remove it. 
########## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ########## @@ -477,6 +568,99 @@ class ConnectionQuotasTest { assertTrue("Expected BlockedPercentMeter metric for EXTERNAL listener to be recorded", blockedPercentMeters("EXTERNAL").count() > 0) } + @Test + def testIpConnectionRateUpdate(): Unit = { + val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits) + connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName) + connectionQuotas.addListener(config, listeners("ADMIN").listenerName) + connectionQuotas.addListener(config, listeners("REPLICATION").listenerName) + val defaultIpRate = 50 + val defaultOverrideRate = 20 + val overrideIpRate = 30 + val externalListener = listeners("EXTERNAL") + val adminListener = listeners("ADMIN") + // set a non-unlimited default quota so that we create ip rate sensors/metrics + connectionQuotas.updateIpConnectionRateQuota(None, Some(defaultIpRate)) + connectionQuotas.inc(externalListener.listenerName, externalListener.defaultIp, blockedPercentMeters("EXTERNAL")) + connectionQuotas.inc(adminListener.listenerName, adminListener.defaultIp, blockedPercentMeters("ADMIN")) + + // both IPs should have the default rate + verifyIpConnectionQuota(externalListener.defaultIp, defaultIpRate) + verifyIpConnectionQuota(adminListener.defaultIp, defaultIpRate) + + // external listener should have its in-memory quota and metric config updated + connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress), Some(overrideIpRate)) + verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate) + + // update default + connectionQuotas.updateIpConnectionRateQuota(None, Some(defaultOverrideRate)) + + // external listener IP should not have its quota updated to the new default + verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate) + // admin listener IP should have 
its quota updated with to the new default + verifyIpConnectionQuota(adminListener.defaultIp, defaultOverrideRate) + + // remove default connection rate quota + connectionQuotas.updateIpConnectionRateQuota(None, None) + verifyIpConnectionQuota(adminListener.defaultIp, DynamicConfig.Ip.DefaultConnectionCreationRate) + + // remove override for external listener IP + connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress), None) + verifyIpConnectionQuota(externalListener.defaultIp, DynamicConfig.Ip.DefaultConnectionCreationRate) + } + + @Test + def testIpConnectionRateQuotaUpdate(): Unit = { + val ipConnectionRateLimit = 20 + val props = brokerPropsWithDefaultConnectionLimits + val config = KafkaConfig.fromProps(props) + connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + addListenersAndVerify(config, connectionQuotas) + val externalListener = listeners("EXTERNAL") + connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress), Some(ipConnectionRateLimit)) + // create connections with the rate > ip quota + val connectionRate = 40 + assertThrows[ConnectionThrottledException] { + acceptConnections(connectionQuotas, externalListener, connectionRate) + } + assertEquals(s"Number of connections on $externalListener:", + ipConnectionRateLimit, connectionQuotas.get(externalListener.defaultIp)) + + // increase ip quota, we should accept connections up to the new quota limit + val updatedRateLimit = 30 + connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress), Some(updatedRateLimit)) + assertThrows[ConnectionThrottledException] { + acceptConnections(connectionQuotas, externalListener, connectionRate) + } + assertEquals(s"Number of connections on $externalListener:", + updatedRateLimit, connectionQuotas.get(externalListener.defaultIp)) + + // remove IP quota, all connections should get accepted + 
connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress), None) + acceptConnections(connectionQuotas, externalListener, connectionRate) + assertEquals(s"Number of connections on $externalListener:", + connectionRate + updatedRateLimit, connectionQuotas.get(externalListener.defaultIp)) + + // create connections on a different IP, + val adminListener = listeners("ADMIN") + acceptConnections(connectionQuotas, adminListener, connectionRate) + assertEquals(s"Number of connections on $adminListener:", + connectionRate, connectionQuotas.get(adminListener.defaultIp)) + + // set a default IP quota, verify that quota gets propagated + connectionQuotas.updateIpConnectionRateQuota(None, Some(ipConnectionRateLimit)) + assertThrows[ConnectionThrottledException] { + acceptConnections(connectionQuotas, adminListener, connectionRate) + } + assertEquals(s"Number of connections on $adminListener:", + connectionRate + ipConnectionRateLimit, connectionQuotas.get(adminListener.defaultIp)) + + Review comment: nit: An empty line could be removed. 
########## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ########## @@ -633,18 +837,27 @@ class ConnectionQuotasTest { listenerName: ListenerName, address: InetAddress, numConnections: Long, - timeIntervalMs: Long) : Unit = { + timeIntervalMs: Long, + expectIpThrottle: Boolean): Boolean = { var nextSendTime = System.currentTimeMillis + timeIntervalMs + var ipThrottled = false for (_ <- 0L until numConnections) { // this method may block if broker-wide or listener limit on the number of connections is reached - connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) - + try { + connectionQuotas.inc(listenerName, address, blockedPercentMeters(listenerName.value)) + } catch { + case e: ConnectionThrottledException => + if (!expectIpThrottle) + throw e + ipThrottled = true Review comment: When we hit this line, should we just stop trying more connections? `ipThrottled` will remain true regardless. ########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way - updateConnectionRateQuota(maxConnectionRate) + updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required. 
+ * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { + def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) + } + + def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound + } + counts.synchronized { Review comment: I do wonder if synchronizing on `counts` is a good idea here. When a single IP is updated, that is not an issue. However, when all the metrics are updated, that could take some time depending on the number of metrics. Holding the lock in this case means that we can't accept any new connections until the update is completed. Am I getting this right? ########## File path: core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala ########## @@ -193,6 +194,64 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { assertEquals(Quota.upperBound(200000), quotaManagers.fetch.quota("ANONYMOUS", "overriddenUserClientId")) } + @Test + def testIpQuotaInitialization(): Unit = { + val server = servers.head + val ipOverrideProps = new Properties() + ipOverrideProps.put(DynamicConfig.Ip.IpConnectionRateOverrideProp, "10") + val ipDefaultProps = new Properties() + ipDefaultProps.put(DynamicConfig.Ip.IpConnectionRateOverrideProp, "20") + server.shutdown() + + adminZkClient.changeIpConfig(ConfigEntityName.Default, ipDefaultProps) + adminZkClient.changeIpConfig("1.2.3.4", ipOverrideProps) + + // Remove config change znodes to force quota initialization only through loading of ip quotas + zkClient.getChildren(ConfigEntityChangeNotificationZNode.path).foreach { p => zkClient.deletePath(ConfigEntityChangeNotificationZNode.path + "/" + p) } + server.startup() + + val connectionQuotas = 
server.socketServer.connectionQuotas + assertEquals(10L, connectionQuotas.connectionRateForIp(InetAddress.getByName("1.2.3.4"))) + assertEquals(20L, connectionQuotas.connectionRateForIp(InetAddress.getByName("2.4.6.8"))) + } + + @Test + def testIpQuotaConfigChange(): Unit = { + val ipOverrideProps = new Properties() + ipOverrideProps.put(DynamicConfig.Ip.IpConnectionRateOverrideProp, "10") + val ipDefaultProps = new Properties() + ipDefaultProps.put(DynamicConfig.Ip.IpConnectionRateOverrideProp, "20") + + val overriddenIp = InetAddress.getByName("1.2.3.4") + val defaultQuotaIp = InetAddress.getByName("2.3.4.5") + adminZkClient.changeIpConfig(ConfigEntityName.Default, ipDefaultProps) + adminZkClient.changeIpConfig(overriddenIp.getHostAddress, ipOverrideProps) + + val connectionQuotas = servers.head.socketServer.connectionQuotas + + TestUtils.retry(10000) { Review comment: This block is somewhat repeated multiple times in the test. How about defining a `verifyConnectionQuota` helper which verifies the quota of a given IP? 
########## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ########## @@ -477,6 +568,99 @@ class ConnectionQuotasTest { assertTrue("Expected BlockedPercentMeter metric for EXTERNAL listener to be recorded", blockedPercentMeters("EXTERNAL").count() > 0) } + @Test + def testIpConnectionRateUpdate(): Unit = { + val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits) + connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName) + connectionQuotas.addListener(config, listeners("ADMIN").listenerName) + connectionQuotas.addListener(config, listeners("REPLICATION").listenerName) + val defaultIpRate = 50 + val defaultOverrideRate = 20 + val overrideIpRate = 30 + val externalListener = listeners("EXTERNAL") + val adminListener = listeners("ADMIN") + // set a non-unlimited default quota so that we create ip rate sensors/metrics + connectionQuotas.updateIpConnectionRateQuota(None, Some(defaultIpRate)) + connectionQuotas.inc(externalListener.listenerName, externalListener.defaultIp, blockedPercentMeters("EXTERNAL")) + connectionQuotas.inc(adminListener.listenerName, adminListener.defaultIp, blockedPercentMeters("ADMIN")) + + // both IPs should have the default rate + verifyIpConnectionQuota(externalListener.defaultIp, defaultIpRate) + verifyIpConnectionQuota(adminListener.defaultIp, defaultIpRate) + + // external listener should have its in-memory quota and metric config updated + connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress), Some(overrideIpRate)) + verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate) + + // update default + connectionQuotas.updateIpConnectionRateQuota(None, Some(defaultOverrideRate)) + + // external listener IP should not have its quota updated to the new default + verifyIpConnectionQuota(externalListener.defaultIp, overrideIpRate) + // admin listener IP should have 
its quota updated with to the new default + verifyIpConnectionQuota(adminListener.defaultIp, defaultOverrideRate) + + // remove default connection rate quota + connectionQuotas.updateIpConnectionRateQuota(None, None) + verifyIpConnectionQuota(adminListener.defaultIp, DynamicConfig.Ip.DefaultConnectionCreationRate) + + // remove override for external listener IP + connectionQuotas.updateIpConnectionRateQuota(Some(externalListener.defaultIp.getHostAddress), None) + verifyIpConnectionQuota(externalListener.defaultIp, DynamicConfig.Ip.DefaultConnectionCreationRate) + } + + @Test + def testIpConnectionRateQuotaUpdate(): Unit = { Review comment: We may find better names for this test and/or the previous one. What is the difference between `testIpConnectionRateUpdate` and `testIpConnectionRateQuotaUpdate`? ########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1453,6 +1582,47 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend } } + /** + * To avoid over-recording listener/broker connection rate, we unrecord a listener or broker connection + * if the IP gets throttled later. + * + * @param listenerName listener to unrecord connection + * @param timeMs current time in milliseconds + */ + private def unrecordListenerConnection(listenerName: ListenerName, timeMs: Long): Unit = { + if (!protectedListener(listenerName)) { + brokerConnectionRateSensor.record(-1.0, timeMs, false) + } + maxConnectionsPerListener + .get(listenerName) + .foreach(_.connectionRateSensor.record(-1.0, timeMs, false)) + } + + /** + * Calculates the delay needed to bring the observed connection creation rate to the IP limit. 
+ * If the connection would cause an IP quota violation, un-record the connection for both IP, + * and throw ConnectionThrottledException + * + * @param listenerName listener to unrecord connection if throttled + * @param address ip address to record connection + */ + private def recordIpConnectionMaybeThrottle(listenerName: ListenerName, address: InetAddress): Unit = { + val connectionRateQuota = connectionRateForIp(address) + val quotaEnabled = connectionRateQuota != DynamicConfig.Ip.UnlimitedConnectionCreationRate + if (quotaEnabled) { + val sensor = getOrCreateConnectionRateQuotaSensor(connectionRateQuota, IpQuotaEntity(address)) + val timeMs = time.milliseconds + val throttleMs = recordAndGetThrottleTimeMs(sensor, timeMs) + if (throttleMs > 0) { + trace(s"Throttling $address for $throttleMs ms") + // unrecord the connection since we won't accept the connection + sensor.record(-1.0, timeMs, false) + unrecordListenerConnection(listenerName, timeMs) Review comment: It seems that we don't have any unit tests which exercise this un-recording logic. I wonder if we could add a unit test that verifies that the listener rate does not change when a connection is rejected due to violating the per-ip quota or something along these lines. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org