apovzner commented on a change in pull request #8768: URL: https://github.com/apache/kafka/pull/8768#discussion_r452502811
########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { private def maxListenerConnections(listenerName: ListenerName): Int = maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue) + /** + * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide + * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp + * + * @param listenerName listener for which calculate the delay + * @param timeMs current time in milliseconds + * @return delay in milliseconds + */ + private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = { + val listenerThrottleTimeMs = maxConnectionsPerListener + .get(listenerName) + .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs)) + .getOrElse(0) + + if (protectedListener(listenerName)) { + listenerThrottleTimeMs + } else { + val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs) + math.max(brokerThrottleTimeMs, listenerThrottleTimeMs) + } + } + + private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = { + try { + sensor.record(1.0, timeMs) + 0 + } catch { + case e: QuotaViolationException => + val throttleTimeMs = QuotaUtils.boundedThrottleTime( + e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt + debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms") + throttleTimeMs + } + } + + /** + * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given + * listener or broker-wide, if listener is not provided. + * @param quotaLimit connection creation rate quota + * @param listenerOpt listener name if sensor is for a listener + */ + private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = { + val quotaEntity = listenerOpt.getOrElse("broker") + val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit)) + sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false) + info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit") + sensor + } + + private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = { + val metric = metrics.metric(connectionRateMetricName((listenerOpt))) + metric.config(rateQuotaMetricConfig(quotaLimit)) + info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit") + } + + private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = { + val quotaEntity = listenerOpt.getOrElse("broker") + metrics.metricName( + s"connection-creation-rate-$quotaEntity", + "connection-quota-no-jmx", + s"Tracking $quotaEntity connection creation rate", + rateQuotaMetricTags(listenerOpt)) + } + + private def rateQuotaMetricConfig(quotaLimit: Int): MetricConfig = { + new MetricConfig() + .timeWindow(config.quotaWindowSizeSeconds.toLong, TimeUnit.SECONDS) + .samples(config.numQuotaSamples) + .quota(new Quota(quotaLimit, true)) + } + + private def rateQuotaMetricTags(listenerOpt: Option[String]): util.Map[String, String] = { + val tags = new util.LinkedHashMap[String, String] + listenerOpt.foreach(listener => tags.put("listener", listener)) + tags Review comment: I realized that I don't need tags here anymore, since the name of the metric contains the name of the listener, so the metrics are already distinct per listener (and broker-wide). I removed that method altogether. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org