apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r452502811



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) 
extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     
maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation 
rate to the listener-level limit or to the broker-wide
+   * limit, whichever is longer. The delay is capped to the quota window 
size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which to calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, 
timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => 
recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = 
recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), 
maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: 
$throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and 
corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: 
Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", 
rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, 
quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: 
Option[String] = None): Unit = {
+    val metric = metrics.metric(connectionRateMetricName((listenerOpt)))
+    metric.config(rateQuotaMetricConfig(quotaLimit))
+    info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation 
rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(listenerOpt: Option[String]): 
MetricName = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    metrics.metricName(
+      s"connection-creation-rate-$quotaEntity",
+      "connection-quota-no-jmx",
+      s"Tracking $quotaEntity connection creation rate",
+      rateQuotaMetricTags(listenerOpt))
+  }
+
+  private def rateQuotaMetricConfig(quotaLimit: Int): MetricConfig = {
+    new MetricConfig()
+      .timeWindow(config.quotaWindowSizeSeconds.toLong, TimeUnit.SECONDS)
+      .samples(config.numQuotaSamples)
+      .quota(new Quota(quotaLimit, true))
+  }
+
+  private def rateQuotaMetricTags(listenerOpt: Option[String]): 
util.Map[String, String] = {
+    val tags = new util.LinkedHashMap[String, String]
+    listenerOpt.foreach(listener => tags.put("listener", listener))
+    tags

Review comment:
       I realized that I don't need tags here anymore, since the name of the 
metric contains the name of the listener, so the metrics are already distinct 
per listener (and broker-wide). I removed that method altogether.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to