m1a2st commented on code in PR #20014: URL: https://github.com/apache/kafka/pull/20014#discussion_r2163780455
########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -312,38 +311,26 @@ class ReplicaManagerTest { alterPartitionManager = alterPartitionManager) try { - val partition = rm.createPartition(new TopicPartition(topic, 0)) - partition.createLogIfNotExists(isNew = false, isFutureReplica = false, - new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None) + val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions = List(0), topicName = topic, topicId = topicIds(topic)) Review Comment: ```suggestion val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions = List(0), topicName = topic, topicId = topicIds(topic)) ``` ########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -3881,41 +3825,35 @@ class ReplicaManagerTest { def testInconsistentIdReturnsError(): Unit = { val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) try { - val brokerList = Seq[Integer](0, 1).asJava - val topicPartition = new TopicPartition(topic, 0) - val topicIds = Collections.singletonMap(topic, Uuid.randomUuid()) - val topicNames = topicIds.asScala.map(_.swap).asJava - - val invalidTopicIds = Collections.singletonMap(topic, Uuid.randomUuid()) - val invalidTopicNames = invalidTopicIds.asScala.map(_.swap).asJava - - def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String, Uuid]): LeaderAndIsrRequest = - new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq(new LeaderAndIsrRequest.PartitionState() - .setTopicName(topic) - .setPartitionIndex(0) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(epoch) - .setIsr(brokerList) - .setPartitionEpoch(0) - .setReplicas(brokerList) - .setIsNew(true)).asJava, - topicIds, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - - val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topicIds), (_, _) => ()) - assertEquals(Errors.NONE, 
response.partitionErrors(topicNames).get(topicPartition)) - - val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, topicIds), (_, _) => ()) - assertEquals(Errors.NONE, response2.partitionErrors(topicNames).get(topicPartition)) + val invalidTopicId = Uuid.randomUuid() + + val initialDelta = topicsCreateDelta(0, isStartIdLeader = true, + partitions = List(0), topicName = topic, topicId = topicIds(topic)) + val initialImage = imageFromTopics(initialDelta.apply()) + replicaManager.applyDelta(initialDelta, initialImage) + + val updateDelta = topicsCreateDelta(0, isStartIdLeader = true, + partitions = List(0), topicName = topic, topicId = topicIds(topic), leaderEpoch = 1) + val updateImage = imageFromTopics(updateDelta.apply()) + replicaManager.applyDelta(updateDelta, updateImage) // Send request with inconsistent ID. - val response3 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, invalidTopicIds), (_, _) => ()) - assertEquals(Errors.INCONSISTENT_TOPIC_ID, response3.partitionErrors(invalidTopicNames).get(topicPartition)) + val inconsistentDelta1 = topicsCreateDelta(0, isStartIdLeader = true, + partitions = List(0), topicName = topic, topicId = invalidTopicId, leaderEpoch = 1) + val inconsistentImage1 = imageFromTopics(inconsistentDelta1.apply()) + val exception1 = assertThrows(classOf[IllegalStateException], () => { + replicaManager.applyDelta(inconsistentDelta1, inconsistentImage1) + }) + assertTrue(exception1.getMessage.contains("exists, but its ID is")) + + val inconsistentDelta2 = topicsCreateDelta(0, isStartIdLeader = true, + partitions = List(0), topicName = topic, topicId = invalidTopicId, leaderEpoch = 2) + val inconsistentImage2 = imageFromTopics(inconsistentDelta2.apply()) + val exception2 = assertThrows(classOf[IllegalStateException], () => { + replicaManager.applyDelta(inconsistentDelta2, inconsistentImage2) + }) + assertTrue(exception2.getMessage.contains("exists, but its ID is")) Review Comment: ditto ########## 
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -3881,41 +3825,35 @@ class ReplicaManagerTest { def testInconsistentIdReturnsError(): Unit = { val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) try { - val brokerList = Seq[Integer](0, 1).asJava - val topicPartition = new TopicPartition(topic, 0) - val topicIds = Collections.singletonMap(topic, Uuid.randomUuid()) - val topicNames = topicIds.asScala.map(_.swap).asJava - - val invalidTopicIds = Collections.singletonMap(topic, Uuid.randomUuid()) - val invalidTopicNames = invalidTopicIds.asScala.map(_.swap).asJava - - def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String, Uuid]): LeaderAndIsrRequest = - new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq(new LeaderAndIsrRequest.PartitionState() - .setTopicName(topic) - .setPartitionIndex(0) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(epoch) - .setIsr(brokerList) - .setPartitionEpoch(0) - .setReplicas(brokerList) - .setIsNew(true)).asJava, - topicIds, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - - val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topicIds), (_, _) => ()) - assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition)) - - val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, topicIds), (_, _) => ()) - assertEquals(Errors.NONE, response2.partitionErrors(topicNames).get(topicPartition)) + val invalidTopicId = Uuid.randomUuid() + + val initialDelta = topicsCreateDelta(0, isStartIdLeader = true, + partitions = List(0), topicName = topic, topicId = topicIds(topic)) + val initialImage = imageFromTopics(initialDelta.apply()) + replicaManager.applyDelta(initialDelta, initialImage) + + val updateDelta = topicsCreateDelta(0, isStartIdLeader = true, + partitions = List(0), topicName = topic, topicId = topicIds(topic), leaderEpoch = 1) + val updateImage = 
imageFromTopics(updateDelta.apply()) + replicaManager.applyDelta(updateDelta, updateImage) // Send request with inconsistent ID. - val response3 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, invalidTopicIds), (_, _) => ()) - assertEquals(Errors.INCONSISTENT_TOPIC_ID, response3.partitionErrors(invalidTopicNames).get(topicPartition)) + val inconsistentDelta1 = topicsCreateDelta(0, isStartIdLeader = true, + partitions = List(0), topicName = topic, topicId = invalidTopicId, leaderEpoch = 1) + val inconsistentImage1 = imageFromTopics(inconsistentDelta1.apply()) + val exception1 = assertThrows(classOf[IllegalStateException], () => { + replicaManager.applyDelta(inconsistentDelta1, inconsistentImage1) + }) + assertTrue(exception1.getMessage.contains("exists, but its ID is")) Review Comment: Could you assert the whole error message? ```suggestion assertEquals(s"Topic ${topic}-0 exists, but its ID is ${topicId}, not ${invalidTopicId} as expected", exception1.getMessage) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org