Yunyung commented on code in PR #20405:
URL: https://github.com/apache/kafka/pull/20405#discussion_r2300269437


##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -1568,6 +1589,7 @@ class TestMetricsReporter extends MetricsReporter with 
Reconfigurable with Close
   @volatile var closeCount = 0
   @volatile var clusterUpdateCount = 0
   @volatile var pollingInterval: Int = -1
+  @volatile var numFetchers: Int = 1

Review Comment:
   Maybe initializing it to 0 would make it clearer that the value is set 
later, through static or dynamic configuration.



##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -1599,31 +1622,37 @@ class TestMetricsReporter extends MetricsReporter with 
Reconfigurable with Close
   }
 
   override def reconfigurableConfigs(): util.Set[String] = {
-    util.Set.of(PollingIntervalProp)
+    util.Set.of(PollingIntervalProp, 
ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG)
   }
 
   override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
     val pollingInterval = configs.get(PollingIntervalProp).toString.toInt
     if (pollingInterval <= 0)
       throw new ConfigException(s"Invalid polling interval $pollingInterval")
+
+    val numFetchers = 
configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
+    if (numFetchers <= 0)
+      throw new ConfigException(s"Invalid num.replica.fetchers $numFetchers")
   }
 
   override def reconfigure(configs: util.Map[String, _]): Unit = {
     reconfigureCount += 1
     pollingInterval = configs.get(PollingIntervalProp).toString.toInt
+    numFetchers = 
configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
   }
 
   override def close(): Unit = {
     closeCount += 1
   }
 
-  def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: 
Int): Unit = {
+  def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: 
Int, numFetcher: Int = 1): Unit = {

Review Comment:
   ```suggestion
     def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: 
Int, numFetcher: Int): Unit = {
   ```



##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -947,6 +947,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     // Add a new metrics reporter
     val newProps = new Properties
     newProps.put(TestMetricsReporter.PollingIntervalProp, "100")
+    newProps.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "1")

Review Comment:
   Should we pass numFetcher = 1 explicitly in all verifyState calls in this 
method? It would be easier to understand, IMO.
   For example: reporters.foreach(_.verifyState(reconfigureCount = 0, deleteCount = 0, 
pollingInterval = 100, **numFetcher = 1**))
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to