kamalcph commented on code in PR #14667:
URL: https://github.com/apache/kafka/pull/14667#discussion_r1377397772
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -378,6 +378,8 @@ public void stopPartitions(Set<StopPartition>
stopPartitions,
if (stopPartition.deleteRemoteLog()) {
LOGGER.info("Deleting the remote log segments task for
partition: {}", tpId);
deleteRemoteLogPartition(tpId);
+ } else {
Review Comment:
The `else` block should go into Line 384. Also, please update the log
statement, since the partition may not be internal:
```
if (topicIdByPartitionMap.containsKey(tp)) {
...
} else {
LOGGER.warn("StopPartition call is not expected for partition: {}",
tpId);
}
```
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -630,13 +630,15 @@ class ReplicaManager(val config: KafkaConfig,
// Third delete the logs and checkpoint.
val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+ // `tieredEnabledPartitions` has excluded internal topics because the
logic of `unifiedLog.remoteLogEnabled()`
+ val tieredEnabledPartitions = partitionsToStop.filter(sp =>
logManager.getLog(sp.topicPartition).exists(unifiedLog =>
unifiedLog.remoteLogEnabled()))
Review Comment:
For readability, can we split this line into two and rename the variable:
```
val remotePartitionsToStop = partitionsToStop.filter(
sp => logManager.getLog(sp.topicPartition).exists(unifiedLog =>
unifiedLog.remoteLogEnabled())
)
```
Also, can we remove the comment? It's not clear. A cluster can have topics
with and without tiered storage, so we cannot assume they are internal.
Otherwise, reword it to something along similar lines.
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -630,13 +630,15 @@ class ReplicaManager(val config: KafkaConfig,
// Third delete the logs and checkpoint.
val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+ // `tieredEnabledPartitions` has excluded internal topics because the
logic of `unifiedLog.remoteLogEnabled()`
+ val tieredEnabledPartitions = partitionsToStop.filter(sp =>
logManager.getLog(sp.topicPartition).exists(unifiedLog =>
unifiedLog.remoteLogEnabled()))
if (partitionsToDelete.nonEmpty) {
// Delete the logs and checkpoint.
logManager.asyncDelete(partitionsToDelete, isStray = false, (tp, e) =>
errorMap.put(tp, e))
}
remoteLogManager.foreach { rlm =>
- // exclude the partitions with offline/error state
- val partitions = partitionsToStop.filterNot(sp =>
errorMap.contains(sp.topicPartition)).toSet.asJava
+ // exclude the partitions with offline/error state and internal topics
+ val partitions = partitionsToStop.filter(sp =>
!errorMap.contains(sp.topicPartition) &&
tieredEnabledPartitions.contains(sp)).asJava
Review Comment:
We can preserve the same check and use `remotePartitionsToStop` instead
of `partitionsToStop`:
```
// exclude the partitions with offline/error state
val partitions = remotePartitionsToStop.filterNot(sp =>
errorMap.contains(sp.topicPartition)).toSet.asJava
```
##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3126,8 +3127,11 @@ class ReplicaManagerTest {
propsModifier.apply(props)
val config = KafkaConfig.fromProps(props)
val logProps = new Properties()
+ if (defaultTopicRemoteLogStorageEnable) {
+ logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+ }
val mockLog = setupMockLog(path1)
- val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new
File(_)), new LogConfig(logProps), log = if (shouldMockLog) Some(mockLog) else
None)
+ val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new
File(_)), new LogConfig(logProps), log = if (shouldMockLog) Some(mockLog) else
None, remoteStorageSystemEnable = true)
Review Comment:
Why is `remoteStorageSystemEnable` set to `true` by default? There is
already a parameter, `enableRemoteStorage`, available. Can we use it instead?
##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -5667,6 +5671,47 @@ class ReplicaManagerTest {
verify(spyRm).checkpointHighWatermarks()
}
+
+ @Test
+ def testNotCallStopPartitionsForNonTieredTopics(): Unit = {
+ val mockTimer = new MockTimer(time)
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer,
aliveBrokerIds = Seq(0, 1),
+ enableRemoteStorage = true, defaultTopicRemoteLogStorageEnable = false)
+
+ try {
+ val tp0 = new TopicPartition(topic, 0)
+ val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+ val partition = replicaManager.createPartition(tp0)
+ // The unified log created is not tiered because
`defaultTopicRemoteLogStorageEnable` is set to false
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
offsetCheckpoints, None)
+
+ val becomeLeaderRequest = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
brokerEpoch,
+ Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava,
+ Collections.singletonMap(tp0.topic(), Uuid.randomUuid()),
+ Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
+ ).build()
+
+ replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) =>
())
+ verifyRLMOnLeadershipChange(Collections.singleton(partition),
Collections.emptySet())
Review Comment:
If remote storage is disabled, then we should not call
`RLMOnLeadershipChange`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]