kamalcph commented on code in PR #16502:
URL: https://github.com/apache/kafka/pull/16502#discussion_r1673686505


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -241,12 +245,17 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
 
         indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
-        rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
+        rlmCopyThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+            "RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-");
+        rlmExpirationThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),

Review Comment:
   We can start using the respective thread pool size:
   
   ```
   rlmConfig.remoteLogManagerExpirationThreadPoolSize()
   ```



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -241,12 +245,17 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
 
         indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
-        rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
+        rlmCopyThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+            "RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-");
+        rlmExpirationThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+            "RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-");
+        followerThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+            "RLMFollowerScheduledThreadPool", "kafka-rlm-follower-thread-pool-");

Review Comment:
   The number of threads instantiated will be 2X the number configured by the user. Why do we need a separate follower thread pool?
   
   We can have three different tasks for clarity: RLMCopyTask, RLMExpirationTask and RLMFollowerTask. RLMCopyTask and RLMFollowerTask can reuse the same thread pool. WDYT?
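A minimal sketch of the shared-pool suggestion, assuming plain JDK `ScheduledThreadPoolExecutor`s instead of Kafka's `RLMScheduledThreadPool`. `RlmTaskKind`, `RlmPoolsSketch`, `copyPoolSize` and `expirationPoolSize` are hypothetical names; the two sizes stand in for `rlmConfig.remoteLogManagerThreadPoolSize()` and the suggested `rlmConfig.remoteLogManagerExpirationThreadPoolSize()`. Copy and follower tasks map to one pool, and expiration tasks get their own.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical task kinds mirroring the names proposed in the review.
enum RlmTaskKind { COPY, EXPIRATION, FOLLOWER }

// Hypothetical, simplified stand-in for the pool setup in RemoteLogManager;
// it uses plain JDK executors, not Kafka's RLMScheduledThreadPool.
final class RlmPoolsSketch {
    private final ScheduledThreadPoolExecutor copyPool;
    private final ScheduledThreadPoolExecutor expirationPool;

    RlmPoolsSketch(int copyPoolSize, int expirationPoolSize) {
        // Each pool is sized from its own setting instead of one shared value.
        this.copyPool = new ScheduledThreadPoolExecutor(copyPoolSize, named("kafka-rlm-copy-thread-pool-"));
        this.expirationPool = new ScheduledThreadPoolExecutor(expirationPoolSize, named("kafka-rlm-expiration-thread-pool-"));
    }

    // Copy and follower tasks share one pool instead of a third, equally sized pool.
    ScheduledThreadPoolExecutor poolFor(RlmTaskKind kind) {
        return kind == RlmTaskKind.EXPIRATION ? expirationPool : copyPool;
    }

    int totalCoreThreads() {
        return copyPool.getCorePoolSize() + expirationPool.getCorePoolSize();
    }

    void shutdown() {
        copyPool.shutdownNow();
        expirationPool.shutdownNow();
    }

    // Names threads "<prefix>0", "<prefix>1", ... and marks them as daemons.
    private static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return r -> {
            Thread t = new Thread(r, prefix + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
    }
}
```

With sizes 10 and 5, the two pools have 15 core threads in total, compared with 30 when three pools are each sized at 10.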



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -686,60 +702,98 @@ List<EpochEntry> getLeaderEpochEntries(UnifiedLog log, long startOffset, long en
     }
 
     // VisibleForTesting
-    RLMTask rlmTask(TopicIdPartition topicIdPartition) {
-        RLMTaskWithFuture task = leaderOrFollowerTasks.get(topicIdPartition);
+    RLMTask rlmCopyTask(TopicIdPartition topicIdPartition) {
+        RLMTaskWithFuture task = leaderCopyRLMTasks.get(topicIdPartition);
         if (task != null) {
             return task.rlmTask;
         }
         return null;
     }
 
-    class RLMTask extends CancellableRunnable {
+    abstract class RLMTask extends CancellableRunnable {
 
-        private final TopicIdPartition topicIdPartition;
-        private final int customMetadataSizeLimit;
+        protected final TopicIdPartition topicIdPartition;
         private final Logger logger;
 
-        private volatile int leaderEpoch = -1;
-
-        public RLMTask(TopicIdPartition topicIdPartition, int customMetadataSizeLimit) {
+        public RLMTask(TopicIdPartition topicIdPartition) {
             this.topicIdPartition = topicIdPartition;
-            this.customMetadataSizeLimit = customMetadataSizeLimit;
-            LogContext logContext = new LogContext("[RemoteLogManager=" + brokerId + " partition=" + topicIdPartition + "] ");
-            logger = logContext.logger(RLMTask.class);
+            this.logger = getLogContext(topicIdPartition).logger(RLMTask.class);
         }
 
-        boolean isLeader() {
-            return leaderEpoch >= 0;
+        protected LogContext getLogContext(TopicIdPartition topicIdPartition) {

Review Comment:
   We can avoid taking the `topicIdPartition` parameter since it is already a field of the class.
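A minimal sketch of the no-argument variant, assuming simplified stand-ins: the partition is a plain `String` and the "log context" is reduced to its prefix string, so no Kafka `TopicIdPartition` or `LogContext` is used. `RlmTaskSketch`, `CopyTaskSketch`, `buildLogPrefix` and `prefix` are hypothetical names. Because the field is assigned before the helper is called in the constructor, the helper can read it directly.

```java
// Hypothetical, simplified stand-in for RLMTask: the partition is a plain String
// and the "log context" is just its prefix, not Kafka's TopicIdPartition/LogContext.
abstract class RlmTaskSketch implements Runnable {
    protected final String topicIdPartition;
    private final int brokerId;
    private final String logPrefix;

    RlmTaskSketch(int brokerId, String topicIdPartition) {
        this.brokerId = brokerId;
        this.topicIdPartition = topicIdPartition;
        // Both fields are assigned above, so the no-arg helper can read them.
        this.logPrefix = buildLogPrefix();
    }

    // No parameter: the partition is already a field of the task.
    // An override must not rely on subclass fields, because this runs from the
    // superclass constructor before those fields are initialized.
    protected String buildLogPrefix() {
        return "[RemoteLogManager=" + brokerId + " partition=" + topicIdPartition + "] ";
    }

    String prefix() {
        return logPrefix;
    }
}

// Hypothetical subclass standing in for RLMCopyTask.
final class CopyTaskSketch extends RlmTaskSketch {
    CopyTaskSketch(int brokerId, String topicIdPartition) {
        super(brokerId, topicIdPartition);
    }

    @Override
    public void run() {
        // Copy logic omitted in this sketch.
    }
}
```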



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1761,19 +1796,48 @@ public Future<Void> asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer<RemoteL
                 new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer));
     }
 
-    void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
-                                            Consumer<RLMTask> convertToLeaderOrFollower) {
-        RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition,
-                topicIdPartition -> {
-                    RLMTask task = new RLMTask(topicIdPartition, rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
-                    // set this upfront when it is getting initialized instead of doing it after scheduling.
-                    convertToLeaderOrFollower.accept(task);
-                    LOGGER.info("Created a new task: {} and getting scheduled", task);
-                    ScheduledFuture<?> future = rlmScheduledThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, TimeUnit.MILLISECONDS);
-                    return new RLMTaskWithFuture(task, future);
-                }
-        );
-        convertToLeaderOrFollower.accept(rlmTaskWithFuture.rlmTask);
+    void doHandleLeaderPartition(TopicIdPartition topicPartition, int leaderEpoch) {
+        RLMTaskWithFuture followerRLMTaskWithFuture = followerRLMTasks.remove(topicPartition);
+        if (followerRLMTaskWithFuture != null) {
+            LOGGER.info("Cancelling the follower task: {}", followerRLMTaskWithFuture.rlmTask);
+            followerRLMTaskWithFuture.cancel();
+        }
+
+        leaderCopyRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> {
+            RLMCopyTask task = new RLMCopyTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes(), leaderEpoch);

Review Comment:
   Can the leader epoch be bumped while the same node remains the leader? If so, the leaderEpoch held inside RLMCopyTask might become stale. Currently, leaderEpoch is unused inside RLMCopyTask, so we can refactor the code to remove it later.
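A minimal sketch of the staleness concern, assuming the same node stays leader across an epoch bump. In the hunk shown, `doHandleLeaderPartition` uses `computeIfAbsent`, which keeps an existing `RLMCopyTask` instead of creating a new one, so an epoch passed to its constructor is not refreshed. `LeaderEpochSketch`, its nested task classes and the `IntSupplier` epoch source are hypothetical; the sketch only contrasts capturing the epoch once with reading it on each run.

```java
import java.util.function.IntSupplier;

// Hypothetical sketch contrasting two ways a copy task could learn its leader epoch.
final class LeaderEpochSketch {
    // Captures the epoch once at construction; if the task is reused after an
    // epoch bump, this value falls behind.
    static final class CapturedEpochTask {
        private final int leaderEpoch;

        CapturedEpochTask(int leaderEpoch) {
            this.leaderEpoch = leaderEpoch;
        }

        int epochForRun() {
            return leaderEpoch;
        }
    }

    // Reads the epoch each time it is needed from a caller-supplied source.
    static final class LiveEpochTask {
        private final IntSupplier leaderEpoch;

        LiveEpochTask(IntSupplier leaderEpoch) {
            this.leaderEpoch = leaderEpoch;
        }

        int epochForRun() {
            return leaderEpoch.getAsInt();
        }
    }

    private LeaderEpochSketch() {
    }
}
```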


