kamalcph commented on code in PR #16653: URL: https://github.com/apache/kafka/pull/16653#discussion_r1698155909
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -472,6 +502,7 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader, public void stopPartitions(Set<StopPartition> stopPartitions, BiConsumer<TopicPartition, Throwable> errorHandler) { LOGGER.debug("Stop partitions: {}", stopPartitions); + Set<TopicIdPartition> stopRLMMPartitions = new HashSet<>(); Review Comment: Can we avoid using this set? ########## core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala: ########## @@ -311,6 +311,35 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { () => admin.incrementalAlterConfigs(configs).all().get(), "Disabling remote storage feature on the topic level is not supported.") } + @ParameterizedTest + @ValueSource(strings = Array("zk")) + def testUpdateInvalidRemoteStorageConfigUnderZK(quorum: String): Unit = { + val admin = createAdminClient() + val errorMsg = "It is invalid to set `remote.log.delete.on.disable` or `remote.log.copy.disabled` under Zookeeper's mode."
Review Comment: nit: `remote.log.copy.disabled` to `remote.copy.disabled` ########## core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala: ########## @@ -409,10 +438,15 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { topicConfig.getProperty(TopicConfig.RETENTION_BYTES_CONFIG).toLong == logBuffer.head.config.retentionSize } - if (topicConfig.contains(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG)) { + if (topicConfig.contains(TopicConfig.REMOTE_COPY_DISABLED_CONFIG)) { + result = result && + topicConfig.getProperty(TopicConfig.REMOTE_COPY_DISABLED_CONFIG).equals( Review Comment: we are comparing the string vs boolean, please change it to: ``` topicConfig.getProperty(TopicConfig.REMOTE_COPY_DISABLED_CONFIG).toBoolean == logBuffer.head.config.remoteCopyDisabled() ``` similarly for the below `remoteLogDeleteOnDisable()` ########## core/src/main/scala/kafka/server/ConfigHandler.scala: ########## @@ -68,25 +68,62 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, } val logs = logManager.logsByTopic(topic) - val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled()) + val wasRemoteLogEnabled = logs.exists(_.remoteLogEnabled()) + val wasCopyDisabled = logs.exists(_.config.remoteCopyDisabled()) - logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) - maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate) + // kafkaController is only defined in Zookeeper's mode + logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), + wasRemoteLogEnabled, kafkaController.isDefined) + maybeUpdateRemoteLogComponents(topic, logs, wasRemoteLogEnabled, wasCopyDisabled) } - private[server] def maybeBootstrapRemoteLogComponents(topic: String, - logs: Seq[UnifiedLog], - wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = { + private[server] def maybeUpdateRemoteLogComponents(topic: String, + logs: 
Seq[UnifiedLog], + wasRemoteLogEnabled: Boolean, + wasCopyDisabled: Boolean): Unit = { val isRemoteLogEnabled = logs.exists(_.remoteLogEnabled()) + val isCopyDisabled = logs.exists(_.config.remoteCopyDisabled()) + val isDeleteOnDisable = logs.exists(_.config.remoteLogDeleteOnDisable()) + + val (leaderPartitions, followerPartitions) = + logs.flatMap(log => replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader) + // Topic configs gets updated incrementally. This check is added to prevent redundant updates. - if (!wasRemoteLogEnabledBeforeUpdate && isRemoteLogEnabled) { - val (leaderPartitions, followerPartitions) = - logs.flatMap(log => replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader) + // When remote log is enabled, or remote copy is enabled, we should create RLM tasks accordingly via `onLeadershipChange`. + if (isRemoteLogEnabled && (!wasRemoteLogEnabled || (wasCopyDisabled && !isCopyDisabled))) { val topicIds = Collections.singletonMap(topic, replicaManager.metadataCache.getTopicId(topic)) replicaManager.remoteLogManager.foreach(rlm => rlm.onLeadershipChange(leaderPartitions.toSet.asJava, followerPartitions.toSet.asJava, topicIds)) - } else if (wasRemoteLogEnabledBeforeUpdate && !isRemoteLogEnabled) { - warn(s"Disabling remote log on the topic: $topic is not supported.") + } + + // When copy disabled, we should stop leaderCopyRLMTask and followerRLMTask, but keep expirationTask + if (isRemoteLogEnabled && !wasCopyDisabled && isCopyDisabled) { + replicaManager.remoteLogManager.foreach(rlm => { + rlm.stopLeaderCopyRLMTasks(leaderPartitions.toSet.asJava); + rlm.stopFollowerRLMTasks(followerPartitions.toSet.asJava) + }) + } + + // Disabling remote log storage on this topic + if (wasRemoteLogEnabled && !isRemoteLogEnabled && isDeleteOnDisable) { + val stopPartitions: java.util.HashSet[StopPartition] = new java.util.HashSet[StopPartition]() + leaderPartitions.foreach(partition => { + // delete remote logs and stop 
RemoteLogMetadataManager + stopPartitions.add(StopPartition(partition.topicPartition, deleteLocalLog = false, + deleteRemoteLog = true, stopRemoteLogMetadataManager = true)) + }) + + followerPartitions.foreach(partition => { + // we need to cancel follower tasks and stop RemoteLogMetadataManager + stopPartitions.add(StopPartition(partition.topicPartition, deleteLocalLog = false, + deleteRemoteLog = false, stopRemoteLogMetadataManager = true)) + }) + + // update the log start offset to local log start offset for the leader replicas + logs.filter(log => leaderPartitions.find(p => p.equals(log.topicPartition)).isDefined) Review Comment: There is a bug in this comparison: `p` is a `Partition`, not a `TopicPartition`, so `p.equals(log.topicPartition)` is never true and no leader log is selected. Compare `p.topicPartition` instead: ``` logs.filter(log => leaderPartitions.find(p => p.topicPartition.equals(log.topicPartition)).isDefined) ``` IntelliJ suggests to further simplify it to: ``` logs.filter(log => leaderPartitions.exists(p => p.topicPartition.equals(log.topicPartition))) ``` ########## core/src/test/scala/unit/kafka/log/LogConfigTest.scala: ########## @@ -426,19 +440,34 @@ class LogConfigTest { } @ParameterizedTest - @ValueSource(strings = Array(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE)) - def testValidRemoteLogDisablePolicy(policy: String): Unit = { + @ValueSource(booleans = Array(true, false)) + def testValidRemoteLogCopyDisabled(copyDisabled: Boolean): Unit = { val logProps = new Properties - logProps.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy) + logProps.put(TopicConfig.REMOTE_COPY_DISABLED_CONFIG, copyDisabled) LogConfig.validate(logProps) } @ParameterizedTest - @ValueSource(strings = Array("keep", "remove")) - def testInvalidRemoteLogDisablePolicy(policy: String): Unit = { + @ValueSource(booleans = Array(true, false)) + def testValidRemoteLogDeleteOnDisable(deleteOnDisable: Boolean): Unit = { val logProps = new Properties - logProps.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy) - assertThrows(classOf[ConfigException], () =>
LogConfig.validate(logProps)) + logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, deleteOnDisable) + LogConfig.validate(logProps) + } + + @ParameterizedTest + @ValueSource(strings = Array(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, TopicConfig.REMOTE_COPY_DISABLED_CONFIG)) + def testInValidRemoteConfigsInZK(configKey: String): Unit = { + val kafkaProps = TestUtils.createDummyBrokerConfig() + kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") + val kafkaConfig = KafkaConfig.fromProps(kafkaProps) + val logProps = new Properties + logProps.put(configKey, true) + + val message = assertThrows(classOf[InvalidConfigurationException], + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, false, true)) Review Comment: ```suggestion () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, true, true)) ``` ########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ########## @@ -626,14 +638,26 @@ private static void validateTopicLogConfigValues(Map<String, String> existingCon validateRemoteStorageRetentionTime(newConfigs); } else { // The new config "remote.storage.enable" is false, validate if it's turning from true to false - validateNotTurningOffRemoteStorage(existingConfigs); + boolean wasRemoteLogEnabled = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); + validateTurningOffRemoteStorageWithDelete(newConfigs, wasRemoteLogEnabled, isRemoteLogStorageEnabled); + } + } + + public static void validateTurningOffRemoteStorageWithDelete(Map<?, ?> newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) { + boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false); + if (wasRemoteLogEnabled && !isRemoteLogStorageEnabled && !isRemoteLogDeleteOnDisable) { + 
throw new InvalidConfigurationException("It is invalid to disable remote storage without deleting remote data. " + + "If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.copy.disabled=true`. " + + "If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."); } } - public static void validateNotTurningOffRemoteStorage(Map<String, String> existingConfigs) { - boolean wasRemoteLogEnabledBeforeUpdate = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); - if (wasRemoteLogEnabledBeforeUpdate) { - throw new InvalidConfigurationException("Disabling remote storage feature on the topic level is not supported."); + public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?> newConfigs) { + boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false); + boolean isRemoteLogCopyDisabled = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_COPY_DISABLED_CONFIG, false); + if (isRemoteLogDeleteOnDisable || isRemoteLogCopyDisabled) { + throw new InvalidConfigurationException("It is invalid to set `remote.log.delete.on.disable` or " + Review Comment: nit: `remote.log.copy.disabled` to `remote.copy.disabled` ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -507,17 +542,21 @@ public void stopPartitions(Set<StopPartition> stopPartitions, LOGGER.error("Error while stopping the partition: {}", stopPartition, ex); } } - // Note `deleteLocalLog` will always be true when `deleteRemoteLog` is true but not the other way around. 
Set<TopicIdPartition> deleteLocalPartitions = stopPartitions.stream() .filter(sp -> sp.deleteLocalLog() && topicIdByPartitionMap.containsKey(sp.topicPartition())) .map(sp -> new TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()), sp.topicPartition())) .collect(Collectors.toSet()); Review Comment: ```suggestion Set<TopicIdPartition> partitionsWithPendingActions = stopPartitions.stream() .filter(sp -> (sp.stopRemoteLogMetadataManager() || sp.deleteRemoteLog() || sp.deleteLocalLog()) && topicIdByPartitionMap.containsKey(sp.topicPartition())) .map(sp -> new TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()), sp.topicPartition())) .collect(Collectors.toSet()); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org