jsancio commented on code in PR #20481:
URL: https://github.com/apache/kafka/pull/20481#discussion_r2436605910
##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -89,7 +90,8 @@ class KafkaRequestHandler(
id: Int,
brokerId: Int,
val aggregateIdleMeter: Meter,
- val totalHandlerThreads: AtomicInteger,
+ val perPoolIdleMeter: Meter,
+ val poolHandlerThreads: AtomicInteger,
Review Comment:
Let's use similar naming conventions. E.g. `poolIdleMeter` and
`poolHandlerThreads`.
##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -112,7 +114,10 @@ class KafkaRequestHandler(
val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
val idleTime = endTime - startSelectTime
- aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
+ // Per-pool idle ratio uses the pool's own thread count as denominator
+ perPoolIdleMeter.mark(idleTime / poolHandlerThreads.get)
+ // Aggregate idle ratio uses the total threads across all pools as
denominator
+ aggregateIdleMeter.mark(idleTime / aggregateThreads.get)
Review Comment:
It is an inconsistent design that the `aggregateIdleMeter` is defined in the
constructor while the `aggregateThreads` is defined in an object. Why is that?
Why not pass the aggregateThreads through the constructor?
##########
core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala:
##########
@@ -50,6 +50,7 @@ class KafkaRequestHandlerTest {
val topic2 = "topic2"
val brokerTopicMetrics: BrokerTopicMetrics =
brokerTopicStats.topicStats(topic)
val allTopicMetrics: BrokerTopicMetrics = brokerTopicStats.allTopicsStats
+ KafkaRequestHandlerPool.aggregateThreads.set(1)
Review Comment:
This code smell is a good hit that something is not right with the design
and implementation.
##########
core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala:
##########
@@ -698,4 +699,94 @@ class KafkaRequestHandlerTest {
// cleanup
brokerTopicStats.close()
}
+
+ @Test
+ def testRequestThreadMetrics(): Unit = {
+ val time = Time.SYSTEM
+ val metricsBroker = new RequestChannelMetrics(java.util.Set.of[ApiKeys])
+ val metricsController = new
RequestChannelMetrics(java.util.Set.of[ApiKeys])
+ val requestChannelBroker = new RequestChannel(10, time, metricsBroker)
+ val requestChannelController = new RequestChannel(10, time,
metricsController)
+ val apiHandler = mock(classOf[ApiRequestHandler])
+
+ // Reset global shared counter for test
+ KafkaRequestHandlerPool.aggregateThreads.set(0)
+
+ // Create broker pool with 4 threads
+ val brokerPool = new KafkaRequestHandlerPool(
+ 0,
+ requestChannelBroker,
+ apiHandler,
+ time,
+ 4,
+ "broker"
+ )
+
+ // Verify global counter is updated
+ assertEquals(4, KafkaRequestHandlerPool.aggregateThreads.get, "global
counter should be 4 after broker pool")
+
+ // Create controller pool with 4 threads
+ val controllerPool = new KafkaRequestHandlerPool(
+ 0,
+ requestChannelController,
+ apiHandler,
+ time,
+ 4,
+ "controller"
+ )
+
+ // Verify global counter is updated to sum of both pools
+ assertEquals(8, KafkaRequestHandlerPool.aggregateThreads.get, "global
counter should be 8 after both pools")
+
+ try {
+ val aggregateMeterField =
classOf[KafkaRequestHandlerPool].getDeclaredField("aggregateIdleMeter")
+ aggregateMeterField.setAccessible(true)
+ val aggregateMeter =
aggregateMeterField.get(brokerPool).asInstanceOf[Meter]
+
+ val perPoolIdleMeterField =
classOf[KafkaRequestHandlerPool].getDeclaredField("perPoolIdleMeter")
+ perPoolIdleMeterField.setAccessible(true)
+ val brokerPerPoolIdleMeter =
perPoolIdleMeterField.get(brokerPool).asInstanceOf[Meter]
+ val controllerPerPoolIdleMeter =
perPoolIdleMeterField.get(controllerPool).asInstanceOf[Meter]
+
+ var aggregateValue = 0.0
+ var brokerPerPoolValue = 0.0
+ var controllerPerPoolValue = 0.0
+
+ Thread.sleep(2000)
Review Comment:
Please do not called `Thread.sleep` in tests. This slows down tests,
software development and can make tests unreliable. Kafka has a mocked `Time`
that you can use finely control the time returned to objects that rely on the
`Time`.
This means that instead of using `val time = Time.SYSTEM`, you can use `val
time = new MockTime()`.
Please remove all calls to `Thread.sleep()` in this PR.
##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -192,19 +197,26 @@ class KafkaRequestHandler(
}
+object KafkaRequestHandlerPool {
+ val aggregateThreads = new AtomicInteger(0)
+ val requestHandlerAvgIdleMetricName = "RequestHandlerAvgIdlePercent"
Review Comment:
In Scala "constants" at `object` use upper camel case by convention. E.g.
`RequestHandlerAvgIdleMetricName`.
##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -214,7 +226,18 @@ class KafkaRequestHandlerPool(
}
def createHandler(id: Int): Unit = synchronized {
- runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter,
threadPoolSize, requestChannel, apis, time, nodeName)
+ runnables += new KafkaRequestHandler(
+ id,
+ brokerId,
+ aggregateIdleMeter,
+ perPoolIdleMeter,
+ threadPoolSize,
+ requestChannel,
+ apis,
+ time,
+ nodeName,
+ )
+ aggregateThreads.getAndIncrement()
Review Comment:
It is an inconsistent design that the aggregate threads are incremented
through `createHandler` but they are decremented through `resizeThreadPool`.
You should be able to remove this inconsistency by fixing the constructor and
calling `resizeThreadPool` in the constructor.
You can also enforce this by making `createHandler` private and removing the
`synchronized`. You can also add a private method (internalResizeThreadPool)
that doesn't `synchronized` which is call by the constructor and
`resizeThreadPool` which does `synchronized`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]