cmccabe commented on code in PR #11969: URL: https://github.com/apache/kafka/pull/11969#discussion_r865496509
########## core/src/main/scala/kafka/network/SocketServer.scala: ########## @@ -104,184 +103,141 @@ class SocketServer(val config: KafkaConfig, private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0) val connectionQuotas = new ConnectionQuotas(config, time, metrics) - private var startedProcessingRequests = false - private var stoppedProcessingRequests = false - // Processors are now created by each Acceptor. However to preserve compatibility, we need to number the processors - // globally, so we keep the nextProcessorId counter in SocketServer - def nextProcessorId(): Int = { - nextProcessorId.getAndIncrement() - } + /** + * A future which is completed once all the authorizer futures are complete. + */ + private val allAuthorizerFuturesComplete = new CompletableFuture[Void] /** - * Starts the socket server and creates all the Acceptors and the Processors. The Acceptors - * start listening at this stage so that the bound port is known when this method completes - * even when ephemeral ports are used. Acceptors and Processors are started if `startProcessingRequests` - * is true. If not, acceptors and processors are only started when [[kafka.network.SocketServer#startProcessingRequests()]] - * is invoked. Delayed starting of acceptors and processors is used to delay processing client - * connections until server is fully initialized, e.g. to ensure that all credentials have been - * loaded before authentications are performed. Incoming connections on this server are processed - * when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]]. - * - * @param startProcessingRequests Flag indicating whether `Processor`s must be started. - * @param controlPlaneListener The control plane listener, or None if there is none. - * @param dataPlaneListeners The data plane listeners. + * True if the SocketServer is stopped. Must be accessed under the SocketServer lock. 
*/ - def startup(startProcessingRequests: Boolean = true, - controlPlaneListener: Option[EndPoint] = config.controlPlaneListener, - dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): Unit = { - this.synchronized { - createControlPlaneAcceptorAndProcessor(controlPlaneListener) - createDataPlaneAcceptorsAndProcessors(dataPlaneListeners) - if (startProcessingRequests) { - this.startProcessingRequests() - } - } + private var stopped = false + // Socket server metrics + newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized { val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors) - val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => a.processors(0)) - newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized { - val ioWaitRatioMetricNames = dataPlaneProcessors.map { p => - metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags) - } + val ioWaitRatioMetricNames = dataPlaneProcessors.map { p => + metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags) + } + if (dataPlaneProcessors.isEmpty) { + 1.0 + } else { ioWaitRatioMetricNames.map { metricName => Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0)) }.sum / dataPlaneProcessors.size - }) - newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized { - val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p => - metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags) - } - ioWaitRatioMetricName.map { metricName => - Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0)) - }.getOrElse(Double.NaN) - }) - newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory) - newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory) - 
newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized { - val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p => - metrics.metricName("expired-connections-killed-count", MetricsGroup, p.metricTags) - } - expiredConnectionsKilledCountMetricNames.map { metricName => - Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double]) - }.sum - }) - newGauge(s"${ControlPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized { - val expiredConnectionsKilledCountMetricNames = controlPlaneProcessorOpt.map { p => - metrics.metricName("expired-connections-killed-count", MetricsGroup, p.metricTags) - } - expiredConnectionsKilledCountMetricNames.map { metricName => - Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double]) - }.getOrElse(0.0) - }) - } - - /** - * Start processing requests and new connections. This method is used for delayed starting of - * all the acceptors and processors if [[kafka.network.SocketServer#startup]] was invoked with - * `startProcessingRequests=false`. - * - * Before starting processors for each endpoint, we ensure that authorizer has all the metadata - * to authorize requests on that endpoint by waiting on the provided future. We start inter-broker - * listener before other listeners. This allows authorization metadata for other listeners to be - * stored in Kafka topics in this cluster. 
- * - * @param authorizerFutures Future per [[EndPoint]] used to wait before starting the processor - * corresponding to the [[EndPoint]] - */ - def startProcessingRequests(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = { - info("Starting socket server acceptors and processors") - this.synchronized { - if (!startedProcessingRequests) { - startControlPlaneProcessorAndAcceptor(authorizerFutures) - startDataPlaneProcessorsAndAcceptors(authorizerFutures) - startedProcessingRequests = true - } else { - info("Socket server acceptors and processors already started") - } } - info("Started socket server acceptors and processors") + }) + newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized { Review Comment: Fair point. Let's skip these metrics when in KRaft mode. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org