showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1602524342


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with 
futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): 
Unit = {
+    val dir1 = TestUtils.tempDir()
+    val dir2 = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new 
File(_)), new LogConfig(new Properties()))
+    val spyLogManager = spy(logManager)
+    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+    mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+    
when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    val tp0 = new TopicPartition(topic, 0)
+    val uuid = Uuid.randomUuid()
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = spyLogManager,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    try {
+      val partition = rm.createPartition(tp0)
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+      rm.becomeLeaderOrFollower(0, new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(0)
+          .setIsr(Seq[Integer](0).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](0).asJava)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()),

Review Comment:
   No, the future log is just a follower of the original log, so it must have 
the same topic ID as the original log.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to