kamalcph commented on code in PR #16653:
URL: https://github.com/apache/kafka/pull/16653#discussion_r1687413644


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -461,6 +464,12 @@ public void onLeadershipChange(Set<Partition> 
partitionsBecomeLeader,
         }
     }
 
+    public void stopPartitions(Set<StopPartition> stopPartitions,
+                               BiConsumer<TopicPartition, Throwable> 
errorHandler) {
+        // null means remoteLogDisablePolicy is not applied
+        stopPartitions(stopPartitions, errorHandler, null);

Review Comment:
   Got it. Thanks for the clarification!



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -482,19 +492,29 @@ public void stopPartitions(Set<StopPartition> 
stopPartitions,
                         task.cancel();
                         return null;
                     });
-                    leaderExpirationRLMTasks.computeIfPresent(tpId, 
(topicIdPartition, task) -> {
-                        LOGGER.info("Cancelling the expiration RLM task for 
tpId: {}", tpId);
-                        task.cancel();
-                        return null;
-                    });
                     followerRLMTasks.computeIfPresent(tpId, (topicIdPartition, 
task) -> {
                         LOGGER.info("Cancelling the follower RLM task for 
tpId: {}", tpId);
                         task.cancel();
                         return null;
                     });
 
+                    // here, we have to handle retain and delete separately.
+                    // If "retain" is set, we should not cancel expiration task
+                    if 
(!REMOTE_LOG_DISABLE_POLICY_RETAIN.equals(remoteLogDisablePolicy)) {
+                        leaderExpirationRLMTasks.computeIfPresent(tpId, 
(topicIdPartition, task) -> {
+                            LOGGER.info("Cancelling the expiration RLM task 
for tpId: {}", tpId);
+                            task.cancel();
+                            return null;
+                        });
+                    } else if 
(REMOTE_LOG_DISABLE_POLICY_DELETE.equals(remoteLogDisablePolicy)) {
+                        // update start offset to local log start offset
+                        // we use unused "-2" as the flag to set local log 
start offset
+                        updateRemoteLogStartOffset.accept(tp, 
FLAG_TO_SET_LOCAL_LOG_START_OFFSET);
+                    }
+
                     removeRemoteTopicPartitionMetrics(tpId);
 
+                    // we already set it correctly, just apply it.

Review Comment:
   We also have to update the L531-L539 to stop the RLMM task when the remote 
storage is disabled with "delete" policy but the topic is not deleted.
   
   We can introduce a new field in StopPartition (stopRLMM) to decide when to 
stop the RLMM 



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -470,7 +479,8 @@ public void onLeadershipChange(Set<Partition> 
partitionsBecomeLeader,
      * @param errorHandler   callback to handle any errors while stopping the 
partitions.
      */
     public void stopPartitions(Set<StopPartition> stopPartitions,
-                               BiConsumer<TopicPartition, Throwable> 
errorHandler) {
+                               BiConsumer<TopicPartition, Throwable> 
errorHandler,
+                               String remoteLogDisablePolicy) {

Review Comment:
   Why do we assume that the `remoteLogDisablePolicy` is common for all the 
partitions in the StopPartition request? 



##########
core/src/main/scala/kafka/server/ConfigHandler.scala:
##########
@@ -69,24 +69,46 @@ class TopicConfigHandler(private val replicaManager: 
ReplicaManager,
 
     val logs = logManager.logsByTopic(topic)
     val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
+    var oldLogPolicy = logs.head.config.remoteLogDisablePolicy()
+    if (oldLogPolicy == null)
+      oldLogPolicy = "retain"
 
     logManager.updateTopicConfig(topic, props, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
-    maybeBootstrapRemoteLogComponents(topic, logs, 
wasRemoteLogEnabledBeforeUpdate)
+    maybeBootstrapRemoteLogComponents(topic, logs, 
wasRemoteLogEnabledBeforeUpdate, oldLogPolicy)
   }
 
   private[server] def maybeBootstrapRemoteLogComponents(topic: String,
                                                         logs: Seq[UnifiedLog],
-                                                        
wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = {
+                                                        
wasRemoteLogEnabledBeforeUpdate: Boolean,
+                                                        oldLogPolicy: String): 
Unit = {
     val isRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
+    var newRemoteLogPolicy = logs.head.config.remoteLogDisablePolicy()
+    if (newRemoteLogPolicy == null)
+      newRemoteLogPolicy = "retain"
+
+
+
+    val (leaderPartitions, followerPartitions) =
+      logs.flatMap(log => 
replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
     // Topic configs gets updated incrementally. This check is added to 
prevent redundant updates.
     if (!wasRemoteLogEnabledBeforeUpdate && isRemoteLogEnabled) {
-      val (leaderPartitions, followerPartitions) =
-        logs.flatMap(log => 
replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
       val topicIds = Collections.singletonMap(topic, 
replicaManager.metadataCache.getTopicId(topic))
       replicaManager.remoteLogManager.foreach(rlm =>
         rlm.onLeadershipChange(leaderPartitions.toSet.asJava, 
followerPartitions.toSet.asJava, topicIds))
-    } else if (wasRemoteLogEnabledBeforeUpdate && !isRemoteLogEnabled) {
-      warn(s"Disabling remote log on the topic: $topic is not supported.")
+    }
+
+    // When there's a configRecord related to topic, we'll invoke 
updateLogConfig and enter here.
+    // So, here, we can check if the tiered storage is enabled or disabled, or 
remote log policy change or not, then do stopPartitions
+    // for each partition.
+    if (wasRemoteLogEnabledBeforeUpdate != isRemoteLogEnabled || 
!oldLogPolicy.equals(newRemoteLogPolicy)) {
+      val stopPartitions: java.util.HashSet[StopPartition] = new 
java.util.HashSet[StopPartition]()
+      leaderPartitions.toSet.asJava.stream().forEach(partition => {

Review Comment:
   We also have to stop the follower RLMM tasks: (copied the below snipped from 
our internal branch)
   
   ```
   logs.map(log => {
           log.maybeIncrementLogStartOffsetAsRemoteLogStorageDisabled()
           log.topicPartition
         })
           .groupBy(tp => replicaManager.onlinePartition(tp).exists(_.isLeader))
           .foreach {
             case (isLeader, partitions) =>
               val stopPartitions = partitions
                 .map(partition => StopPartition(partition, deleteLocalLog = 
false, deleteRemoteLog = isLeader, stopRLMM = true))
                 .toSet
               replicaManager.remoteLogManager.foreach { rlm =>
                 rlm.stopPartitions(stopPartitions, (_, _) => {})
               }
           }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to