kamalcph commented on code in PR #16653:
URL: https://github.com/apache/kafka/pull/16653#discussion_r1698155909


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -472,6 +502,7 @@ public void onLeadershipChange(Set<Partition> 
partitionsBecomeLeader,
     public void stopPartitions(Set<StopPartition> stopPartitions,
                                BiConsumer<TopicPartition, Throwable> 
errorHandler) {
         LOGGER.debug("Stop partitions: {}", stopPartitions);
+        Set<TopicIdPartition> stopRLMMPartitions = new HashSet<>();

Review Comment:
   can we can avoid using this set?



##########
core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala:
##########
@@ -311,6 +311,35 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
       () => admin.incrementalAlterConfigs(configs).all().get(), "Disabling 
remote storage feature on the topic level is not supported.")
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testUpdateInvalidRemoteStorageConfigUnderZK(quorum: String): Unit = {
+    val admin = createAdminClient()
+    val errorMsg = "It is invalid to set `remote.log.delete.on.disable` or 
`remote.log.copy.disabled` under Zookeeper's mode."

Review Comment:
   nit:
   
   `remote.log.copy.disabled` to `remote.copy.disabled`



##########
core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala:
##########
@@ -409,10 +438,15 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
             topicConfig.getProperty(TopicConfig.RETENTION_BYTES_CONFIG).toLong 
==
               logBuffer.head.config.retentionSize
         }
-        if 
(topicConfig.contains(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG)) {
+        if (topicConfig.contains(TopicConfig.REMOTE_COPY_DISABLED_CONFIG)) {
+          result = result &&
+            
topicConfig.getProperty(TopicConfig.REMOTE_COPY_DISABLED_CONFIG).equals(

Review Comment:
   we are comparing the string vs boolean, please change it to:
   ```
   topicConfig.getProperty(TopicConfig.REMOTE_COPY_DISABLED_CONFIG).toBoolean ==
                 logBuffer.head.config.remoteCopyDisabled()
   ```
   
   similarly for the  below `remoteLogDeleteOnDisable()`



##########
core/src/main/scala/kafka/server/ConfigHandler.scala:
##########
@@ -68,25 +68,62 @@ class TopicConfigHandler(private val replicaManager: 
ReplicaManager,
     }
 
     val logs = logManager.logsByTopic(topic)
-    val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
+    val wasRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
+    val wasCopyDisabled = logs.exists(_.config.remoteCopyDisabled())
 
-    logManager.updateTopicConfig(topic, props, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
-    maybeBootstrapRemoteLogComponents(topic, logs, 
wasRemoteLogEnabledBeforeUpdate)
+    // kafkaController is only defined in Zookeeper's mode
+    logManager.updateTopicConfig(topic, props, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
+      wasRemoteLogEnabled, kafkaController.isDefined)
+    maybeUpdateRemoteLogComponents(topic, logs, wasRemoteLogEnabled, 
wasCopyDisabled)
   }
 
-  private[server] def maybeBootstrapRemoteLogComponents(topic: String,
-                                                        logs: Seq[UnifiedLog],
-                                                        
wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = {
+  private[server] def maybeUpdateRemoteLogComponents(topic: String,
+                                                     logs: Seq[UnifiedLog],
+                                                     wasRemoteLogEnabled: 
Boolean,
+                                                     wasCopyDisabled: 
Boolean): Unit = {
     val isRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
+    val isCopyDisabled = logs.exists(_.config.remoteCopyDisabled())
+    val isDeleteOnDisable = logs.exists(_.config.remoteLogDeleteOnDisable())
+
+    val (leaderPartitions, followerPartitions) =
+      logs.flatMap(log => 
replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
+
     // Topic configs gets updated incrementally. This check is added to 
prevent redundant updates.
-    if (!wasRemoteLogEnabledBeforeUpdate && isRemoteLogEnabled) {
-      val (leaderPartitions, followerPartitions) =
-        logs.flatMap(log => 
replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
+    // When remote log is enabled, or remote copy is enabled, we should create 
RLM tasks accordingly via `onLeadershipChange`.
+    if (isRemoteLogEnabled && (!wasRemoteLogEnabled || (wasCopyDisabled && 
!isCopyDisabled))) {
       val topicIds = Collections.singletonMap(topic, 
replicaManager.metadataCache.getTopicId(topic))
       replicaManager.remoteLogManager.foreach(rlm =>
         rlm.onLeadershipChange(leaderPartitions.toSet.asJava, 
followerPartitions.toSet.asJava, topicIds))
-    } else if (wasRemoteLogEnabledBeforeUpdate && !isRemoteLogEnabled) {
-      warn(s"Disabling remote log on the topic: $topic is not supported.")
+    }
+
+    // When copy disabled, we should stop leaderCopyRLMTask and 
followerRLMTask, but keep expirationTask
+    if (isRemoteLogEnabled && !wasCopyDisabled && isCopyDisabled) {
+      replicaManager.remoteLogManager.foreach(rlm => {
+        rlm.stopLeaderCopyRLMTasks(leaderPartitions.toSet.asJava);
+        rlm.stopFollowerRLMTasks(followerPartitions.toSet.asJava)
+      })
+    }
+
+    // Disabling remote log storage on this topic
+    if (wasRemoteLogEnabled && !isRemoteLogEnabled && isDeleteOnDisable) {
+      val stopPartitions: java.util.HashSet[StopPartition] = new 
java.util.HashSet[StopPartition]()
+      leaderPartitions.foreach(partition => {
+        // delete remote logs and stop RemoteLogMetadataManager
+        stopPartitions.add(StopPartition(partition.topicPartition, 
deleteLocalLog = false,
+          deleteRemoteLog = true, stopRemoteLogMetadataManager = true))
+      })
+
+      followerPartitions.foreach(partition => {
+        // we need to cancel follower tasks and stop RemoteLogMetadataManager
+        stopPartitions.add(StopPartition(partition.topicPartition, 
deleteLocalLog = false,
+          deleteRemoteLog = false, stopRemoteLogMetadataManager = true))
+      })
+
+      // update the log start offset to local log start offset for the leader 
replicas
+      logs.filter(log => leaderPartitions.find(p => 
p.equals(log.topicPartition)).isDefined)

Review Comment:
   typo error in comparison:
   
   ```
   logs.filter(log => leaderPartitions.find(p => 
p.topicPartition.equals(log.topicPartition)).isDefined)
   ```
   
   IntelliJ suggests to further simplify it to:
   ```
   logs.filter(log => leaderPartitions.exists(p => 
p.topicPartition.equals(log.topicPartition)))
   ```
   
   



##########
core/src/test/scala/unit/kafka/log/LogConfigTest.scala:
##########
@@ -426,19 +440,34 @@ class LogConfigTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, 
TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE))
-  def testValidRemoteLogDisablePolicy(policy: String): Unit = {
+  @ValueSource(booleans = Array(true, false))
+  def testValidRemoteLogCopyDisabled(copyDisabled: Boolean): Unit = {
     val logProps = new Properties
-    logProps.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy)
+    logProps.put(TopicConfig.REMOTE_COPY_DISABLED_CONFIG, copyDisabled)
     LogConfig.validate(logProps)
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("keep", "remove"))
-  def testInvalidRemoteLogDisablePolicy(policy: String): Unit = {
+  @ValueSource(booleans = Array(true, false))
+  def testValidRemoteLogDeleteOnDisable(deleteOnDisable: Boolean): Unit = {
     val logProps = new Properties
-    logProps.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy)
-    assertThrows(classOf[ConfigException], () => LogConfig.validate(logProps))
+    logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, 
deleteOnDisable)
+    LogConfig.validate(logProps)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = 
Array(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, 
TopicConfig.REMOTE_COPY_DISABLED_CONFIG))
+  def testInValidRemoteConfigsInZK(configKey: String): Unit = {
+    val kafkaProps = TestUtils.createDummyBrokerConfig()
+    
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
"true")
+    val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
+    val logProps = new Properties
+    logProps.put(configKey, true)
+
+    val message = assertThrows(classOf[InvalidConfigurationException],
+      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, false, true))

Review Comment:
   ```suggestion
         () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, true, true))
   ```



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -626,14 +638,26 @@ private static void 
validateTopicLogConfigValues(Map<String, String> existingCon
             validateRemoteStorageRetentionTime(newConfigs);
         } else {
             // The new config "remote.storage.enable" is false, validate if 
it's turning from true to false
-            validateNotTurningOffRemoteStorage(existingConfigs);
+            boolean wasRemoteLogEnabled = 
Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "false"));
+            validateTurningOffRemoteStorageWithDelete(newConfigs, 
wasRemoteLogEnabled, isRemoteLogStorageEnabled);
+        }
+    }
+
+    public static void validateTurningOffRemoteStorageWithDelete(Map<?, ?> 
newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) {
+        boolean isRemoteLogDeleteOnDisable = (Boolean) 
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
 false);
+        if (wasRemoteLogEnabled && !isRemoteLogStorageEnabled && 
!isRemoteLogDeleteOnDisable) {
+            throw new InvalidConfigurationException("It is invalid to disable 
remote storage without deleting remote data. " +
+                    "If you want to keep the remote data and turn to read 
only, please set `remote.storage.enable=true,remote.copy.disabled=true`. " +
+                    "If you want to disable remote storage and delete all 
remote data, please set 
`remote.storage.enable=false,remote.log.delete.on.disable=true`.");
         }
     }
 
-    public static void validateNotTurningOffRemoteStorage(Map<String, String> 
existingConfigs) {
-        boolean wasRemoteLogEnabledBeforeUpdate = 
Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "false"));
-        if (wasRemoteLogEnabledBeforeUpdate) {
-            throw new InvalidConfigurationException("Disabling remote storage 
feature on the topic level is not supported.");
+    public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?> 
newConfigs) {
+        boolean isRemoteLogDeleteOnDisable = (Boolean) 
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
 false);
+        boolean isRemoteLogCopyDisabled = (Boolean) 
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_COPY_DISABLED_CONFIG,
 false);
+        if (isRemoteLogDeleteOnDisable || isRemoteLogCopyDisabled) {
+            throw new InvalidConfigurationException("It is invalid to set 
`remote.log.delete.on.disable` or " +

Review Comment:
   nit:
   
   `remote.log.copy.disabled` to `remote.copy.disabled`



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -507,17 +542,21 @@ public void stopPartitions(Set<StopPartition> 
stopPartitions,
                 LOGGER.error("Error while stopping the partition: {}", 
stopPartition, ex);
             }
         }
-        // Note `deleteLocalLog` will always be true when `deleteRemoteLog` is 
true but not the other way around.
         Set<TopicIdPartition> deleteLocalPartitions = stopPartitions.stream()
                 .filter(sp -> sp.deleteLocalLog() && 
topicIdByPartitionMap.containsKey(sp.topicPartition()))
                 .map(sp -> new 
TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()), 
sp.topicPartition()))
                 .collect(Collectors.toSet());

Review Comment:
   ```suggestion
           Set<TopicIdPartition> partitionsWithPendingActions = 
stopPartitions.stream()
                   .filter(sp -> (sp.stopRemoteLogMetadataManager || 
sp.deleteRemoteLog || sp.deleteLocalLog()) && 
topicIdByPartitionMap.containsKey(sp.topicPartition()))
                   .map(sp -> new 
TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()), 
sp.topicPartition()))
                   .collect(Collectors.toSet());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to