hachikuji commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r865430367


##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -1864,6 +1780,18 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
       sensor
     }
   }
+
+  /**
+   * Close `channel` and decrement the connection count.
+   */
+  def closeChannel(listenerName: ListenerName, channel: SocketChannel): Unit = {
+    if (channel != null) {
+      debug(s"Closing connection from 
${channel.socket.getRemoteSocketAddress}")
+      dec(listenerName, channel.socket.getInetAddress)
+      closeSocket(channel, this)

Review Comment:
   It's surprising to find this in `ListenerConnectionQuota`, and also that we end up with a different logger. Is there any way we can pull it back to where it was? For example, maybe we could generalize `closeSocket`:
   
   ```scala
     def closeSocket(
       listenerName: ListenerName,
       channel: SocketChannel,
       connectionQuota: ConnectionQuotas,
       logging: Logging    
     )
   ```
   Or maybe we can stick it into a trait so that we can get rid of the `logging` parameter.
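   
   For illustration only, here is a rough sketch of what the trait variant could look like. The trait name `ConnectionCloseSupport` and the abstract `connectionQuotas` accessor are hypothetical, not code from this PR; it assumes the existing `closeSocket(channel, logging)` helper and `ConnectionQuotas.dec` are reachable from the trait:
   
   ```scala
   import java.nio.channels.SocketChannel
   import org.apache.kafka.common.network.ListenerName
   import kafka.utils.Logging
   
   trait ConnectionCloseSupport extends Logging {
     // Hypothetical accessor; the mixing-in class would point this at its ConnectionQuotas instance.
     def connectionQuotas: ConnectionQuotas
   
     def closeChannel(listenerName: ListenerName, channel: SocketChannel): Unit = {
       if (channel != null) {
         // Because the trait mixes in Logging, this logs with the caller's logger
         // rather than ListenerConnectionQuota's.
         debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress}")
         connectionQuotas.dec(listenerName, channel.socket.getInetAddress)
         closeSocket(channel, this) // assumes the existing helper is in scope, e.g. via the companion object
       }
     }
   }
   ```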



##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -681,24 +580,27 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private val blockedPercentMeter = newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)
   private var currentProcessorIndex = 0
   private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
+  private var started = false
+  private[network] val startFuture = new CompletableFuture[Void]()
 
-  private[network] case class DelayedCloseSocket(socket: SocketChannel, endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] {
-    override def compare(that: DelayedCloseSocket): Int = endThrottleTimeMs compare that.endThrottleTimeMs
-  }
+  val thread = KafkaThread.nonDaemon(
+    s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
+    this)
 
-  private[network] def startProcessors(): Unit = synchronized {
-    if (!processorsStarted.getAndSet(true)) {
-      startProcessors(processors)
+  startFuture.thenRun(() => synchronized {
+    if (!shouldRun.get()) {
+      debug(s"Ignoring start future for ${endPoint.listenerName} since it has 
already been shut down.")

Review Comment:
   nit: "it" -> "the acceptor"?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -906,12 +906,35 @@ private void appendRaftEvent(String name, Runnable runnable) {
                 if (this != metaLogListener) {
                     log.debug("Ignoring {} raft event from an old 
registration", name);
                 } else {
-                    runnable.run();
+                    try {
+                        runnable.run();
+                    } finally {
+                        maybeCompleteAuthorizerInitialLoad();
+                    }
                 }
             });
         }
     }
 
+    private void maybeCompleteAuthorizerInitialLoad() {
+        if (!needToCompleteAuthorizerLoad) return;
+        OptionalLong highWatermark = raftClient.highWatermark();
+        if (highWatermark.isPresent()) {
+            if (lastCommittedOffset + 1 >= highWatermark.getAsLong()) {

Review Comment:
   I guess the only issue with this is that the high watermark is a moving target. It probably works ok since writes to the metadata log should be infrequent. Not sure I have any better ideas. Maybe we could refresh the high watermark value only once every second or something like that.
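   
   For what it's worth, a rough sketch of that once-per-second refresh idea (QuorumController is Java, but this is sketched in Scala to keep it short; `ThrottledHighWatermark` and the `fetch` supplier are made-up names, with `fetch` standing in for something like `() => raftClient.highWatermark()`):
   
   ```scala
   import java.util.OptionalLong
   
   // Illustrative only: cache the high watermark and re-read it at most once per refresh interval.
   class ThrottledHighWatermark(fetch: () => OptionalLong, refreshIntervalMs: Long = 1000L) {
     private var cached: OptionalLong = OptionalLong.empty()
     private var lastRefreshMs: Option[Long] = None
   
     def get(nowMs: Long): OptionalLong = {
       // Refresh on the first call, and afterwards at most once per refreshIntervalMs.
       if (lastRefreshMs.forall(last => nowMs - last >= refreshIntervalMs)) {
         cached = fetch()
         lastRefreshMs = Some(nowMs)
       }
       cached
     }
   }
   ```
   The check in `maybeCompleteAuthorizerInitialLoad` would then compare `lastCommittedOffset + 1` against the cached value instead of calling `raftClient.highWatermark()` on every event.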



##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -104,184 +103,141 @@ class SocketServer(val config: KafkaConfig,
 
   private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
-  private var startedProcessingRequests = false
-  private var stoppedProcessingRequests = false
 
-  // Processors are now created by each Acceptor. However to preserve compatibility, we need to number the processors
-  // globally, so we keep the nextProcessorId counter in SocketServer
-  def nextProcessorId(): Int = {
-    nextProcessorId.getAndIncrement()
-  }
+  /**
+   * A future which is completed once all the authorizer futures are complete.
+   */
+  private val allAuthorizerFuturesComplete = new CompletableFuture[Void]
 
   /**
-   * Starts the socket server and creates all the Acceptors and the Processors. The Acceptors
-   * start listening at this stage so that the bound port is known when this method completes
-   * even when ephemeral ports are used. Acceptors and Processors are started if `startProcessingRequests`
-   * is true. If not, acceptors and processors are only started when [[kafka.network.SocketServer#startProcessingRequests()]]
-   * is invoked. Delayed starting of acceptors and processors is used to delay processing client
-   * connections until server is fully initialized, e.g. to ensure that all credentials have been
-   * loaded before authentications are performed. Incoming connections on this server are processed
-   * when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]].
-   *
-   * @param startProcessingRequests Flag indicating whether `Processor`s must be started.
-   * @param controlPlaneListener    The control plane listener, or None if there is none.
-   * @param dataPlaneListeners      The data plane listeners.
+   * True if the SocketServer is stopped. Must be accessed under the SocketServer lock.
    */
-  def startup(startProcessingRequests: Boolean = true,
-              controlPlaneListener: Option[EndPoint] = config.controlPlaneListener,
-              dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): Unit = {
-    this.synchronized {
-      createControlPlaneAcceptorAndProcessor(controlPlaneListener)
-      createDataPlaneAcceptorsAndProcessors(dataPlaneListeners)
-      if (startProcessingRequests) {
-        this.startProcessingRequests()
-      }
-    }
+  private var stopped = false
 
+  // Socket server metrics
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", 
() => SocketServer.this.synchronized {
     val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => 
a.processors)
-    val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => 
a.processors(0))
-    
newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () 
=> SocketServer.this.synchronized {
-      val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
-        metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
-      }
+    val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
+      metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
+    }
+    if (dataPlaneProcessors.isEmpty) {
+      1.0
+    } else {
       ioWaitRatioMetricNames.map { metricName =>
        Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
       }.sum / dataPlaneProcessors.size
-    })
-    newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
-      val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p =>
-        metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
-      }
-      ioWaitRatioMetricName.map { metricName =>
-        Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
-      }.getOrElse(Double.NaN)
-    })
-    newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
-    newGauge("MemoryPoolUsed", () => memoryPool.size() - 
memoryPool.availableMemory)
-    
newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () 
=> SocketServer.this.synchronized {
-      val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { 
p =>
-        metrics.metricName("expired-connections-killed-count", MetricsGroup, 
p.metricTags)
-      }
-      expiredConnectionsKilledCountMetricNames.map { metricName =>
-        Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
-      }.sum
-    })
-    newGauge(s"${ControlPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
-      val expiredConnectionsKilledCountMetricNames = controlPlaneProcessorOpt.map { p =>
-        metrics.metricName("expired-connections-killed-count", MetricsGroup, p.metricTags)
-      }
-      expiredConnectionsKilledCountMetricNames.map { metricName =>
-        Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
-      }.getOrElse(0.0)
-    })
-  }
-
-  /**
-   * Start processing requests and new connections. This method is used for delayed starting of
-   * all the acceptors and processors if [[kafka.network.SocketServer#startup]] was invoked with
-   * `startProcessingRequests=false`.
-   *
-   * Before starting processors for each endpoint, we ensure that authorizer has all the metadata
-   * to authorize requests on that endpoint by waiting on the provided future. We start inter-broker
-   * listener before other listeners. This allows authorization metadata for other listeners to be
-   * stored in Kafka topics in this cluster.
-   *
-   * @param authorizerFutures Future per [[EndPoint]] used to wait before starting the processor
-   *                          corresponding to the [[EndPoint]]
-   */
-  def startProcessingRequests(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = {
-    info("Starting socket server acceptors and processors")
-    this.synchronized {
-      if (!startedProcessingRequests) {
-        startControlPlaneProcessorAndAcceptor(authorizerFutures)
-        startDataPlaneProcessorsAndAcceptors(authorizerFutures)
-        startedProcessingRequests = true
-      } else {
-        info("Socket server acceptors and processors already started")
-      }
     }
-    info("Started socket server acceptors and processors")
+  })
+  newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {

Review Comment:
   Do we register this metric even in kraft?



##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -356,10 +359,13 @@ class BrokerServer(
          endpoints.asScala.map(ep => ep.listenerName().orElse("(none)")).mkString(", "))
       }
       val authorizerInfo = ServerInfo(new ClusterResource(clusterId),
-        config.nodeId, endpoints, interBrokerListener)
+        config.nodeId,
+        endpoints,
+        interBrokerListener,
+        config.earlyStartListeners.map(_.value()).asJava)
 
-      /* Get the authorizer and initialize it if one is specified.*/
-      authorizer = config.authorizer
+      // Create and intiialize an authorizer if one is configured.

Review Comment:
   nit: typo "intiialize"



##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -1392,15 +1299,24 @@ private[kafka] class Processor(val id: Int,
   private[network] def channel(connectionId: String): Option[KafkaChannel] =
     Option(selector.channel(connectionId))
 
+  def start(): Unit = thread.start()
+
   /**
    * Wakeup the thread for selection.
    */
-  override def wakeup(): Unit = selector.wakeup()
+  def wakeup(): Unit = selector.wakeup()
+
+  def beginShutdown(): Unit = {
+    if (shouldRun.getAndSet(false)) {
+      wakeup()
+      removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
+      metrics.removeMetric(expiredConnectionsKilledCountMetricName)

Review Comment:
   I know the code already does this, but metric removal seems like something to do after shutdown is complete. Maybe we can do it in `close`?
   
   ```scala
     def close(): Unit = {
       try {
         beginShutdown()
         thread.join()
       } finally {
         removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
         metrics.removeMetric(expiredConnectionsKilledCountMetricName)
       }
     }
   ```



##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -104,184 +103,141 @@ class SocketServer(val config: KafkaConfig,
 
   private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
-  private var startedProcessingRequests = false
-  private var stoppedProcessingRequests = false
 
-  // Processors are now created by each Acceptor. However to preserve compatibility, we need to number the processors
-  // globally, so we keep the nextProcessorId counter in SocketServer
-  def nextProcessorId(): Int = {
-    nextProcessorId.getAndIncrement()
-  }
+  /**
+   * A future which is completed once all the authorizer futures are complete.
+   */
+  private val allAuthorizerFuturesComplete = new CompletableFuture[Void]
 
   /**
-   * Starts the socket server and creates all the Acceptors and the Processors. The Acceptors
-   * start listening at this stage so that the bound port is known when this method completes
-   * even when ephemeral ports are used. Acceptors and Processors are started if `startProcessingRequests`
-   * is true. If not, acceptors and processors are only started when [[kafka.network.SocketServer#startProcessingRequests()]]
-   * is invoked. Delayed starting of acceptors and processors is used to delay processing client
-   * connections until server is fully initialized, e.g. to ensure that all credentials have been
-   * loaded before authentications are performed. Incoming connections on this server are processed
-   * when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]].
-   *
-   * @param startProcessingRequests Flag indicating whether `Processor`s must be started.
-   * @param controlPlaneListener    The control plane listener, or None if there is none.
-   * @param dataPlaneListeners      The data plane listeners.
+   * True if the SocketServer is stopped. Must be accessed under the SocketServer lock.
    */
-  def startup(startProcessingRequests: Boolean = true,
-              controlPlaneListener: Option[EndPoint] = config.controlPlaneListener,
-              dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): Unit = {
-    this.synchronized {
-      createControlPlaneAcceptorAndProcessor(controlPlaneListener)
-      createDataPlaneAcceptorsAndProcessors(dataPlaneListeners)
-      if (startProcessingRequests) {
-        this.startProcessingRequests()
-      }
-    }
+  private var stopped = false
 
+  // Socket server metrics
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", 
() => SocketServer.this.synchronized {

Review Comment:
   The synchronization in these gauges seems less than ideal. Is it worth filing a jira to come up with a better approach?
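   
   Purely as a strawman for such a jira (none of the names below are from this PR): one direction would be for the acceptors to publish an immutable snapshot of the per-processor metric names whenever processors are added or removed, so the gauge callbacks could read it without taking the SocketServer lock:
   
   ```scala
   import java.util.concurrent.atomic.AtomicReference
   
   // Strawman only: an immutable snapshot that gauges could read lock-free.
   final case class ProcessorMetricSnapshot(ioWaitRatioMetricNames: Vector[String])
   
   final class GaugeState {
     private val snapshot = new AtomicReference(ProcessorMetricSnapshot(Vector.empty))
   
     // Called under the existing SocketServer lock whenever processors are added or removed.
     def update(s: ProcessorMetricSnapshot): Unit = snapshot.set(s)
   
     // Called from the gauge callback; reading the snapshot needs no synchronization.
     def current(): ProcessorMetricSnapshot = snapshot.get()
   }
   ```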


