Yunyung commented on code in PR #20405: URL: https://github.com/apache/kafka/pull/20405#discussion_r2300269437
##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -1568,6 +1589,7 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
   @volatile var closeCount = 0
   @volatile var clusterUpdateCount = 0
   @volatile var pollingInterval: Int = -1
+  @volatile var numFetchers: Int = 1

Review Comment:
   Maybe changing the initial value to `= 0` would make it clearer that the value gets changed by the static/dynamic configuration.

##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -1599,31 +1622,37 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
   }

   override def reconfigurableConfigs(): util.Set[String] = {
-    util.Set.of(PollingIntervalProp)
+    util.Set.of(PollingIntervalProp, ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG)
   }

   override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
     val pollingInterval = configs.get(PollingIntervalProp).toString.toInt
     if (pollingInterval <= 0)
       throw new ConfigException(s"Invalid polling interval $pollingInterval")
+
+    val numFetchers = configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
+    if (numFetchers <= 0)
+      throw new ConfigException(s"Invalid num.replica.fetchers $numFetchers")
   }

   override def reconfigure(configs: util.Map[String, _]): Unit = {
     reconfigureCount += 1
     pollingInterval = configs.get(PollingIntervalProp).toString.toInt
+    numFetchers = configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
   }

   override def close(): Unit = {
     closeCount += 1
   }

-  def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: Int): Unit = {
+  def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: Int, numFetcher: Int = 1): Unit = {

Review Comment:
   ```suggestion
     def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: Int, numFetcher: Int): Unit = {
   ```

##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -947,6 +947,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     // Add a new metrics reporter
     val newProps = new Properties
     newProps.put(TestMetricsReporter.PollingIntervalProp, "100")
+    newProps.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "1")

Review Comment:
   Should we update all `verifyState` calls in this method to pass `numFetcher = 1` explicitly? It's easier to understand, IMO.
   For example: reporters.foreach(_.verifyState(reconfigureCount = 0, deleteCount = 0, pollingInterval = 100, **numFetcher = 1**))

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org