kamalcph commented on code in PR #16653: URL: https://github.com/apache/kafka/pull/16653#discussion_r1698472467
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -437,30 +437,46 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader, throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled"); } - Set<TopicIdPartition> leaderPartitions = filterPartitions(partitionsBecomeLeader) - .map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet()); + Map<TopicIdPartition, Boolean> leaderPartitions = filterPartitions(partitionsBecomeLeader) + .collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()), + p -> p.log().exists(log -> log.config().remoteCopyDisabled()))); - Set<TopicIdPartition> followerPartitions = filterPartitions(partitionsBecomeFollower) - .map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet()); + Map<TopicIdPartition, Boolean> followerPartitions = filterPartitions(partitionsBecomeFollower) + .collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()), + p -> p.log().exists(log -> log.config().remoteCopyDisabled()))); if (!leaderPartitions.isEmpty() || !followerPartitions.isEmpty()) { LOGGER.debug("Effective topic partitions after filtering compact and internal topics, leaders: {} and followers: {}", leaderPartitions, followerPartitions); - leaderPartitions.forEach(this::cacheTopicPartitionIds); - followerPartitions.forEach(this::cacheTopicPartitionIds); + leaderPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp)); + followerPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp)); - remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, followerPartitions); - followerPartitions.forEach(this::doHandleFollowerPartition); + remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions.keySet(), followerPartitions.keySet()); + followerPartitions.forEach((tp, __) -> doHandleFollowerPartition(tp)); // If this node was the previous leader for the partition, then the RLMTask might be running in the // background thread and might emit metrics. So, removing the metrics after marking this node as follower. - followerPartitions.forEach(this::removeRemoteTopicPartitionMetrics); + followerPartitions.forEach((tp, __) -> removeRemoteTopicPartitionMetrics(tp)); leaderPartitions.forEach(this::doHandleLeaderPartition); } } + public void stopLeaderCopyRLMTasks(Set<Partition> partitions) { Review Comment: After we stop the copy RLM tasks, we may have to reset the `RemoteCopyLagBytes` and `RemoteCopyLagSegments` metrics to zero to avoid false positive that lag is growing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org