kamalcph commented on code in PR #14667:
URL: https://github.com/apache/kafka/pull/14667#discussion_r1378393848


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3117,7 +3117,8 @@ class ReplicaManagerTest {
     isShuttingDown: AtomicBoolean = new AtomicBoolean(false),
     enableRemoteStorage: Boolean = false,
     shouldMockLog: Boolean = false,
-    remoteLogManager: Option[RemoteLogManager] = None
+    remoteLogManager: Option[RemoteLogManager] = None,
+    defaultTopicRemoteLogStorageEnable: Boolean = true

Review Comment:
   Can we change the default value of `defaultTopicRemoteLogStorageEnable` to 
`false` to preserve the existing behaviour of the other tests?



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -5667,6 +5671,47 @@ class ReplicaManagerTest {
 
     verify(spyRm).checkpointHighWatermarks()
   }
+
+  @Test
+  def testNotCallStopPartitionsForNonTieredTopics(): Unit = {
+    val mockTimer = new MockTimer(time)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1),
+      enableRemoteStorage = true, defaultTopicRemoteLogStorageEnable = false)
+
+    try {
+      val tp0 = new TopicPartition(topic, 0)
+      val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      val partition = replicaManager.createPartition(tp0)
+      // The unified log created is not tiered because 
`defaultTopicRemoteLogStorageEnable` is set to false
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, None)
+
+      val becomeLeaderRequest = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+        Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava,
+        Collections.singletonMap(tp0.topic(), Uuid.randomUuid()),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
+      ).build()

Review Comment:
   ```suggestion
         val leaderAndIsr = LeaderAndIsr(0, 1, List(0, 1), 
LeaderRecoveryState.RECOVERED, LeaderAndIsr.InitialPartitionEpoch)
         val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp0.topic), 
tp0, Seq(0, 1), leaderAndIsr)  
   ```



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -5667,6 +5671,47 @@ class ReplicaManagerTest {
 
     verify(spyRm).checkpointHighWatermarks()
   }
+
+  @Test
+  def testNotCallStopPartitionsForNonTieredTopics(): Unit = {
+    val mockTimer = new MockTimer(time)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1),
+      enableRemoteStorage = true, defaultTopicRemoteLogStorageEnable = false)
+
+    try {
+      val tp0 = new TopicPartition(topic, 0)
+      val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      val partition = replicaManager.createPartition(tp0)
+      // The unified log created is not tiered because 
`defaultTopicRemoteLogStorageEnable` is set to false
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, None)
+
+      val becomeLeaderRequest = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+        Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava,
+        Collections.singletonMap(tp0.topic(), Uuid.randomUuid()),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
+      ).build()
+
+      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => 
())
+      verifyRLMOnLeadershipChange(Collections.singleton(partition), 
Collections.emptySet())

Review Comment:
   Got it. Thanks!



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3126,8 +3127,11 @@ class ReplicaManagerTest {
     propsModifier.apply(props)
     val config = KafkaConfig.fromProps(props)
     val logProps = new Properties()
+    if (defaultTopicRemoteLogStorageEnable) {

Review Comment:
   Suggested condition: `if (enableRemoteStorage && defaultTopicRemoteLogStorageEnable)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to