kevin-wu24 commented on code in PR #20356:
URL: https://github.com/apache/kafka/pull/20356#discussion_r2279358716


##########
core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala:
##########
@@ -194,6 +248,38 @@ class KafkaRequestHandlerTest {
     assertEquals(1, handledCount)
   }
 
+  @Test
+  def testResizeThreadPoolUpdatesThreadPoolSize(): Unit = {
+    val time = Time.SYSTEM
+    val metrics = new RequestChannelMetrics(java.util.Set.of[ApiKeys])
+    val requestChannel = new RequestChannel(10, time, metrics)
+    val apiHandler = mock(classOf[ApiRequestHandler])
+
+    // start with 3 threads
+    val pool = new KafkaRequestHandlerPool(
+      0,
+      requestChannel,
+      apiHandler,
+      time,
+      3,
+      "RequestHandlerAvgIdlePercent",
+    )
+    try {
+      assertEquals(3, pool.threadPoolSize.get)
+
+      // grow to 5
+      pool.resizeThreadPool(5)
+      assertEquals(5, pool.threadPoolSize.get)
+
+      // shrink to 2
+      pool.resizeThreadPool(2)
+      assertEquals(2, pool.threadPoolSize.get)

Review Comment:
   I'm not sure this test tests anything new, since the previous implementation 
would pass it as well.



##########
core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala:
##########
@@ -51,6 +51,60 @@ class KafkaRequestHandlerTest {
   val brokerTopicMetrics: BrokerTopicMetrics = 
brokerTopicStats.topicStats(topic)
   val allTopicMetrics: BrokerTopicMetrics = brokerTopicStats.allTopicsStats
 
+  @Test
+  def testCombinedModeIdlePercent(): Unit = {
+    val time = Time.SYSTEM
+    val metricsBroker = new RequestChannelMetrics(java.util.Set.of[ApiKeys])
+    val metricsController = new 
RequestChannelMetrics(java.util.Set.of[ApiKeys])
+    val requestChannelBroker = new RequestChannel(10, time, metricsBroker)
+    val requestChannelController = new RequestChannel(10, time, 
metricsController)
+    val apiHandler = mock(classOf[ApiRequestHandler])
+
+    // Create both pools with 0 threads so we can wire a shared Meter before 
handlers are created
+    val brokerPool = new KafkaRequestHandlerPool(
+      0,
+      requestChannelBroker,
+      apiHandler,
+      time,
+      0,
+      "RequestHandlerAvgIdlePercent",
+      isCombinedMode = true
+    )
+
+    val controllerPool = new KafkaRequestHandlerPool(
+      0,
+      requestChannelController,
+      apiHandler,
+      time,
+      0,
+      "RequestHandlerAvgIdlePercent",
+      isCombinedMode = true
+    )
+
+    try {
+      val field = 
classOf[KafkaRequestHandlerPool].getDeclaredField("aggregateIdleMeter")
+      field.setAccessible(true)
+      val sharedMeter = field.get(brokerPool).asInstanceOf[Meter]
+      field.set(controllerPool, sharedMeter)
+
+      brokerPool.resizeThreadPool(4)
+      controllerPool.resizeThreadPool(4)
+
+      val deadline = System.currentTimeMillis() + 8000
+      var value = 0.0
+      while (System.currentTimeMillis() < deadline && value == 0.0) {
+        Thread.sleep(200)
+        value = sharedMeter.oneMinuteRate()
+      }
+      assertTrue(value >= 0.0 && value <= 1.05, s"idle percent should be 
within [0,1], got $value")

Review Comment:
   Shouldn't this assert be [0,1], not [0,1.05]?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to