kamalcph commented on code in PR #16653: URL: https://github.com/apache/kafka/pull/16653#discussion_r1686640322
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -461,6 +464,12 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
         }
     }
 
+    public void stopPartitions(Set<StopPartition> stopPartitions,
+                               BiConsumer<TopicPartition, Throwable> errorHandler) {
+        // null means remoteLogDisablePolicy is not applied
+        stopPartitions(stopPartitions, errorHandler, null);

Review Comment:
   Instead of `null`, can we supply the default value (retain)?

##########
core/src/main/scala/kafka/server/ConfigHandler.scala:
##########
@@ -69,24 +69,46 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
 
     val logs = logManager.logsByTopic(topic)
     val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
+    var oldLogPolicy = logs.head.config.remoteLogDisablePolicy()
+    if (oldLogPolicy == null)
+      oldLogPolicy = "retain"
 
     logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
-    maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate)
+    maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate, oldLogPolicy)
   }
 
   private[server] def maybeBootstrapRemoteLogComponents(topic: String,
                                                         logs: Seq[UnifiedLog],
-                                                        wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = {
+                                                        wasRemoteLogEnabledBeforeUpdate: Boolean,
+                                                        oldLogPolicy: String): Unit = {
     val isRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
+    var newRemoteLogPolicy = logs.head.config.remoteLogDisablePolicy()
+    if (newRemoteLogPolicy == null)
+      newRemoteLogPolicy = "retain"
+
+
+
+    val (leaderPartitions, followerPartitions) =
+      logs.flatMap(log => replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
     // Topic configs gets updated incrementally. This check is added to prevent redundant updates.
     if (!wasRemoteLogEnabledBeforeUpdate && isRemoteLogEnabled) {
-      val (leaderPartitions, followerPartitions) =
-        logs.flatMap(log => replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
       val topicIds = Collections.singletonMap(topic, replicaManager.metadataCache.getTopicId(topic))
       replicaManager.remoteLogManager.foreach(rlm =>
         rlm.onLeadershipChange(leaderPartitions.toSet.asJava, followerPartitions.toSet.asJava, topicIds))
-    } else if (wasRemoteLogEnabledBeforeUpdate && !isRemoteLogEnabled) {
-      warn(s"Disabling remote log on the topic: $topic is not supported.")
+    }
+
+    // When there's a configRecord related to topic, we'll invoke updateLogConfig and enter here.
+    // So, here, we can check if the tiered storage is enabled or disabled, or remote log policy change or not, then do stopPartitions
+    // for each partition.
+    if (wasRemoteLogEnabledBeforeUpdate != isRemoteLogEnabled || !oldLogPolicy.equals(newRemoteLogPolicy)) {
+      val stopPartitions: java.util.HashSet[StopPartition] = new java.util.HashSet[StopPartition]()
+      leaderPartitions.toSet.asJava.stream().forEach(partition => {

Review Comment:
   ```suggestion
         leaderPartitions.foreach { partition =>
   ```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org