showuon commented on code in PR #16653:
URL: https://github.com/apache/kafka/pull/16653#discussion_r1687431228


##########
core/src/main/scala/kafka/server/ConfigHandler.scala:
##########
@@ -69,24 +69,46 @@ class TopicConfigHandler(private val replicaManager: 
ReplicaManager,
 
     val logs = logManager.logsByTopic(topic)
     val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
+    var oldLogPolicy = logs.head.config.remoteLogDisablePolicy()
+    if (oldLogPolicy == null)
+      oldLogPolicy = "retain"
 
     logManager.updateTopicConfig(topic, props, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
-    maybeBootstrapRemoteLogComponents(topic, logs, 
wasRemoteLogEnabledBeforeUpdate)
+    maybeBootstrapRemoteLogComponents(topic, logs, 
wasRemoteLogEnabledBeforeUpdate, oldLogPolicy)
   }
 
   private[server] def maybeBootstrapRemoteLogComponents(topic: String,
                                                         logs: Seq[UnifiedLog],
-                                                        
wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = {
+                                                        
wasRemoteLogEnabledBeforeUpdate: Boolean,
+                                                        oldLogPolicy: String): 
Unit = {
     val isRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
+    var newRemoteLogPolicy = logs.head.config.remoteLogDisablePolicy()
+    if (newRemoteLogPolicy == null)
+      newRemoteLogPolicy = "retain"
+
+
+
+    val (leaderPartitions, followerPartitions) =
+      logs.flatMap(log => 
replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
     // Topic configs get updated incrementally. This check is added to 
prevent redundant updates.
     if (!wasRemoteLogEnabledBeforeUpdate && isRemoteLogEnabled) {
-      val (leaderPartitions, followerPartitions) =
-        logs.flatMap(log => 
replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
       val topicIds = Collections.singletonMap(topic, 
replicaManager.metadataCache.getTopicId(topic))
       replicaManager.remoteLogManager.foreach(rlm =>
         rlm.onLeadershipChange(leaderPartitions.toSet.asJava, 
followerPartitions.toSet.asJava, topicIds))
-    } else if (wasRemoteLogEnabledBeforeUpdate && !isRemoteLogEnabled) {
-      warn(s"Disabling remote log on the topic: $topic is not supported.")
+    }
+
+    // When there's a configRecord related to the topic, we'll invoke 
updateLogConfig and enter here.
+    // So here we can check whether tiered storage was enabled or disabled, or 
whether the remote log policy changed, and then do stopPartitions
+    // for each partition.
+    if (wasRemoteLogEnabledBeforeUpdate != isRemoteLogEnabled || 
!oldLogPolicy.equals(newRemoteLogPolicy)) {
+      val stopPartitions: java.util.HashSet[StopPartition] = new 
java.util.HashSet[StopPartition]()
+      leaderPartitions.toSet.asJava.stream().forEach(partition => {

Review Comment:
   Hmm... good point, let me think about it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to