showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1604409235


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with 
futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): 
Unit = {
+    val dir1 = TestUtils.tempDir()
+    val dir2 = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new 
File(_)), new LogConfig(new Properties()))
+    val spyLogManager = spy(logManager)
+    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+    mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+    
when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    val tp0 = new TopicPartition(topic, 0)
+    val uuid = Uuid.randomUuid()
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = spyLogManager,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    try {
+      val partition = rm.createPartition(tp0)
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+      rm.becomeLeaderOrFollower(0, new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(0)
+          .setIsr(Seq[Integer](0).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](0).asJava)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()),

Review Comment:
   Ah, I know what you're talking about now. The reason it won't fail is 
that the topicId we feed into the partition object is None, so the topicId 
consistency check always passes because the original topicId is not set. 
I've updated the test and also verified that the response has no errors in 
this commit: 
https://github.com/apache/kafka/pull/15951/commits/4a1b76d38effa2233fb7bc062920a2457243fded
 . Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to