showuon commented on code in PR #16653: URL: https://github.com/apache/kafka/pull/16653#discussion_r1699611222
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -437,30 +437,48 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader, throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled"); } - Set<TopicIdPartition> leaderPartitions = filterPartitions(partitionsBecomeLeader) - .map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet()); + Map<TopicIdPartition, Boolean> leaderPartitions = filterPartitions(partitionsBecomeLeader) + .collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()), + p -> p.log().exists(log -> log.config().remoteCopyDisabled()))); - Set<TopicIdPartition> followerPartitions = filterPartitions(partitionsBecomeFollower) - .map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet()); + Map<TopicIdPartition, Boolean> followerPartitions = filterPartitions(partitionsBecomeFollower) + .collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()), + p -> p.log().exists(log -> log.config().remoteCopyDisabled()))); if (!leaderPartitions.isEmpty() || !followerPartitions.isEmpty()) { LOGGER.debug("Effective topic partitions after filtering compact and internal topics, leaders: {} and followers: {}", leaderPartitions, followerPartitions); - leaderPartitions.forEach(this::cacheTopicPartitionIds); - followerPartitions.forEach(this::cacheTopicPartitionIds); + leaderPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp)); + followerPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp)); - remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, followerPartitions); - followerPartitions.forEach(this::doHandleFollowerPartition); + remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions.keySet(), followerPartitions.keySet()); + followerPartitions.forEach((tp, __) -> doHandleFollowerPartition(tp)); // If this node was the previous leader for the partition, then the RLMTask might be running in the // background thread and might emit metrics. So, removing the metrics after marking this node as follower. - followerPartitions.forEach(this::removeRemoteTopicPartitionMetrics); + followerPartitions.forEach((tp, __) -> removeRemoteTopicPartitionMetrics(tp)); leaderPartitions.forEach(this::doHandleLeaderPartition); } } + public void stopLeaderCopyRLMTasks(Set<Partition> partitions) { + for (Partition partition : partitions) { + TopicPartition tp = partition.topicPartition(); + if (topicIdByPartitionMap.containsKey(tp)) { + TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp); + leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> { + LOGGER.info("Cancelling the copy RLM task for tpId: {}", tpId); + task.cancel(); + LOGGER.info("Resetting remote copy lag metrics for tpId: {}", tpId); + ((RLMCopyTask) task.rlmTask).recordLagStats(0L, 0L); Review Comment: Good call! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org