apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r512928815
##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1207,14 +1286,26 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
+  private val lock = new ReentrantReadWriteLock()
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   // sensor that tracks broker-wide connection creation rate and limit (quota)
-  private val brokerConnectionRateSensor = createConnectionRateQuotaSensor(config.maxConnectionCreationRate)
+  private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, BrokerQuotaEntity)
   private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
   def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+      val startThrottleTimeMs = time.milliseconds
+
+      waitForConnectionSlot(listenerName, startThrottleTimeMs, acceptorBlockedPercentMeter)
+
+      val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, startThrottleTimeMs)

Review comment:
There are some corner cases here where `startThrottleTimeMs` could be stale: `waitForConnectionSlot()` may have blocked waiting for an active connection slot to become available (if we exceeded the limit on the number of active connections), or for the broker-wide or listener-wide rate to get back within quota. In some of those cases, `ipThrottleTimeMs` would be zero if we checked against the current time here instead.
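To make the corner case concrete, here is a toy sketch. It is not Kafka's sensor or quota code: `ToyIpLimiter`, `throttleFor`, and all of the numbers are hypothetical, and the limiter simply allows one connection per fixed interval. It only shows how a timestamp captured before a blocking wait can report a non-zero throttle where the current time would report zero.

```scala
// Toy model only: this is NOT Kafka's Sensor/quota implementation.
object StaleTimestampSketch {

  // Hypothetical limiter: at most one connection per `intervalMs` for a single IP.
  final class ToyIpLimiter(intervalMs: Long) {
    private var nextAllowedMs = 0L

    // Records a connection and returns the throttle time as judged at `nowMs`.
    def recordMaybeThrottle(nowMs: Long): Long = {
      val throttleMs = math.max(0L, nextAllowedMs - nowMs)
      nextAllowedMs = math.max(nextAllowedMs, nowMs) + intervalMs
      throttleMs
    }
  }

  // Simulates one accept: a timestamp is captured, the acceptor then blocks
  // for `waitedMs` (standing in for waitForConnectionSlot), and only
  // afterwards is the per-IP check performed.
  def throttleFor(useStaleTimestamp: Boolean): Long = {
    val limiter = new ToyIpLimiter(intervalMs = 100L)
    limiter.recordMaybeThrottle(nowMs = 1000L) // earlier connection from the same IP

    val startThrottleTimeMs = 1050L // captured before the wait
    val waitedMs = 200L             // assumed time spent blocked
    val checkTimeMs =
      if (useStaleTimestamp) startThrottleTimeMs
      else startThrottleTimeMs + waitedMs

    limiter.recordMaybeThrottle(checkTimeMs)
  }

  def main(args: Array[String]): Unit = {
    println(s"stale timestamp: throttle ${throttleFor(useStaleTimestamp = true)} ms")
    println(s"fresh timestamp: throttle ${throttleFor(useStaleTimestamp = false)} ms")
  }
}
```

In this toy setup the stale timestamp yields a 50 ms throttle while the fresh one yields 0 ms, because the 200 ms spent waiting has already carried the clock past the point where the next connection is allowed.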
Since calling `System.currentTimeMillis()` is no longer as expensive as it used to be, I think it would be better to revert to `waitForConnectionSlot` getting its own time (as before this PR), and have `recordIpConnectionMaybeThrottle` get the current time itself, run the code block below, and throw `ConnectionThrottledException`. We could then add a comment here noting that `recordIpConnectionMaybeThrottle` throws an exception if the per-IP quota is exceeded. What do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org