[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r526361289

## File path: core/src/main/scala/kafka/network/SocketServer.scala ##
@@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = {
     // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if
     // the rate limit increases, because it is just one connection per listener and the code is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, otherwise
+   * all metric configs will be checked and updated if required.
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default if None
+   */
+  def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = {
+    def isIpConnectionRateMetric(metricName: MetricName) = {
+      metricName.name == ConnectionRateMetricName &&
+      metricName.group == MetricsGroup &&
+      metricName.tags.containsKey(IpMetricTag)
+    }
+
+    def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+      quotaLimit != metric.config.quota.bound
+    }
+    counts.synchronized {

Review comment:
I reviewed the commit with the locking change, and it looks correct to me. Thanks for writing up a thorough evaluation of the new logic. I think using `counts` as a lock is reasonable here, because we need to synchronize a lot of the same data that is used on the `inc()` path (protected by `counts`). I agree about adding clear comments.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
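The locking choice discussed in this comment can be illustrated with a minimal sketch. It is not the code from the PR: `QuotaLockSketch`, `ratePerIp` and `updateIpRate` are hypothetical names, and the class only assumes that the accept path and the config-update path share the `counts` monitor.

```scala
import java.net.InetAddress
import scala.collection.mutable

// Hypothetical, simplified stand-in for ConnectionQuotas (not the Kafka class).
class QuotaLockSketch(defaultRate: Int) {
  // `counts` doubles as the monitor for all quota state, as in the review comment.
  private val counts = mutable.Map[InetAddress, Int]()
  private val ratePerIp = mutable.Map[InetAddress, Int]()

  // Accept path: bumps the connection count and returns the rate limit in effect.
  def inc(address: InetAddress): Int = counts.synchronized {
    counts(address) = counts.getOrElse(address, 0) + 1
    ratePerIp.getOrElse(address, defaultRate)
  }

  // Config-update path: takes the same monitor, so it never interleaves with inc().
  def updateIpRate(address: InetAddress, rate: Option[Int]): Unit = counts.synchronized {
    rate match {
      case Some(r) => ratePerIp.put(address, r)
      case None    => ratePerIp.remove(address)
    }
  }
}
```

Reusing one monitor keeps the reasoning simple at the cost of briefly blocking the acceptor while a config update runs, which seems acceptable if updates are rare.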
[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r512951535

## File path: core/src/main/scala/kafka/network/SocketServer.scala ##
@@ -1246,7 +1337,57 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = {
     // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if
     // the rate limit increases, because it is just one connection per listener and the code is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, otherwise
+   * all metric configs will be checked and updated if required
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default if None
+   */
+  def updateIpConnectionRate(ip: Option[String], maxConnectionRate: Option[Int]): Unit = {
+    def isIpConnectionRateMetric(metricName: MetricName) = {
+      metricName.name == ConnectionRateMetricName &&
+      metricName.group == MetricsGroup &&
+      metricName.tags.containsKey(IpMetricTag)
+    }
+
+    def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+      quotaLimit != metric.config.quota.bound
+    }
+
+    ip match {
+      case Some(addr) =>
+        val address = InetAddress.getByName(addr)
+        maxConnectionRate match {
+          case Some(rate) =>
+            info(s"Updating max connection rate override for $address to $rate")
+            connectionRatePerIp.put(address, rate)
+          case None =>
+            info(s"Removing max connection rate override for $address")
+            connectionRatePerIp.remove(address)
+        }
+        updateConnectionRateQuota(connectionRateForIp(address), IpQuotaEntity(address))
+      case None =>
+        val newQuota = maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate)
+        info(s"Updating default max IP connection rate to $newQuota")
+        defaultConnectionRatePerIp = newQuota
+        val allMetrics = metrics.metrics
+        allMetrics.forEach { (metricName, metric) =>
+          if (isIpConnectionRateMetric(metricName) && shouldUpdateQuota(metric, newQuota)) {
+            info(s"Updating existing connection rate sensor for ${metricName.tags} to $newQuota")
+            metric.config(rateQuotaMetricConfig(newQuota))
+          }

Review comment:
@splett2 Maybe I was not looking at the right place in this PR, but does this PR handle the case where someone sets a non-unlimited per-IP default quota (basically, the /config/ips/ znode)? If a default is set and it is not unlimited, removing the quota for an IP should fall back to the configured IP default, and if the IP default is not set, then fall back to `DynamicConfig.Ip.DefaultConnectionCreationRate`. I think this means that we need to have the "" ip in `connectionRatePerIp` if the default is not unlimited, and use it when creating a per-IP sensor with the right quota.
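The fallback order described in this comment (per-IP override, then configured IP default, then the built-in default) can be sketched as follows. This is an illustration only: `IpQuotaFallback`, `setOverride`, `setDefault` and `rateFor` are hypothetical names, and `BuiltInDefaultRate` merely stands in for `DynamicConfig.Ip.DefaultConnectionCreationRate`, whose value is assumed here to be unlimited.

```scala
import java.net.InetAddress
import scala.collection.concurrent.TrieMap

// Hypothetical sketch of the lookup chain; not the code from the PR.
object IpQuotaFallback {
  // Assumption: stands in for DynamicConfig.Ip.DefaultConnectionCreationRate.
  val BuiltInDefaultRate: Int = Int.MaxValue

  private val overrides = TrieMap[InetAddress, Int]()
  @volatile private var configuredDefault: Option[Int] = None

  // Sets or removes the override for a single IP.
  def setOverride(ip: InetAddress, rate: Option[Int]): Unit = rate match {
    case Some(r) => overrides.put(ip, r)
    case None    => overrides.remove(ip)
  }

  // Sets or removes the configured default that applies to all IPs without an override.
  def setDefault(rate: Option[Int]): Unit = {
    configuredDefault = rate
  }

  // Override first, then the configured default, then the built-in default.
  def rateFor(ip: InetAddress): Int =
    overrides.get(ip).orElse(configuredDefault).getOrElse(BuiltInDefaultRate)
}
```

With this shape, removing an override never needs to know whether a default is configured; the lookup resolves it at read time.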
[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r512928815

## File path: core/src/main/scala/kafka/network/SocketServer.scala ##
@@ -1207,14 +1286,26 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
+  private val lock = new ReentrantReadWriteLock()
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   // sensor that tracks broker-wide connection creation rate and limit (quota)
-  private val brokerConnectionRateSensor = createConnectionRateQuotaSensor(config.maxConnectionCreationRate)
+  private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, BrokerQuotaEntity)
   private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
   def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+      val startThrottleTimeMs = time.milliseconds
+
+      waitForConnectionSlot(listenerName, startThrottleTimeMs, acceptorBlockedPercentMeter)
+
+      val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, startThrottleTimeMs)

Review comment:
There are some corner cases here where `startThrottleTimeMs` could be in the past if `waitForConnectionSlot()` waited for an active connection slot to become available (if we exceeded the limit for the number of active connections), or waited for the broker-wide or listener-wide rate to get back to quota. In some cases, `ipThrottleTimeMs` would be zero if we checked against the current time here. Since getting `System.currentTimeMillis()` is not as expensive as it used to be, I think it would be better to revert to `waitForConnectionSlot` getting its own time (as before this PR), and then have `recordIpConnectionMaybeThrottle` get the current time, run the code block below, and throw `ConnectionThrottledException`. We could then add a comment here that `recordIpConnectionMaybeThrottle` throws an exception if the per-IP quota is exceeded. What do you think?
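The stale-timestamp concern can be made concrete with a small sketch. Everything here is a simplified assumption rather than the PR's code: `ManualClock` and `IncSketch` are hypothetical, the per-IP quota is modelled as a minimum interval between accepted connections, and this `ConnectionThrottledException` is a local stand-in that only carries `throttleTimeMs`.

```scala
// Hypothetical test clock that only moves when told to.
final class ManualClock(var nowMs: Long) {
  def milliseconds: Long = nowMs
}

// Local stand-in for the exception named in the review; carries only the delay.
final case class ConnectionThrottledException(throttleTimeMs: Long)
  extends RuntimeException(s"throttled for $throttleTimeMs ms")

final class IncSketch(clock: ManualClock, minIntervalMs: Long) {
  private var nextAllowedMs = 0L

  // Stands in for blocking on broker/listener limits: time passes while we wait.
  def waitForConnectionSlot(blockMs: Long): Unit = clock.nowMs += blockMs

  // Reads the clock itself, so time spent blocked above counts toward the IP throttle.
  def recordIpConnectionMaybeThrottle(): Unit = {
    val now = clock.milliseconds
    val throttleMs = math.max(0L, nextAllowedMs - now)
    if (throttleMs > 0) throw ConnectionThrottledException(throttleMs)
    nextAllowedMs = now + minIntervalMs
  }

  def inc(blockMs: Long): Unit = {
    waitForConnectionSlot(blockMs)
    recordIpConnectionMaybeThrottle() // throws if the per-IP quota is exceeded
  }
}
```

In this model, a connection that was already blocked for longer than the remaining IP throttle time is accepted immediately, whereas a timestamp captured before the wait would still report a non-zero throttle.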
[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r512888360

## File path: core/src/main/scala/kafka/network/SocketServer.scala ##
@@ -526,10 +527,14 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
     if (channel != null) {
       debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress()}")
       connectionQuotas.dec(listenerName, channel.socket.getInetAddress)
-      CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
-      CoreUtils.swallow(channel.close(), this, Level.ERROR)
+      closeSocket(channel)
     }
   }
+
+  protected def closeSocket(channel: SocketChannel): Unit = {

Review comment:
This method is also called from the derived class.
[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r512887499

## File path: core/src/main/scala/kafka/network/SocketServer.scala ##
@@ -697,6 +714,31 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
           info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
           close(endPoint.listenerName, socketChannel)
           None
+        case e: ConnectionThrottledException =>
+          val ip = socketChannel.socket.getInetAddress
+          debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
+          val delayQueue = throttledSockets.computeIfAbsent(ip, _ => new mutable.Queue[DelayedCloseSocket])

Review comment:
I agree that it is better to optimize for checking connections rather than for the overhead of adding to and removing from the queue, because `closeThrottledConnections` runs pretty often (on a loop), which I think also means that finding no connections, or very few connections, ready to unthrottle would be common. So, after reading all your evaluations above, I am also leaning towards using a delay queue here.

Not sure if question #1 about why we need to delay closing a connection got answered, so answering just in case. Since we want to throttle accepting connections from an IP, closing a connection right away when the IP quota is reached would not help, because that IP is going to reconnect right away.
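The delay-queue idea from this comment could look roughly like the sketch below, built on `java.util.concurrent.DelayQueue`. It is an assumption-laden illustration, not the PR's implementation: `DelayedClose`, `ThrottledCloser` and the injected `nowMs` clock are hypothetical, and a string id stands in for the socket channel.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// Hypothetical element: a connection id plus the time at which it may be closed.
final class DelayedClose(val id: String, val endMs: Long, nowMs: () => Long) extends Delayed {
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(endMs - nowMs(), TimeUnit.MILLISECONDS)

  // Orders elements so the earliest deadline sits at the head of the queue.
  override def compareTo(other: Delayed): Int = other match {
    case d: DelayedClose => java.lang.Long.compare(endMs, d.endMs)
    case _ =>
      java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
  }
}

object ThrottledCloser {
  // Non-blocking: poll() returns null as soon as the head has not expired yet,
  // so the common "nothing to unthrottle" case costs a single check.
  def closeExpired(queue: DelayQueue[DelayedClose]): List[String] =
    Iterator.continually(queue.poll()).takeWhile(_ != null).map(_.id).toList
}
```

Because the queue is ordered by deadline, a frequent polling loop only ever inspects the head when nothing has expired, which matches the trade-off argued for in the comment.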
[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits
apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r502092158

## File path: core/src/main/scala/kafka/network/SocketServer.scala ##
@@ -1203,14 +1262,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val inactiveSensorExpirationTimeSeconds = TimeUnit.HOURS.toSeconds(1);
+  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
+  private val lock = new ReentrantReadWriteLock()
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   // sensor that tracks broker-wide connection creation rate and limit (quota)
-  private val brokerConnectionRateSensor = createConnectionRateQuotaSensor(config.maxConnectionCreationRate)
+  private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, BrokerQuotaEntity)
   private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
+
   def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+      val startThrottleTimeMs = time.milliseconds
+
+      val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, startThrottleTimeMs)

Review comment:
I see. Yes, I think unrecording is more efficient than keeping more delayed connections than needed. Basically, when you unrecord from the per-IP metric, you can also unrecord from the broker and listener metrics.
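A toy model of the unrecording idea follows. It makes several assumptions that are not in the PR: `RateCounter` is a hypothetical plain counter rather than a Kafka sensor, recording always happens before the limit check, and `accept` is an invented helper that rolls back all three counters when the per-IP limit is exceeded.

```scala
// Hypothetical counter: records first, then reports whether the limit still holds.
final class RateCounter(limit: Int) {
  private var count = 0
  def record(): Boolean = { count += 1; count <= limit }
  def unrecord(): Unit = count -= 1
  def value: Int = count
}

object UnrecordSketch {
  // Records the connection everywhere; if the per-IP limit is exceeded, the
  // throttled connection is removed from the IP, listener and broker counters
  // so that it does not consume quota it never used.
  def accept(broker: RateCounter, listener: RateCounter, ip: RateCounter): Boolean = {
    broker.record()
    listener.record()
    val withinIpQuota = ip.record()
    if (!withinIpQuota) {
      ip.unrecord()
      listener.unrecord()
      broker.unrecord()
    }
    withinIpQuota
  }
}
```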
[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits
apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r502091323

## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ##
@@ -409,6 +409,67 @@ class ConnectionQuotasTest {
     verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
   }
+  @Test

Review comment:
It would be useful to add a test where we have both a per-listener and a per-IP limit, and verify that it throttles based on whichever limit is reached first. Something like: 2 IPs, each per-IP limit < per-listener limit, but sum of per-IP limits > listener limit. So, if you reach the limit on one IP, the broker would not throttle the second IP until it reaches the per-listener limit. It does not have to be exactly this; we just need to verify how per-IP throttling interacts with per-listener throttling.
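The scenario suggested in this comment can be simulated with a small model before writing the real test. This is only a sketch under stated assumptions: `ListenerModel` and the `Outcome` values are hypothetical, limits are modelled as simple counts within one window, and a connection rejected by the per-IP limit is assumed not to consume listener quota.

```scala
import scala.collection.mutable

sealed trait Outcome
case object Accepted extends Outcome
case object IpThrottled extends Outcome
case object ListenerBlocked extends Outcome

// Hypothetical model of one listener with a shared limit and a per-IP limit.
final class ListenerModel(listenerLimit: Int, ipLimit: Int) {
  private var listenerCount = 0
  private val ipCounts = mutable.Map[String, Int]().withDefaultValue(0)

  // Listener limit is checked first, then the per-IP limit.
  def connect(ip: String): Outcome =
    if (listenerCount >= listenerLimit) ListenerBlocked
    else if (ipCounts(ip) >= ipLimit) IpThrottled
    else {
      listenerCount += 1
      ipCounts(ip) += 1
      Accepted
    }
}
```

With a listener limit of 5 and a per-IP limit of 3, the first IP is throttled on its fourth connection, while the second IP is only stopped once the listener limit is reached.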
[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits
apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r502086777

## File path: core/src/main/scala/kafka/network/SocketServer.scala ##
@@ -1242,7 +1314,56 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = {
     // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if
     // the rate limit increases, because it is just one connection per listener and the code is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, otherwise
+   * all metric configs will be checked and updated if required
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default if None
+   */
+  def updateIpConnectionRate(ip: Option[String], maxConnectionRate: Option[Int]): Unit = {
+    def isIpConnectionRateMetric(metricName: MetricName) = {
+      metricName.name == "connection-accept-rate" &&
+      metricName.group == MetricsGroup &&
+      metricName.tags.containsKey("ip")
+    }
+
+    def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+      quotaLimit != metric.config.quota.bound
+    }
+
+    ip match {
+      case Some(addr) =>
+        val address = InetAddress.getByName(addr)
+        if (maxConnectionRate.isDefined) {
+          info(s"Updating max connection rate override for $address to ${maxConnectionRate.get}")
+          connectionRatePerIp.put(address, maxConnectionRate.get)
+        } else {
+          info(s"Removing max connection rate override for $address")
+          connectionRatePerIp.remove(address)
+        }
+        updateConnectionRateQuota(connectionRateForIp(address), IpQuotaEntity(address))
+      case None =>
+        val newQuota = maxConnectionRate.getOrElse(Int.MaxValue)

Review comment:
You can use your new constant `DynamicConfig.Ip.UnlimitedConnectionCreationRate` instead of `Int.MaxValue` here.
[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits
apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r502084924

## File path: core/src/main/scala/kafka/network/SocketServer.scala ##
@@ -1203,14 +1262,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val inactiveSensorExpirationTimeSeconds = TimeUnit.HOURS.toSeconds(1);
+  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
+  private val lock = new ReentrantReadWriteLock()
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   // sensor that tracks broker-wide connection creation rate and limit (quota)
-  private val brokerConnectionRateSensor = createConnectionRateQuotaSensor(config.maxConnectionCreationRate)
+  private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, BrokerQuotaEntity)
   private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
+
   def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+      val startThrottleTimeMs = time.milliseconds
+
+      val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, startThrottleTimeMs)

Review comment:
It would be more efficient if we throttled IPs **after** we know that we can accept a connection based on broker-wide and per-listener limits, since reaching broker/listener limits blocks the acceptor thread while throttling IPs needs more processing. Otherwise, if you reach both the broker and per-IP limits, the broker will continue accepting and delaying connections when it would be justified to block the acceptor thread because the broker rate limit was reached. Basically, call `waitForConnectionSlot` first. This is similar to how we check the per-IP limit on the number of connections after we know that we can accept a new connection based on broker/listener limits.
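The difference between the two check orders can be reduced to a small decision function. This is a deliberately simplified sketch: `OrderingSketch`, `Action` and both functions are hypothetical, and the broker and per-IP states are collapsed into two booleans.

```scala
object OrderingSketch {
  sealed trait Action
  case object Accept extends Action
  case object BlockAcceptor extends Action // wait on broker/listener limits
  case object DelayClose extends Action    // accept, then hold the socket before closing

  // Order criticised in the review: the per-IP check runs first, so a connection
  // is accepted and parked even though the broker limit is already reached.
  def ipFirst(brokerAtLimit: Boolean, ipOverQuota: Boolean): Action =
    if (ipOverQuota) DelayClose
    else if (brokerAtLimit) BlockAcceptor
    else Accept

  // Order suggested in the review: broker/listener limits are checked first.
  def brokerFirst(brokerAtLimit: Boolean, ipOverQuota: Boolean): Action =
    if (brokerAtLimit) BlockAcceptor
    else if (ipOverQuota) DelayClose
    else Accept
}
```

The two orders only disagree when both limits are hit at once, which is exactly the case the comment calls out.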