kamalcph commented on code in PR #17793:
URL: https://github.com/apache/kafka/pull/17793#discussion_r1843560401


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -800,6 +815,11 @@ public void run() {
                     return;
                 }
 
+                if (!remoteLogMetadataManager.isReady(topicIdPartition)) {

Review Comment:
   Why do we need this check? Could you rebase your branch?



##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -1159,7 +1159,10 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w
     newConfig.values.forEach { (k, v) =>
       if (RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP.equals(k) ||
         RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP.equals(k) ||
-        RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP.equals(k)) {
+        RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP.equals(k) ||
+        RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP.equals(k) ||

Review Comment:
   Could we move these 3 configs into a separate `if` check? That would let us
   validate that the new thread count stays within the boundary
   (t/2 <= newSize <= t*2), similar to the validation done for other thread
   configs. Please refer to DynamicThreadPool.validateReconfiguration.
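
For illustration, the boundary rule above could be sketched in Java roughly as follows. This is not the code in `DynamicThreadPool.validateReconfiguration`: the class and method names (`ThreadCountBounds`, `validateResize`) are hypothetical, and `IllegalArgumentException` is an assumed stand-in for whatever exception the broker raises on an invalid config.

```java
// Hypothetical helper, not part of Kafka. It applies the
// t/2 <= newSize <= t*2 rule from the review comment to a single pool size.
final class ThreadCountBounds {
    private ThreadCountBounds() { }

    /** Throws if newSize is not an acceptable resize of a pool currently sized oldSize. */
    static void validateResize(String configName, int oldSize, int newSize) {
        if (newSize == oldSize) {
            return; // unchanged value, nothing to validate
        }
        String prefix = "Dynamic thread count update validation failed for "
                + configName + "=" + newSize;
        if (newSize <= 0) {
            throw new IllegalArgumentException(prefix + ", value should be at least 1");
        }
        // Integer division: for oldSize = 5 the lower bound is 2.
        if (newSize < oldSize / 2) {
            throw new IllegalArgumentException(
                    prefix + ", value should be at least half the current value " + oldSize);
        }
        // Widen to long so that doubling a very large size cannot overflow.
        if (newSize > (long) oldSize * 2) {
            throw new IllegalArgumentException(
                    prefix + ", value should not be greater than double the current value " + oldSize);
        }
    }
}
```

With a current size of 10, sizes 5 through 20 would be accepted and anything outside that range rejected, so a large change has to be applied in several steps.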



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -289,6 +289,21 @@ public void updateFetchQuota(long quota) {
         rlmFetchQuotaManager.updateQuota(new Quota(quota, true));
     }
 
+    public void updateCopyThreadPoolSize(int size) {
+        LOGGER.info("Updating remote copy thread pool size to {}", size);
+        rlmCopyThreadPool.resize(size);
+    }
+
+    public void updateExpirationThreadPoolSize(int size) {
+        LOGGER.info("Updating remote expiration thread pool size to {}", size);

Review Comment:
   Shall we print both the old and the new size in the log? The same applies to the other places.
   
   ```
   Updating remote expiration thread pool size from {} to {}
   ```
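
A minimal sketch of what that might look like, assuming the pool wraps a plain `java.util.concurrent.ThreadPoolExecutor`. `ResizablePool` is a hypothetical stand-in, not the pool class used in `RemoteLogManager`, and the message is returned as a string here instead of going through the logger so the example stays self-contained.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the remote log thread pool; not Kafka's implementation.
final class ResizablePool {
    private final ThreadPoolExecutor executor;

    ResizablePool(int size) {
        executor = new ThreadPoolExecutor(
                size, size, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    /** Resizes the pool (newSize must be positive) and returns the message to log. */
    String resize(int newSize) {
        // Capture the old size before changing anything so both values can be reported.
        int oldSize = executor.getCorePoolSize();
        if (newSize > oldSize) {
            // Growing: raise the maximum first so core never exceeds max.
            executor.setMaximumPoolSize(newSize);
            executor.setCorePoolSize(newSize);
        } else {
            // Shrinking: lower core first so max never drops below core.
            executor.setCorePoolSize(newSize);
            executor.setMaximumPoolSize(newSize);
        }
        return String.format(
                "Updating remote expiration thread pool size from %d to %d", oldSize, newSize);
    }

    int size() {
        return executor.getCorePoolSize();
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Reading the old size inside the resize method keeps the "from" value accurate even when the caller does not know the current size.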



##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -1199,14 +1202,32 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w
         info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP} updated, " +
           s"old value: $oldValue, new value: $newValue")
       }
+      if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP)) {
+        val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP)

Review Comment:
   This is not a long value; it should be an int. We can compare the two configs directly:
   
   ```
   val newRLMConfig = newConfig.remoteLogManagerConfig
   val oldRLMConfig = oldConfig.remoteLogManagerConfig
   if (newRLMConfig.remoteLogManagerCopierThreadPoolSize() != oldRLMConfig.remoteLogManagerCopierThreadPoolSize())
          ...
   ```



