jingjia88 commented on code in PR #20014: URL: https://github.com/apache/kafka/pull/20014#discussion_r2164055749
########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -3881,41 +3825,35 @@ class ReplicaManagerTest { def testInconsistentIdReturnsError(): Unit = { val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) try { - val brokerList = Seq[Integer](0, 1).asJava - val topicPartition = new TopicPartition(topic, 0) - val topicIds = Collections.singletonMap(topic, Uuid.randomUuid()) - val topicNames = topicIds.asScala.map(_.swap).asJava - - val invalidTopicIds = Collections.singletonMap(topic, Uuid.randomUuid()) - val invalidTopicNames = invalidTopicIds.asScala.map(_.swap).asJava - - def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String, Uuid]): LeaderAndIsrRequest = - new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq(new LeaderAndIsrRequest.PartitionState() - .setTopicName(topic) - .setPartitionIndex(0) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(epoch) - .setIsr(brokerList) - .setPartitionEpoch(0) - .setReplicas(brokerList) - .setIsNew(true)).asJava, - topicIds, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - - val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topicIds), (_, _) => ()) - assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition)) - - val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, topicIds), (_, _) => ()) - assertEquals(Errors.NONE, response2.partitionErrors(topicNames).get(topicPartition)) + val invalidTopicId = Uuid.randomUuid() + + val initialDelta = topicsCreateDelta(0, isStartIdLeader = true, + partitions = List(0), topicName = topic, topicId = topicIds(topic)) + val initialImage = imageFromTopics(initialDelta.apply()) + replicaManager.applyDelta(initialDelta, initialImage) + + val updateDelta = topicsCreateDelta(0, isStartIdLeader = true, + partitions = List(0), topicName = topic, topicId = topicIds(topic), leaderEpoch = 1) + val updateImage = imageFromTopics(updateDelta.apply()) + replicaManager.applyDelta(updateDelta, updateImage) // Send request with inconsistent ID. - val response3 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, invalidTopicIds), (_, _) => ()) - assertEquals(Errors.INCONSISTENT_TOPIC_ID, response3.partitionErrors(invalidTopicNames).get(topicPartition)) + val inconsistentDelta1 = topicsCreateDelta(0, isStartIdLeader = true, + partitions = List(0), topicName = topic, topicId = invalidTopicId, leaderEpoch = 1) + val inconsistentImage1 = imageFromTopics(inconsistentDelta1.apply()) + val exception1 = assertThrows(classOf[IllegalStateException], () => { + replicaManager.applyDelta(inconsistentDelta1, inconsistentImage1) + }) + assertTrue(exception1.getMessage.contains("exists, but its ID is")) Review Comment: Thanks for the review. Updated, PTAL~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org