kamalcph commented on code in PR #14667:
URL: https://github.com/apache/kafka/pull/14667#discussion_r1377397772


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -378,6 +378,8 @@ public void stopPartitions(Set<StopPartition> 
stopPartitions,
                     if (stopPartition.deleteRemoteLog()) {
                         LOGGER.info("Deleting the remote log segments task for 
partition: {}", tpId);
                         deleteRemoteLogPartition(tpId);
+                    } else {

Review Comment:
   The `else` block should go into Line 384. Also, please update the log 
statement, the partition may not be internal:
   
   ```
   if (topicIdByPartitionMap.containsKey(tp)) {
       ...
   } else {
       LOGGER.warn("StopPartition call is not expected for partition: {}", 
tpId);
   }  
   ```
   
   
   
   



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -630,13 +630,15 @@ class ReplicaManager(val config: KafkaConfig,
 
     // Third delete the logs and checkpoint.
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    // `tieredEnabledPartitions` has excluded internal topics because the 
logic of `unifiedLog.remoteLogEnabled()`
+    val tieredEnabledPartitions = partitionsToStop.filter(sp => 
logManager.getLog(sp.topicPartition).exists(unifiedLog => 
unifiedLog.remoteLogEnabled()))

Review Comment:
   For readability, can we split this line into two and rename the variable:
   
   ```
       val remotePartitionsToStop = partitionsToStop.filter(
         sp => logManager.getLog(sp.topicPartition).exists(unifiedLog => 
unifiedLog.remoteLogEnabled())
       )
   ```
   Also, can we remove the comment? It's not clear. A cluster can have topics 
with and without tiered storage, we cannot assume them as internal.
   
   or something in similar lines.
   
   
   
   



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -630,13 +630,15 @@ class ReplicaManager(val config: KafkaConfig,
 
     // Third delete the logs and checkpoint.
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    // `tieredEnabledPartitions` has excluded internal topics because the 
logic of `unifiedLog.remoteLogEnabled()`
+    val tieredEnabledPartitions = partitionsToStop.filter(sp => 
logManager.getLog(sp.topicPartition).exists(unifiedLog => 
unifiedLog.remoteLogEnabled()))
     if (partitionsToDelete.nonEmpty) {
       // Delete the logs and checkpoint.
       logManager.asyncDelete(partitionsToDelete, isStray = false, (tp, e) => 
errorMap.put(tp, e))
     }
     remoteLogManager.foreach { rlm =>
-      // exclude the partitions with offline/error state
-      val partitions = partitionsToStop.filterNot(sp => 
errorMap.contains(sp.topicPartition)).toSet.asJava
+      // exclude the partitions with offline/error state and internal topics
+      val partitions = partitionsToStop.filter(sp => 
!errorMap.contains(sp.topicPartition) && 
tieredEnabledPartitions.contains(sp)).asJava

Review Comment:
   we can preserve the same check and use the `remotePartitionsToStop` instead 
of `partitionsToStop`:
   
   ```
   // exclude the partitions with offline/error state
   val partitions = remotePartitionsToStop.filterNot(sp => 
errorMap.contains(sp.topicPartition)).toSet.asJava
   ```



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3126,8 +3127,11 @@ class ReplicaManagerTest {
     propsModifier.apply(props)
     val config = KafkaConfig.fromProps(props)
     val logProps = new Properties()
+    if (defaultTopicRemoteLogStorageEnable) {
+      logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+    }
     val mockLog = setupMockLog(path1)
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)), new LogConfig(logProps), log = if (shouldMockLog) Some(mockLog) else 
None)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)), new LogConfig(logProps), log = if (shouldMockLog) Some(mockLog) else 
None, remoteStorageSystemEnable = true)

Review Comment:
   Why `remoteStorageSystemEnable` is set to `true` by default. There is 
already one parameter `enableRemoteStorage` is available. Can we use it instead?



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -5667,6 +5671,47 @@ class ReplicaManagerTest {
 
     verify(spyRm).checkpointHighWatermarks()
   }
+
+  @Test
+  def testNotCallStopPartitionsForNonTieredTopics(): Unit = {
+    val mockTimer = new MockTimer(time)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1),
+      enableRemoteStorage = true, defaultTopicRemoteLogStorageEnable = false)
+
+    try {
+      val tp0 = new TopicPartition(topic, 0)
+      val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      val partition = replicaManager.createPartition(tp0)
+      // The unified log created is not tiered because 
`defaultTopicRemoteLogStorageEnable` is set to false
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, None)
+
+      val becomeLeaderRequest = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+        Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava,
+        Collections.singletonMap(tp0.topic(), Uuid.randomUuid()),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
+      ).build()
+
+      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => 
())
+      verifyRLMOnLeadershipChange(Collections.singleton(partition), 
Collections.emptySet())

Review Comment:
   If remote storage is disabled, then we should not call 
`RLMOnLeadershipChange`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to