kamalcph commented on code in PR #16653:
URL: https://github.com/apache/kafka/pull/16653#discussion_r1699561489


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -437,30 +437,48 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
             throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled");
         }
 
-        Set<TopicIdPartition> leaderPartitions = filterPartitions(partitionsBecomeLeader)
-                .map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet());
+        Map<TopicIdPartition, Boolean> leaderPartitions = filterPartitions(partitionsBecomeLeader)
+                .collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
+                        p -> p.log().exists(log -> log.config().remoteCopyDisabled())));
 
-        Set<TopicIdPartition> followerPartitions = filterPartitions(partitionsBecomeFollower)
-                .map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet());
+        Map<TopicIdPartition, Boolean> followerPartitions = filterPartitions(partitionsBecomeFollower)
+                .collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
+                        p -> p.log().exists(log -> log.config().remoteCopyDisabled())));
 
         if (!leaderPartitions.isEmpty() || !followerPartitions.isEmpty()) {
             LOGGER.debug("Effective topic partitions after filtering compact and internal topics, leaders: {} and followers: {}",
                     leaderPartitions, followerPartitions);
 
-            leaderPartitions.forEach(this::cacheTopicPartitionIds);
-            followerPartitions.forEach(this::cacheTopicPartitionIds);
+            leaderPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp));
+            followerPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp));
 
-            remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, followerPartitions);
-            followerPartitions.forEach(this::doHandleFollowerPartition);
+            remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions.keySet(), followerPartitions.keySet());
+            followerPartitions.forEach((tp, __) -> doHandleFollowerPartition(tp));
 
             // If this node was the previous leader for the partition, then the RLMTask might be running in the
             // background thread and might emit metrics. So, removing the metrics after marking this node as follower.
-            followerPartitions.forEach(this::removeRemoteTopicPartitionMetrics);
+            followerPartitions.forEach((tp, __) -> removeRemoteTopicPartitionMetrics(tp));
 
             leaderPartitions.forEach(this::doHandleLeaderPartition);
         }
     }
 
+    public void stopLeaderCopyRLMTasks(Set<Partition> partitions) {
+        for (Partition partition : partitions) {
+            TopicPartition tp = partition.topicPartition();
+            if (topicIdByPartitionMap.containsKey(tp)) {
+                TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
+                leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
+                    LOGGER.info("Cancelling the copy RLM task for tpId: {}", tpId);
+                    task.cancel();
+                    LOGGER.info("Resetting remote copy lag metrics for tpId: {}", tpId);
+                    ((RLMCopyTask) task.rlmTask).recordLagStats(0L, 0L);

Review Comment:
   We have to refactor the `RLMCopyTask#recordLagStats` method: internally it 
checks the task's cancellation status before emitting the metric, so this call, 
made right after `task.cancel()`, would not reset the lag metrics. We have to 
move the `isCancelled` check outside of the `RLMCopyTask#recordLagStats` method.
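
A rough sketch of the shape this refactor could take is below. It is not the actual `RLMCopyTask` code: `CopyTaskSketch`, `reportLagFromCopyLoop`, and the two `AtomicLong` fields are hypothetical stand-ins for the real task and its lag gauges. The point is only that `recordLagStats` records unconditionally, while the periodic copy path does the `isCancelled` check itself.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for RLMCopyTask; all names here are illustrative only.
public class CopyTaskSketch {
    private volatile boolean cancelled = false;

    // Stand-ins for the remote copy lag gauges (assumption, not the real metrics API).
    final AtomicLong lagBytes = new AtomicLong();
    final AtomicLong lagSegments = new AtomicLong();

    void cancel() {
        cancelled = true;
    }

    boolean isCancelled() {
        return cancelled;
    }

    // No cancellation check in here: the method always records what it is given.
    void recordLagStats(long bytesLag, long segmentsLag) {
        lagBytes.set(bytesLag);
        lagSegments.set(segmentsLag);
    }

    // Periodic copy path: the caller decides that a cancelled task must not emit.
    void reportLagFromCopyLoop(long bytesLag, long segmentsLag) {
        if (!isCancelled()) {
            recordLagStats(bytesLag, segmentsLag);
        }
    }

    public static void main(String[] args) {
        CopyTaskSketch task = new CopyTaskSketch();
        task.reportLagFromCopyLoop(1024L, 2L); // recorded, task is still running
        task.cancel();
        task.reportLagFromCopyLoop(4096L, 8L); // skipped, task is cancelled
        task.recordLagStats(0L, 0L);           // explicit reset still takes effect
        System.out.println(task.lagBytes.get() + " " + task.lagSegments.get()); // prints "0 0"
    }
}
```

With this split, the reset in `stopLeaderCopyRLMTasks` can run after `task.cancel()` and still zero the metrics, while a late update from the background copy thread is dropped.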



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

