showuon commented on code in PR #16203:
URL: https://github.com/apache/kafka/pull/16203#discussion_r1628868196
##########
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##########
@@ -792,6 +792,36 @@ class DynamicBrokerConfigTest {
verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
}
+ @Test
+ def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
+ val config = KafkaConfig(props)
+ val kafkaBroker = mock(classOf[KafkaBroker])
+ when(kafkaBroker.config).thenReturn(config)
+
+ val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
+ config.dynamicConfig.initialize(None, None)
+ config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig)
+
+ val newProps = new Properties()
+ newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "30000")
+ // update default config
+ config.dynamicConfig.validate(newProps, perBrokerConfig = false)
+ config.dynamicConfig.updateDefaultConfig(newProps)
+ assertEquals(30000, config.remoteFetchMaxWaitMs)
+
+ // update per broker config
+ newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "10000")
+ config.dynamicConfig.validate(newProps, perBrokerConfig = true)
Review Comment:
nit: wrap this `validate` call in `assertDoesNotThrow` so the test states explicitly that it must succeed.
##########
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##########
@@ -792,6 +792,36 @@ class DynamicBrokerConfigTest {
verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
}
+ @Test
+ def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
+ val config = KafkaConfig(props)
+ val kafkaBroker = mock(classOf[KafkaBroker])
+ when(kafkaBroker.config).thenReturn(config)
+
+ val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
+ config.dynamicConfig.initialize(None, None)
+ config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig)
+
+ val newProps = new Properties()
+ newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "30000")
Review Comment:
We might need to assert the default value of `REMOTE_FETCH_MAX_WAIT_MS_PROP`
before any change, to make sure the dynamic change actually takes effect.
##########
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##########
@@ -792,6 +792,36 @@ class DynamicBrokerConfigTest {
verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
}
+ @Test
+ def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
+ val config = KafkaConfig(props)
+ val kafkaBroker = mock(classOf[KafkaBroker])
+ when(kafkaBroker.config).thenReturn(config)
+
+ val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
+ config.dynamicConfig.initialize(None, None)
+ config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig)
+
+ val newProps = new Properties()
+ newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "30000")
+ // update default config
+ config.dynamicConfig.validate(newProps, perBrokerConfig = false)
+ config.dynamicConfig.updateDefaultConfig(newProps)
+ assertEquals(30000, config.remoteFetchMaxWaitMs)
+
+ // update per broker config
+ newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "10000")
+ config.dynamicConfig.validate(newProps, perBrokerConfig = true)
+ config.dynamicConfig.updateBrokerConfig(0, newProps)
+ assertEquals(10000, config.remoteFetchMaxWaitMs)
+
+ // invalid value
+ newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "-1")
Review Comment:
Could we also test `0` as an invalid value? Thanks.
##########
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##########
@@ -792,6 +792,36 @@ class DynamicBrokerConfigTest {
verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
}
+ @Test
+ def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
+ val config = KafkaConfig(props)
+ val kafkaBroker = mock(classOf[KafkaBroker])
+ when(kafkaBroker.config).thenReturn(config)
+
+ val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
+ config.dynamicConfig.initialize(None, None)
+ config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig)
+
+ val newProps = new Properties()
+ newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "30000")
+ // update default config
+ config.dynamicConfig.validate(newProps, perBrokerConfig = false)
Review Comment:
nit: wrap this `validate` call in `assertDoesNotThrow` so the test states explicitly that it must succeed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]