abhijeetk88 commented on code in PR #16502:
URL: https://github.com/apache/kafka/pull/16502#discussion_r1677490420


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1761,19 +1796,48 @@ public Future<Void> asyncRead(RemoteStorageFetchInfo 
fetchInfo, Consumer<RemoteL
                 new RemoteLogReader(fetchInfo, this, callback, 
brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer));
     }
 
-    void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
-                                            Consumer<RLMTask> 
convertToLeaderOrFollower) {
-        RLMTaskWithFuture rlmTaskWithFuture = 
leaderOrFollowerTasks.computeIfAbsent(topicPartition,
-                topicIdPartition -> {
-                    RLMTask task = new RLMTask(topicIdPartition, 
rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
-                    // set this upfront when it is getting initialized instead 
of doing it after scheduling.
-                    convertToLeaderOrFollower.accept(task);
-                    LOGGER.info("Created a new task: {} and getting 
scheduled", task);
-                    ScheduledFuture<?> future = 
rlmScheduledThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, 
TimeUnit.MILLISECONDS);
-                    return new RLMTaskWithFuture(task, future);
-                }
-        );
-        convertToLeaderOrFollower.accept(rlmTaskWithFuture.rlmTask);
+    void doHandleLeaderPartition(TopicIdPartition topicPartition, int 
leaderEpoch) {
+        RLMTaskWithFuture followerRLMTaskWithFuture = 
followerRLMTasks.remove(topicPartition);
+        if (followerRLMTaskWithFuture != null) {
+            LOGGER.info("Cancelling the follower task: {}", 
followerRLMTaskWithFuture.rlmTask);
+            followerRLMTaskWithFuture.cancel();
+        }
+
+        leaderCopyRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> 
{
+            RLMCopyTask task = new RLMCopyTask(topicIdPartition, 
this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes(), leaderEpoch);

Review Comment:
   The leaderEpoch is only used for logging right now. If it is useful for 
debugging, we can keep it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to