showuon commented on code in PR #16653: URL: https://github.com/apache/kafka/pull/16653#discussion_r1687431228
########## core/src/main/scala/kafka/server/ConfigHandler.scala: ########## @@ -69,24 +69,46 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, val logs = logManager.logsByTopic(topic) val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled()) + var oldLogPolicy = logs.head.config.remoteLogDisablePolicy() + if (oldLogPolicy == null) + oldLogPolicy = "retain" logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) - maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate) + maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate, oldLogPolicy) } private[server] def maybeBootstrapRemoteLogComponents(topic: String, logs: Seq[UnifiedLog], - wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = { + wasRemoteLogEnabledBeforeUpdate: Boolean, + oldLogPolicy: String): Unit = { val isRemoteLogEnabled = logs.exists(_.remoteLogEnabled()) + var newRemoteLogPolicy = logs.head.config.remoteLogDisablePolicy() + if (newRemoteLogPolicy == null) + newRemoteLogPolicy = "retain" + + + + val (leaderPartitions, followerPartitions) = + logs.flatMap(log => replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader) // Topic configs gets updated incrementally. This check is added to prevent redundant updates. if (!wasRemoteLogEnabledBeforeUpdate && isRemoteLogEnabled) { - val (leaderPartitions, followerPartitions) = - logs.flatMap(log => replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader) val topicIds = Collections.singletonMap(topic, replicaManager.metadataCache.getTopicId(topic)) replicaManager.remoteLogManager.foreach(rlm => rlm.onLeadershipChange(leaderPartitions.toSet.asJava, followerPartitions.toSet.asJava, topicIds)) - } else if (wasRemoteLogEnabledBeforeUpdate && !isRemoteLogEnabled) { - warn(s"Disabling remote log on the topic: $topic is not supported.") + } + + // When there's a configRecord related to topic, we'll invoke updateLogConfig and enter here. + // So, here, we can check if the tiered storage is enabled or disabled, or remote log policy change or not, then do stopPartitions + // for each partition. + if (wasRemoteLogEnabledBeforeUpdate != isRemoteLogEnabled || !oldLogPolicy.equals(newRemoteLogPolicy)) { + val stopPartitions: java.util.HashSet[StopPartition] = new java.util.HashSet[StopPartition]() + leaderPartitions.toSet.asJava.stream().forEach(partition => { Review Comment: Hmm... good point, let me think about it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org