dajac commented on a change in pull request #9317:
URL: https://github.com/apache/kafka/pull/9317#discussion_r492570653



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1414,7 +1420,8 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
 
   class ListenerConnectionQuota(lock: Object, listener: ListenerName) extends ListenerReconfigurable {
     @volatile private var _maxConnections = Int.MaxValue
-    val connectionRateSensor = createConnectionRateQuotaSensor(Int.MaxValue, Some(listener.value))
+    val connectionRateSensor: Sensor = createConnectionRateQuotaSensor(Int.MaxValue, Some(listener.value))
+    val connectionRateThrottleSensor: Sensor = createConnectionRateThrottleSensor()

Review comment:
       nit: Do we really need to provide the type here? It seems that we 
usually don't provide it in that file.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1292,6 +1292,12 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
     counts.synchronized {
       val startThrottleTimeMs = time.milliseconds
       val throttleTimeMs = math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startThrottleTimeMs), 0)
+      if (throttleTimeMs > 0) {
+        // record throttle time due to hitting connection rate limit
+        // connection could be throttled longer if the limit of the number of active connections is reached as well
+        maxConnectionsPerListener.get(listenerName)
+          .foreach(_.connectionRateThrottleSensor.record(throttleTimeMs.toDouble, startThrottleTimeMs))

Review comment:
       It is a bit annoying that we have to look up the 
`ListenerConnectionQuota` twice: once in `recordConnectionAndGetThrottleTimeMs` 
and once here. I wonder if we could look it up once and use it both to record 
the rate and to record the throttle time.
   
   To be more concrete, I am thinking about the following. We could add two 
methods to `ListenerConnectionQuota`: 1) `record` and 2) `recordThrottleTime`. 
They would encapsulate the logic to record, respectively, the rate (for the 
listener and the broker, like we do now) and the throttle time. So here we could 
look up the `ListenerConnectionQuota` once, call `record` to get the throttle 
time, and call `recordThrottleTime` if the throttle time is > 0.
   
   That could improve the readability a bit. I am not sure that it would make a 
difference from a performance point of view. WDYT?
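   
   A minimal sketch of how the two proposed methods might fit together. 
Everything here is a hypothetical stand-in, not the real Kafka code: 
`FakeSensor` replaces the metrics `Sensor`, `ListenerQuotaSketch` replaces 
`ListenerConnectionQuota`, the rate computation is passed in as a function, and 
returning 0 for an unknown listener is an assumption.

```scala
// Hypothetical stand-in for the metrics Sensor; it only remembers what was recorded.
final class FakeSensor {
  private var values = Vector.empty[(Double, Long)]
  def record(value: Double, timeMs: Long): Unit = values = values :+ ((value, timeMs))
  def recorded: Vector[(Double, Long)] = values
}

// Simplified stand-in for ListenerConnectionQuota with the two proposed methods.
// computeThrottleTimeMs is an assumption standing in for the listener and broker
// rate recording that the real class would encapsulate.
final class ListenerQuotaSketch(computeThrottleTimeMs: Long => Long) {
  val connectionRateThrottleSensor = new FakeSensor

  // Records the connection and returns the throttle time, clamped to be non-negative.
  def record(timeMs: Long): Long = math.max(computeThrottleTimeMs(timeMs), 0L)

  // Records the throttle time on the throttle sensor.
  def recordThrottleTime(throttleTimeMs: Long, timeMs: Long): Unit =
    connectionRateThrottleSensor.record(throttleTimeMs.toDouble, timeMs)
}

object SingleLookupSketch {
  // Looks the quota up once, records the rate and, only if throttled, the throttle time.
  def recordAndThrottle(quotas: Map[String, ListenerQuotaSketch],
                        listenerName: String,
                        nowMs: Long): Long =
    quotas.get(listenerName) match {
      case Some(quota) =>
        val throttleTimeMs = quota.record(nowMs)
        if (throttleTimeMs > 0) quota.recordThrottleTime(throttleTimeMs, nowMs)
        throttleTimeMs
      case None => 0L // assumption: an unknown listener is not throttled
    }
}
```

   With this shape the caller does a single map lookup, and the sensor details 
stay inside the quota object.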

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1447,13 +1454,33 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
       }
     }
 
+    def removeSensors(): Unit = {

Review comment:
       nit: What about naming this `close`? It feels a bit more natural to call 
`close` when we don't need the object anymore, and it would be more aligned with 
the `close` methods that we already have in this file. For instance, 
`ConnectionQuotas#close`, which also cleans up the metrics.
   
   BTW, not related to this PR, but it seems that we don't close the 
`ListenerConnectionQuota` when the `ConnectionQuotas` is closed. I suppose that 
we leave metrics around. Is that the case?
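   
   A rough sketch of what a `close`-based cleanup could look like, including 
closing each per-listener quota from the owning object. All names here are 
hypothetical stand-ins: `RegistrySketch` is not the Kafka `Metrics` class, and 
the sensor names are made up for illustration.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a metrics registry that tracks sensors by name.
final class RegistrySketch {
  private val sensors = mutable.Set.empty[String]
  def sensor(name: String): String = { sensors += name; name }
  def removeSensor(name: String): Unit = sensors -= name
  def sensorNames: Set[String] = sensors.toSet
}

// Stand-in for ListenerConnectionQuota: close() removes the sensors it created.
final class ListenerQuotaCloseSketch(metrics: RegistrySketch, listener: String)
    extends AutoCloseable {
  // Sensor names are illustrative assumptions, not the names used in Kafka.
  private val rateSensor = metrics.sensor(s"connection-rate-$listener")
  private val throttleSensor = metrics.sensor(s"connection-rate-throttle-$listener")

  override def close(): Unit = {
    metrics.removeSensor(rateSensor)
    metrics.removeSensor(throttleSensor)
  }
}

// Stand-in for ConnectionQuotas: closing it also closes every per-listener quota,
// so no listener sensors are left behind in the registry.
final class ConnectionQuotasCloseSketch(metrics: RegistrySketch, listeners: Seq[String])
    extends AutoCloseable {
  private val perListener: Map[String, ListenerQuotaCloseSketch] =
    listeners.map(l => l -> new ListenerQuotaCloseSketch(metrics, l)).toMap

  override def close(): Unit = perListener.values.foreach(_.close())
}
```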




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

