showuon commented on code in PR #15951: URL: https://github.com/apache/kafka/pull/15951#discussion_r1602524342
########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -314,6 +314,77 @@ class ReplicaManagerTest { } } + @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}") + @ValueSource(booleans = Array(true, false)) + def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = { + val dir1 = TestUtils.tempDir() + val dir2 = TestUtils.tempDir() + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) + props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath) + val config = KafkaConfig.fromProps(props) + val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties())) + val spyLogManager = spy(logManager) + val metadataCache: MetadataCache = mock(classOf[MetadataCache]) + mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0))) + when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion) + val tp0 = new TopicPartition(topic, 0) + val uuid = Uuid.randomUuid() + val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = spyLogManager, + quotaManagers = quotaManager, + metadataCache = metadataCache, + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + + try { + val partition = rm.createPartition(tp0) + partition.createLogIfNotExists(isNew = false, isFutureReplica = false, + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) + + rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(0) + .setIsr(Seq[Integer](0).asJava) + .setPartitionEpoch(0) + .setReplicas(Seq[Integer](0).asJava) + .setIsNew(false)).asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), Review Comment: No, the future log is just a follower of the original log, so it must have the same topic ID as the original one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org