chia7712 commented on code in PR #17859:
URL: https://github.com/apache/kafka/pull/17859#discussion_r3291940768


##########
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##########
@@ -202,6 +202,81 @@ class DynamicBrokerConfigTest {
     )
   }
 
+  @Test
+  def testUpdateRemoteLogManagerDynamicThreadPool(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
port = 8181)
+    val config = KafkaConfig(origProps)
+    
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
 config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize())
+    
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
 config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
+    assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_READER_THREADS, 
config.remoteLogManagerConfig.remoteLogReaderThreads())
+
+    val serverMock = mock(classOf[KafkaBroker])
+    val remoteLogManager = mock(classOf[RemoteLogManager])
+    when(serverMock.config).thenReturn(config)
+    when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
+
+    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.addBrokerReconfigurable(new 
DynamicRemoteLogConfig(serverMock))
+
+    // Test dynamic update with valid values
+    val props = new Properties()
+    
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
 "8")
+    config.dynamicConfig.validate(props, perBrokerConfig = true)
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(8, 
config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize())
+    verify(remoteLogManager).resizeCopierThreadPool(8)
+
+    
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
 "7")
+    config.dynamicConfig.validate(props, perBrokerConfig = false)
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(7, 
config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
+    verify(remoteLogManager).resizeExpirationThreadPool(7)
+
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "6")
+    config.dynamicConfig.validate(props, perBrokerConfig = true)
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(6, config.remoteLogManagerConfig.remoteLogReaderThreads())
+    verify(remoteLogManager).resizeReaderThreadPool(6)
+    props.clear()
+    verifyNoMoreInteractions(remoteLogManager)
+  }
+
+  @Test
+  def testRemoteLogDynamicThreadPoolWithInvalidValues(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
port = 8181)
+    val config = KafkaConfig(origProps)
+
+    val serverMock = mock(classOf[KafkaBroker])
+    val remoteLogManager = mock(classOf[RemoteLogManager])
+    when(serverMock.config).thenReturn(config)
+    when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
+
+    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.addBrokerReconfigurable(new 
DynamicRemoteLogConfig(serverMock))
+
+    // Test dynamic update with invalid values
+    val props = new Properties()
+    
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
 "0")
+    val err = assertThrows(classOf[ConfigException], () => 
config.dynamicConfig.validate(props, perBrokerConfig = true))
+    assertTrue(err.getMessage.contains("Value must be at least 1"))
+
+    val props1 = new Properties()
+    
props1.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
 "-1")
+    val err1 = assertThrows(classOf[ConfigException], () => 
config.dynamicConfig.validate(props1, perBrokerConfig = false))
+    assertTrue(err1.getMessage.contains("Value must be at least 1"))
+
+    val props2 = new Properties()
+    props2.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "2")
+    val err2 = assertThrows(classOf[ConfigException], () => 
config.dynamicConfig.validate(props2, perBrokerConfig = false))
+    assertTrue(err2.getMessage.contains("value should be at least half the 
current value"))
+
+    val props3 = new Properties()
+    props3.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "-1")
+    val err3 = assertThrows(classOf[ConfigException], () => 
config.dynamicConfig.validate(props, perBrokerConfig = true))

Review Comment:
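   It should be `props3` rather than `props`. As written, the assertion still 
passes only because `props` holds the invalid copier thread pool size ("0"), 
so the `-1` reader-threads value in `props3` is never actually validated. 
I noticed this when reviewing #22349 :)
   
   We will file a small patch for it.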



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to