jsancio commented on code in PR #20481:
URL: https://github.com/apache/kafka/pull/20481#discussion_r2452214495


##########
core/src/main/scala/kafka/server/ControllerServer.scala:
##########
@@ -282,12 +282,12 @@ class ControllerServer(
         registrationsPublisher,
         apiVersionManager,
         metadataCache)
-      controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
+      controllerApisHandlerPool = 
sharedServer.requestHandlerPoolFactory.createPool(
+        config.nodeId,
         socketServer.dataPlaneRequestChannel,
         controllerApis,
         time,
         config.numIoThreads,
-        "RequestHandlerAvgIdlePercent",
         "controller")

Review Comment:
   Fix this indentation too. E.g.
   ```java
     "controller"
   )
   ```



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -192,19 +197,47 @@ class KafkaRequestHandler(
 
 }
 
+/**
+ * Factory for creating KafkaRequestHandlerPool instances with shared 
aggregate metrics.
+ * All pools created by the same factory share the same aggregateThreads 
counter.
+ */
+class KafkaRequestHandlerPoolFactory {
+  val aggregateThreads = new AtomicInteger(0)
+  val RequestHandlerAvgIdleMetricName = "RequestHandlerAvgIdlePercent"
+  
+  def createPool(
+    brokerId: Int,
+    requestChannel: RequestChannel,
+    apis: ApiRequestHandler,
+    time: Time,
+    numThreads: Int,
+    nodeName: String = "broker"
+  ): KafkaRequestHandlerPool = {
+    new KafkaRequestHandlerPool(aggregateThreads, 
RequestHandlerAvgIdleMetricName, brokerId, requestChannel, apis, time, 
numThreads, nodeName)
+  }
+}
+
 class KafkaRequestHandlerPool(
+  val aggregateThreads: AtomicInteger,
+  val requestHandlerAvgIdleMetricName: String,
   val brokerId: Int,
   val requestChannel: RequestChannel,
   val apis: ApiRequestHandler,
   time: Time,
   numThreads: Int,
-  requestHandlerAvgIdleMetricName: String,
   nodeName: String = "broker"
 ) extends Logging {
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
   val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
-  /* a meter to track the average free capacity of the request handlers */
+  private val perPoolIdleMeterName = if (nodeName == "broker") {
+    "BrokerRequestHandlerAvgIdlePercent"
+  } else {
+    "ControllerRequestHandlerAvgIdlePercent"
+  }

Review Comment:
   How about `else if (nodeName == "controller")` and throwing an 
`IllegalArgumentException` for the else case?



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -213,11 +246,28 @@ class KafkaRequestHandlerPool(
     createHandler(i)
   }
 
-  def createHandler(id: Int): Unit = synchronized {
-    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, 
threadPoolSize, requestChannel, apis, time, nodeName)
+  private def createHandler(id: Int): Unit = synchronized {
+    runnables += new KafkaRequestHandler(
+      id,
+      brokerId,
+      aggregateIdleMeter,
+      aggregateThreads,
+      perPoolIdleMeter,
+      threadPoolSize,
+      requestChannel,
+      apis,
+      time,
+      nodeName,
+    )
+    aggregateThreads.getAndIncrement()
     KafkaThread.daemon("data-plane-kafka-request-handler-" + id, 
runnables(id)).start()
   }
 
+  private def deleteHandler(id: Int): Unit = synchronized {

Review Comment:
   `synchronized` is not needed since the method is private and it is always 
called from a `synchronized` method.



##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -465,9 +465,12 @@ class BrokerServer(
         clientMetricsManager = clientMetricsManager,
         groupConfigManager = groupConfigManager)
 
-      dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
-        socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
-        config.numIoThreads, "RequestHandlerAvgIdlePercent")
+      dataPlaneRequestHandlerPool = 
sharedServer.requestHandlerPoolFactory.createPool(
+        config.nodeId,
+        socketServer.dataPlaneRequestChannel,
+        dataPlaneRequestProcessor,
+        time,
+        config.numIoThreads)

Review Comment:
   Thanks for fixing the indentation.
   
   Let's make the "broker" string explicit.
   
   You are missing a newline before the `)`.
   
    E.g.
   ```java
         dataPlaneRequestHandlerPool = 
sharedServer.requestHandlerPoolFactory.createPool(
           config.nodeId,
           socketServer.dataPlaneRequestChannel,
           dataPlaneRequestProcessor,
           time,
           config.numIoThreads,
           "broker"
         )
   ```



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -192,19 +197,47 @@ class KafkaRequestHandler(
 
 }
 
+/**
+ * Factory for creating KafkaRequestHandlerPool instances with shared 
aggregate metrics.
+ * All pools created by the same factory share the same aggregateThreads 
counter.
+ */
+class KafkaRequestHandlerPoolFactory {
+  val aggregateThreads = new AtomicInteger(0)
+  val RequestHandlerAvgIdleMetricName = "RequestHandlerAvgIdlePercent"
+  
+  def createPool(
+    brokerId: Int,
+    requestChannel: RequestChannel,
+    apis: ApiRequestHandler,
+    time: Time,
+    numThreads: Int,
+    nodeName: String = "broker"

Review Comment:
   Let's not use default values. E.g.
   
   ```java
       nodeName: String
   ```



##########
core/src/main/scala/kafka/server/SharedServer.scala:
##########
@@ -112,6 +112,9 @@ class SharedServer(
   private var usedByController: Boolean = false
   val brokerConfig = new KafkaConfig(sharedServerConfig.props, false)
   val controllerConfig = new KafkaConfig(sharedServerConfig.props, false)
+  
+  // Factory for creating request handler pools with shared aggregate thread 
counter
+  val requestHandlerPoolFactory = new KafkaRequestHandlerPoolFactory()

Review Comment:
   Add a newline before the volatile fields.



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -213,11 +246,28 @@ class KafkaRequestHandlerPool(
     createHandler(i)
   }
 
-  def createHandler(id: Int): Unit = synchronized {
-    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, 
threadPoolSize, requestChannel, apis, time, nodeName)
+  private def createHandler(id: Int): Unit = synchronized {

Review Comment:
   `synchronized` is not needed since the method is private and it is always 
called from either the constructor or a `synchronized` method.



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -192,19 +197,47 @@ class KafkaRequestHandler(
 
 }
 
+/**
+ * Factory for creating KafkaRequestHandlerPool instances with shared 
aggregate metrics.
+ * All pools created by the same factory share the same aggregateThreads 
counter.
+ */
+class KafkaRequestHandlerPoolFactory {
+  val aggregateThreads = new AtomicInteger(0)
+  val RequestHandlerAvgIdleMetricName = "RequestHandlerAvgIdlePercent"
+  
+  def createPool(
+    brokerId: Int,
+    requestChannel: RequestChannel,
+    apis: ApiRequestHandler,
+    time: Time,
+    numThreads: Int,
+    nodeName: String = "broker"
+  ): KafkaRequestHandlerPool = {
+    new KafkaRequestHandlerPool(aggregateThreads, 
RequestHandlerAvgIdleMetricName, brokerId, requestChannel, apis, time, 
numThreads, nodeName)
+  }
+}
+
 class KafkaRequestHandlerPool(
+  val aggregateThreads: AtomicInteger,
+  val requestHandlerAvgIdleMetricName: String,
   val brokerId: Int,
   val requestChannel: RequestChannel,
   val apis: ApiRequestHandler,
   time: Time,
   numThreads: Int,
-  requestHandlerAvgIdleMetricName: String,
   nodeName: String = "broker"

Review Comment:
   Do you mind removing the default value? E.g.
   
   ```java
     nodeName: String
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to