jingjia88 commented on code in PR #20014:
URL: https://github.com/apache/kafka/pull/20014#discussion_r2164055749


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3881,41 +3825,35 @@ class ReplicaManagerTest {
   def testInconsistentIdReturnsError(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time))
     try {
-      val brokerList = Seq[Integer](0, 1).asJava
-      val topicPartition = new TopicPartition(topic, 0)
-      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
-      val topicNames = topicIds.asScala.map(_.swap).asJava
-
-      val invalidTopicIds = Collections.singletonMap(topic, Uuid.randomUuid())
-      val invalidTopicNames = invalidTopicIds.asScala.map(_.swap).asJava
-
-      def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String, 
Uuid]): LeaderAndIsrRequest =
-        new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(epoch)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(true)).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-
-      val response = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(0, topicIds), (_, _) => ())
-      assertEquals(Errors.NONE, 
response.partitionErrors(topicNames).get(topicPartition))
-
-      val response2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(1, topicIds), (_, _) => ())
-      assertEquals(Errors.NONE, 
response2.partitionErrors(topicNames).get(topicPartition))
+      val invalidTopicId = Uuid.randomUuid()
+
+      val initialDelta = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = topicIds(topic))
+      val initialImage = imageFromTopics(initialDelta.apply())
+      replicaManager.applyDelta(initialDelta, initialImage)
+
+      val updateDelta = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = topicIds(topic), 
leaderEpoch = 1)
+      val updateImage = imageFromTopics(updateDelta.apply())
+      replicaManager.applyDelta(updateDelta, updateImage)
 
       // Send request with inconsistent ID.
-      val response3 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(1, invalidTopicIds), (_, _) => ())
-      assertEquals(Errors.INCONSISTENT_TOPIC_ID, 
response3.partitionErrors(invalidTopicNames).get(topicPartition))
+      val inconsistentDelta1 = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = invalidTopicId, 
leaderEpoch = 1)
+      val inconsistentImage1 = imageFromTopics(inconsistentDelta1.apply())
+      val exception1 = assertThrows(classOf[IllegalStateException], () => {
+        replicaManager.applyDelta(inconsistentDelta1, inconsistentImage1)
+      })
+      assertTrue(exception1.getMessage.contains("exists, but its ID is"))

Review Comment:
   Thanks for the review. Updated, PTAL~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to