m1a2st commented on code in PR #20014:
URL: https://github.com/apache/kafka/pull/20014#discussion_r2163780455


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -312,38 +311,26 @@ class ReplicaManagerTest {
       alterPartitionManager = alterPartitionManager)
 
     try {
-      val partition = rm.createPartition(new TopicPartition(topic, 0))
-      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
-        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
+      val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions = 
List(0), topicName = topic, topicId =  topicIds(topic))

Review Comment:
   ```suggestion
         val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions = 
List(0), topicName = topic, topicId = topicIds(topic))
   ```



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3881,41 +3825,35 @@ class ReplicaManagerTest {
   def testInconsistentIdReturnsError(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time))
     try {
-      val brokerList = Seq[Integer](0, 1).asJava
-      val topicPartition = new TopicPartition(topic, 0)
-      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
-      val topicNames = topicIds.asScala.map(_.swap).asJava
-
-      val invalidTopicIds = Collections.singletonMap(topic, Uuid.randomUuid())
-      val invalidTopicNames = invalidTopicIds.asScala.map(_.swap).asJava
-
-      def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String, 
Uuid]): LeaderAndIsrRequest =
-        new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(epoch)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(true)).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-
-      val response = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(0, topicIds), (_, _) => ())
-      assertEquals(Errors.NONE, 
response.partitionErrors(topicNames).get(topicPartition))
-
-      val response2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(1, topicIds), (_, _) => ())
-      assertEquals(Errors.NONE, 
response2.partitionErrors(topicNames).get(topicPartition))
+      val invalidTopicId = Uuid.randomUuid()
+
+      val initialDelta = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = topicIds(topic))
+      val initialImage = imageFromTopics(initialDelta.apply())
+      replicaManager.applyDelta(initialDelta, initialImage)
+
+      val updateDelta = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = topicIds(topic), 
leaderEpoch = 1)
+      val updateImage = imageFromTopics(updateDelta.apply())
+      replicaManager.applyDelta(updateDelta, updateImage)
 
       // Send request with inconsistent ID.
-      val response3 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(1, invalidTopicIds), (_, _) => ())
-      assertEquals(Errors.INCONSISTENT_TOPIC_ID, 
response3.partitionErrors(invalidTopicNames).get(topicPartition))
+      val inconsistentDelta1 = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = invalidTopicId, 
leaderEpoch = 1)
+      val inconsistentImage1 = imageFromTopics(inconsistentDelta1.apply())
+      val exception1 = assertThrows(classOf[IllegalStateException], () => {
+        replicaManager.applyDelta(inconsistentDelta1, inconsistentImage1)
+      })
+      assertTrue(exception1.getMessage.contains("exists, but its ID is"))
+
+      val inconsistentDelta2 = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = invalidTopicId, 
leaderEpoch = 2)
+      val inconsistentImage2 = imageFromTopics(inconsistentDelta2.apply())
+      val exception2 = assertThrows(classOf[IllegalStateException], () => {
+        replicaManager.applyDelta(inconsistentDelta2, inconsistentImage2)
+      })
+      assertTrue(exception2.getMessage.contains("exists, but its ID is"))

Review Comment:
   Ditto: please assert the whole error message here as well, as requested for the `exception1` assertion.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3881,41 +3825,35 @@ class ReplicaManagerTest {
   def testInconsistentIdReturnsError(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time))
     try {
-      val brokerList = Seq[Integer](0, 1).asJava
-      val topicPartition = new TopicPartition(topic, 0)
-      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
-      val topicNames = topicIds.asScala.map(_.swap).asJava
-
-      val invalidTopicIds = Collections.singletonMap(topic, Uuid.randomUuid())
-      val invalidTopicNames = invalidTopicIds.asScala.map(_.swap).asJava
-
-      def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String, 
Uuid]): LeaderAndIsrRequest =
-        new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(epoch)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(true)).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-
-      val response = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(0, topicIds), (_, _) => ())
-      assertEquals(Errors.NONE, 
response.partitionErrors(topicNames).get(topicPartition))
-
-      val response2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(1, topicIds), (_, _) => ())
-      assertEquals(Errors.NONE, 
response2.partitionErrors(topicNames).get(topicPartition))
+      val invalidTopicId = Uuid.randomUuid()
+
+      val initialDelta = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = topicIds(topic))
+      val initialImage = imageFromTopics(initialDelta.apply())
+      replicaManager.applyDelta(initialDelta, initialImage)
+
+      val updateDelta = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = topicIds(topic), 
leaderEpoch = 1)
+      val updateImage = imageFromTopics(updateDelta.apply())
+      replicaManager.applyDelta(updateDelta, updateImage)
 
       // Send request with inconsistent ID.
-      val response3 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(1, invalidTopicIds), (_, _) => ())
-      assertEquals(Errors.INCONSISTENT_TOPIC_ID, 
response3.partitionErrors(invalidTopicNames).get(topicPartition))
+      val inconsistentDelta1 = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = invalidTopicId, 
leaderEpoch = 1)
+      val inconsistentImage1 = imageFromTopics(inconsistentDelta1.apply())
+      val exception1 = assertThrows(classOf[IllegalStateException], () => {
+        replicaManager.applyDelta(inconsistentDelta1, inconsistentImage1)
+      })
+      assertTrue(exception1.getMessage.contains("exists, but its ID is"))

Review Comment:
   Could you assert the whole error message?
   ```suggestion
         assertEquals(s"Topic ${topic}-0 exists, but its ID is ${topicId}, not 
${invalidTopicId} as expected", exception1.getMessage)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to