kamalcph commented on code in PR #16653:
URL: https://github.com/apache/kafka/pull/16653#discussion_r1687413644

##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -461,6 +464,12 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
         }
     }
+    public void stopPartitions(Set<StopPartition> stopPartitions,
+                               BiConsumer<TopicPartition, Throwable> errorHandler) {
+        // null means remoteLogDisablePolicy is not applied
+        stopPartitions(stopPartitions, errorHandler, null);

Review Comment:
   Got it. Thanks for the clarification!

##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -482,19 +492,29 @@ public void stopPartitions(Set<StopPartition> stopPartitions,
                 task.cancel();
                 return null;
             });
-            leaderExpirationRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
-                LOGGER.info("Cancelling the expiration RLM task for tpId: {}", tpId);
-                task.cancel();
-                return null;
-            });
             followerRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
                 LOGGER.info("Cancelling the follower RLM task for tpId: {}", tpId);
                 task.cancel();
                 return null;
             });
+            // here, we have to handle retain and delete separately.
+            // If "retain" is set, we should not cancel expiration task
+            if (!REMOTE_LOG_DISABLE_POLICY_RETAIN.equals(remoteLogDisablePolicy)) {
+                leaderExpirationRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
+                    LOGGER.info("Cancelling the expiration RLM task for tpId: {}", tpId);
+                    task.cancel();
+                    return null;
+                });
+            } else if (REMOTE_LOG_DISABLE_POLICY_DELETE.equals(remoteLogDisablePolicy)) {
+                // update start offset to local log start offset
+                // we use unused "-2" as the flag to set local log start offset
+                updateRemoteLogStartOffset.accept(tp, FLAG_TO_SET_LOCAL_LOG_START_OFFSET);
+            }
+            removeRemoteTopicPartitionMetrics(tpId);
+            // we already set it correctly, just apply it.

Review Comment:
   We also have to update L531-L539 to stop the RLMM task when remote storage is disabled with the "delete" policy but the topic is not deleted.
   We can introduce a new field in `StopPartition` (`stopRLMM`) to decide when to stop the RLMM.

##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -470,7 +479,8 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
      * @param errorHandler callback to handle any errors while stopping the partitions.
      */
     public void stopPartitions(Set<StopPartition> stopPartitions,
-                               BiConsumer<TopicPartition, Throwable> errorHandler) {
+                               BiConsumer<TopicPartition, Throwable> errorHandler,
+                               String remoteLogDisablePolicy) {

Review Comment:
   Why do we assume that the `remoteLogDisablePolicy` is common to all the partitions in the `StopPartition` request?

##########
core/src/main/scala/kafka/server/ConfigHandler.scala:
##########
@@ -69,24 +69,46 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
     val logs = logManager.logsByTopic(topic)
     val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
+    var oldLogPolicy = logs.head.config.remoteLogDisablePolicy()
+    if (oldLogPolicy == null)
+      oldLogPolicy = "retain"
     logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
-    maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate)
+    maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate, oldLogPolicy)
   }
   private[server] def maybeBootstrapRemoteLogComponents(topic: String,
                                                         logs: Seq[UnifiedLog],
-                                                        wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = {
+                                                        wasRemoteLogEnabledBeforeUpdate: Boolean,
+                                                        oldLogPolicy: String): Unit = {
     val isRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
+    var newRemoteLogPolicy = logs.head.config.remoteLogDisablePolicy()
+    if (newRemoteLogPolicy == null)
+      newRemoteLogPolicy = "retain"
+
+
+    val (leaderPartitions, followerPartitions) =
+      logs.flatMap(log => replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
     // Topic configs gets updated incrementally. This check is added to prevent redundant updates.
     if (!wasRemoteLogEnabledBeforeUpdate && isRemoteLogEnabled) {
-      val (leaderPartitions, followerPartitions) =
-        logs.flatMap(log => replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
       val topicIds = Collections.singletonMap(topic, replicaManager.metadataCache.getTopicId(topic))
       replicaManager.remoteLogManager.foreach(rlm => rlm.onLeadershipChange(leaderPartitions.toSet.asJava, followerPartitions.toSet.asJava, topicIds))
-    } else if (wasRemoteLogEnabledBeforeUpdate && !isRemoteLogEnabled) {
-      warn(s"Disabling remote log on the topic: $topic is not supported.")
+    }
+
+    // When there's a configRecord related to topic, we'll invoke updateLogConfig and enter here.
+    // So, here, we can check if the tiered storage is enabled or disabled, or remote log policy change or not, then do stopPartitions
+    // for each partition.
+    if (wasRemoteLogEnabledBeforeUpdate != isRemoteLogEnabled || !oldLogPolicy.equals(newRemoteLogPolicy)) {
+      val stopPartitions: java.util.HashSet[StopPartition] = new java.util.HashSet[StopPartition]()
+      leaderPartitions.toSet.asJava.stream().forEach(partition => {

Review Comment:
   We also have to stop the follower RLMM tasks (the snippet below is copied from our internal branch):
   ```
   logs.map(log => {
       log.maybeIncrementLogStartOffsetAsRemoteLogStorageDisabled()
       log.topicPartition
     })
     .groupBy(tp => replicaManager.onlinePartition(tp).exists(_.isLeader))
     .foreach { case (isLeader, partitions) =>
       val stopPartitions = partitions
         .map(partition => StopPartition(partition, deleteLocalLog = false, deleteRemoteLog = isLeader, stopRLMM = true))
         .toSet
       replicaManager.remoteLogManager.foreach { rlm => rlm.stopPartitions(stopPartitions, (_, _) => {}) }
     }
   } // closes an enclosing block that is not part of this excerpt
   ```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
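The following Java sketch isolates the policy checks from the `RemoteLogManager` hunk that handles "retain" and "delete" separately. It is illustrative only. The class and method names are hypothetical. The string values "retain" and "delete" are assumed to be what `REMOTE_LOG_DISABLE_POLICY_RETAIN` and `REMOTE_LOG_DISABLE_POLICY_DELETE` hold, based on the `"retain"` default in the `ConfigHandler` diff. Evaluating the two checks independently is also an assumed reading of the `// here, we have to handle retain and delete separately.` comment. In the hunk as quoted, the `else if` is attached to a negated RETAIN check, so it only runs when the policy is "retain", and its DELETE comparison can never be true. `quotedHunkBranch` reproduces that shape for comparison. A `null` policy is included because the two-argument overload passes `null` to mean "not applied".

```java
// Hypothetical sketch, not Kafka code. The policy strings are assumed values
// of REMOTE_LOG_DISABLE_POLICY_RETAIN / REMOTE_LOG_DISABLE_POLICY_DELETE.
final class DisablePolicySketch {
    static final String RETAIN = "retain";
    static final String DELETE = "delete";

    // First condition of the hunk: cancel the expiration task unless the
    // policy is "retain" (null, meaning "no policy applied", also cancels).
    static boolean cancelsExpirationTask(String policy) {
        return !RETAIN.equals(policy);
    }

    // Second condition, evaluated on its own (assumed intent): only "delete"
    // moves the remote log start offset to the local log start offset.
    static boolean resetsRemoteLogStartOffset(String policy) {
        return DELETE.equals(policy);
    }

    // Same if/else-if shape as the quoted hunk; returns the branch taken.
    static String quotedHunkBranch(String policy) {
        if (!RETAIN.equals(policy)) {
            return "cancel-expiration";
        } else if (DELETE.equals(policy)) {
            // Only reached when policy is "retain", so this test is always false.
            return "reset-start-offset";
        }
        return "none";
    }

    public static void main(String[] args) {
        for (String policy : new String[] {null, RETAIN, DELETE}) {
            System.out.printf("%s: cancel=%b reset=%b hunk=%s%n",
                policy,
                cancelsExpirationTask(policy),
                resetsRemoteLogStartOffset(policy),
                quotedHunkBranch(policy));
        }
    }
}
```

Running `main` prints one line per policy value. For "delete", the independent checks report `cancel=true reset=true`, while the quoted branch structure reports `hunk=cancel-expiration`. In other words, the hunk never takes the start-offset path.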
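For comparison with the reviewer's Scala snippet, here is a Java sketch of the same grouping step. It makes several substitutions, all assumptions made so the example is self-contained:

- `StopPartitionSketch` is a hypothetical stand-in for `StopPartition` carrying the proposed `stopRLMM` field.
- Partitions are plain strings rather than `TopicPartition`.
- A map of leadership flags replaces `replicaManager.onlinePartition(tp).exists(_.isLeader)`.
- The `maybeIncrementLogStartOffsetAsRemoteLogStorageDisabled()` step is omitted.

As in the snippet, local logs are kept and only leaders get `deleteRemoteLog = true`. Every replica gets `stopRLMM = true`, so followers are stopped too.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for StopPartition carrying the proposed stopRLMM flag.
// Partitions are plain strings such as "orders-0" instead of TopicPartition.
final class StopPartitionSketch {
    final String topicPartition;
    final boolean deleteLocalLog;
    final boolean deleteRemoteLog;
    final boolean stopRLMM;

    StopPartitionSketch(String topicPartition, boolean deleteLocalLog,
                        boolean deleteRemoteLog, boolean stopRLMM) {
        this.topicPartition = topicPartition;
        this.deleteLocalLog = deleteLocalLog;
        this.deleteRemoteLog = deleteRemoteLog;
        this.stopRLMM = stopRLMM;
    }

    @Override
    public String toString() {
        return "StopPartition(" + topicPartition + ", deleteLocalLog=" + deleteLocalLog
            + ", deleteRemoteLog=" + deleteRemoteLog + ", stopRLMM=" + stopRLMM + ")";
    }
}

final class StopPlanSketch {
    // Groups partitions by leadership, like the snippet's groupBy. A partition
    // absent from the map (e.g. offline) counts as a non-leader, mirroring
    // onlinePartition(tp).exists(_.isLeader) on an empty Option.
    static Map<Boolean, Set<StopPartitionSketch>> planStops(List<String> partitions,
                                                            Map<String, Boolean> isLeaderByPartition) {
        Map<Boolean, Set<StopPartitionSketch>> plan = new HashMap<>();
        for (String tp : partitions) {
            boolean isLeader = isLeaderByPartition.getOrDefault(tp, false);
            // Keep local logs, set deleteRemoteLog only for leaders, stop RLMM on every replica.
            plan.computeIfAbsent(isLeader, key -> new HashSet<>())
                .add(new StopPartitionSketch(tp, false, isLeader, true));
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<Boolean, Set<StopPartitionSketch>> plan = planStops(
            List.of("orders-0", "orders-1", "orders-2"),
            Map.of("orders-0", true, "orders-1", false)); // "orders-2" is absent, e.g. offline
        // In the reviewer's snippet, each group is passed to rlm.stopPartitions.
        plan.forEach((isLeader, stops) ->
            stops.forEach(sp -> System.out.println("leader=" + isLeader + " " + sp)));
    }
}
```

Like Scala's `groupBy`, the result only contains keys that actually occur. If every partition is led by this broker, there is no `false` entry, and each group leads to exactly one `stopPartitions` call.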