kamalcph commented on code in PR #17793:
URL: https://github.com/apache/kafka/pull/17793#discussion_r1845012900
##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -771,7 +772,10 @@ object DynamicThreadPool {
ServerConfigs.NUM_IO_THREADS_CONFIG,
ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG,
ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG,
- ServerConfigs.BACKGROUND_THREADS_CONFIG)
+ ServerConfigs.BACKGROUND_THREADS_CONFIG,
+ RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
+ RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
+ RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP)
Review Comment:
We also have to update the BrokerDynamicThreadPool.reconfigurableConfigs()
method to return only the following configs:
1. ServerConfigs.NUM_IO_THREADS_CONFIG
2. ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG
3. ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG and
4. ServerConfigs.BACKGROUND_THREADS_CONFIG
Otherwise, the newly added RemoteLogManager configs get double-registered
as brokerReconfigurable in both BrokerDynamicThreadPool and
RemoteLogDynamicThreadPool.
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -2163,6 +2178,10 @@ public RLMScheduledThreadPool(int poolSize, String
threadPoolName, String thread
scheduledThreadPool = createPool();
}
+ public void resize(int newSize) {
Review Comment:
nit: Can we rename this method to `setCorePoolSize`? Also, please add a
getter method, `getPoolSize`.
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java:
##########
@@ -61,6 +61,10 @@ protected void afterExecute(Runnable runnable, Throwable th)
{
}
}
+ public void resize(int newSize) {
Review Comment:
This method is not required; we can call `setCorePoolSize` directly.
##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -840,6 +847,43 @@ class BrokerDynamicThreadPool(server: KafkaBroker) extends
BrokerReconfigurable
}
}
+
+class RemoteLogDynamicThreadPool(server: KafkaBroker) extends
BrokerReconfigurable with Logging {
+ override def reconfigurableConfigs: Set[String] = Set(
+ RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
+ RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
+ RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP)
+
+ override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+ DynamicThreadPool.validateReconfiguration(server.config, newConfig)
+ }
+
+ override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig):
Unit = {
+ val remoteLogManager = server.remoteLogManagerOpt
Review Comment:
Also, please add an `if` check, as remoteLogManager is optional:
```suggestion
if (server.remoteLogManagerOpt.nonEmpty) {
val remoteLogManager = server.remoteLogManagerOpt.get
...
...
```
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -289,6 +289,21 @@ public void updateFetchQuota(long quota) {
rlmFetchQuotaManager.updateQuota(new Quota(quota, true));
}
+ public void resizeCopierThreadPool(int size) {
+ LOGGER.info("Updating remote copy thread pool size to {}", size);
+ rlmCopyThreadPool.resize(size);
+ }
Review Comment:
```suggestion
public void resizeCopierThreadPool(int newSize) {
int currentSize = rlmCopyThreadPool.getPoolSize();
LOGGER.info("Updating remote copy thread pool size from {} to {}",
currentSize, newSize);
rlmCopyThreadPool.resize(newSize);
}
```
##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -797,6 +801,9 @@ object DynamicThreadPool {
case ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG =>
config.numReplicaFetchers
case ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG =>
config.numRecoveryThreadsPerDataDir
case ServerConfigs.BACKGROUND_THREADS_CONFIG => config.backgroundThreads
+ case
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP =>
config.remoteLogCopierThreads
Review Comment:
Get the copier thread pool size as shown below; we don't want to expose the
remoteLogManager configs in the KafkaConfig class:
```suggestion
case
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP =>
config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize()
```
The same applies to the other configs.
##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1114,6 +1114,9 @@ class KafkaConfigTest {
case RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -3)
case RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -3)
+ case
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+ case
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+ case
RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
Review Comment:
This is not required.
##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -380,6 +380,9 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
def backgroundThreads = getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG)
def numIoThreads = getInt(ServerConfigs.NUM_IO_THREADS_CONFIG)
+ def remoteLogCopierThreads =
getInt(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP)
+ def remoteLogExpirationThreads =
getInt(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP)
+ def remoteLogReaderThreads =
getInt(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP)
Review Comment:
These getters are not required; please see the comments above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]