jsancio commented on code in PR #12032:
URL: https://github.com/apache/kafka/pull/12032#discussion_r856654851
##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -890,65 +890,289 @@ class ControllerIntegrationTest extends
QuorumTestHarness {
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
val controller = getController().kafkaController
- var future = captureAlterIsrError(controllerId, controller.brokerEpoch - 1,
- Map(tp -> LeaderAndIsr(controllerId, replicas)))
- var capturedError = future.get(5, TimeUnit.SECONDS)
- assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
-
- future = captureAlterIsrError(99, controller.brokerEpoch,
- Map(tp -> LeaderAndIsr(controllerId, replicas)))
- capturedError = future.get(5, TimeUnit.SECONDS)
- assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
-
- val unknownTopicPartition = new TopicPartition("unknown", 99)
- future = captureAlterIsrPartitionError(controllerId,
controller.brokerEpoch,
- Map(unknownTopicPartition -> LeaderAndIsr(controllerId, replicas)),
unknownTopicPartition)
- capturedError = future.get(5, TimeUnit.SECONDS)
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, capturedError)
-
- future = captureAlterIsrPartitionError(controllerId,
controller.brokerEpoch,
- Map(tp -> LeaderAndIsr(controllerId, 1, replicas,
LeaderRecoveryState.RECOVERED, 99)), tp)
- capturedError = future.get(5, TimeUnit.SECONDS)
- assertEquals(Errors.INVALID_UPDATE_VERSION, capturedError)
-
- future = captureAlterIsrPartitionError(controllerId,
controller.brokerEpoch,
- Map(tp -> LeaderAndIsr(controllerId, 1, replicas,
LeaderRecoveryState.RECOVERING, 1)), tp)
- capturedError = future.get(5, TimeUnit.SECONDS)
- assertEquals(Errors.INVALID_REQUEST, capturedError)
-
- future = captureAlterIsrPartitionError(controllerId,
controller.brokerEpoch,
- Map(tp -> LeaderAndIsr(controllerId, 1, List(controllerId),
LeaderRecoveryState.RECOVERING, 1)), tp)
- capturedError = future.get(5, TimeUnit.SECONDS)
- assertEquals(Errors.INVALID_REQUEST, capturedError)
+ val partitionState =
controller.controllerContext.partitionLeadershipInfo(tp).get
+ val leaderId = partitionState.leaderAndIsr.leader
+ val leaderBrokerEpoch = servers(leaderId).kafkaController.brokerEpoch
+ val leaderEpoch = partitionState.leaderAndIsr.leaderEpoch
+ val partitionEpoch = partitionState.leaderAndIsr.partitionEpoch
+
+ def assertAlterPartition(
+ topLevelError: Errors = Errors.NONE,
+ partitionError: Errors = Errors.NONE,
+ topicPartition: TopicPartition = tp,
+ leaderId: Int = leaderId,
+ brokerEpoch: Long = leaderBrokerEpoch,
+ leaderEpoch: Int = leaderEpoch,
+ partitionEpoch: Int = partitionEpoch,
+ isr: Set[Int] = replicas.toSet,
+ leaderRecoveryState: LeaderRecoveryState = LeaderRecoveryState.RECOVERED
+ ): Unit = {
+ assertAlterPartitionError(
+ topicPartition = topicPartition,
+ leaderId = leaderId,
+ brokerEpoch = brokerEpoch,
+ leaderEpoch = leaderEpoch,
+ partitionEpoch = partitionEpoch,
+ isr = isr,
+ leaderRecoveryState = leaderRecoveryState,
+ topLevelError = topLevelError,
+ partitionError = partitionError
+ )
+ }
+
+ assertAlterPartition(
+ topLevelError = Errors.STALE_BROKER_EPOCH,
+ brokerEpoch = leaderBrokerEpoch - 1
+ )
+
+ assertAlterPartition(
+ topLevelError = Errors.STALE_BROKER_EPOCH,
+ leaderId = 99,
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ topicPartition = new TopicPartition("unknown", 0)
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ topicPartition = new TopicPartition(tp.topic, 1)
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.INVALID_UPDATE_VERSION,
+ partitionEpoch = partitionEpoch - 1
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.FENCED_LEADER_EPOCH,
+ leaderEpoch = leaderEpoch - 1
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.FENCED_LEADER_EPOCH,
+ leaderEpoch = leaderEpoch + 1
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.INVALID_REQUEST,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.INVALID_REQUEST,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+ isr = Set(controllerId)
+ )
+
+ // Version/epoch errors take precedence over other validations since
+ // the leader may be working with outdated state.
+
+ assertAlterPartition(
+ partitionError = Errors.INVALID_UPDATE_VERSION,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+ partitionEpoch = partitionEpoch - 1
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.FENCED_LEADER_EPOCH,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+ leaderEpoch = leaderEpoch - 1
+ )
+
+ assertAlterPartition(
+ partitionError = Errors.FENCED_LEADER_EPOCH,
+ leaderRecoveryState = LeaderRecoveryState.RECOVERING,
+ leaderEpoch = leaderEpoch + 1
+ )
}
- def captureAlterIsrError(brokerId: Int, brokerEpoch: Long, isrsToAlter:
Map[TopicPartition, LeaderAndIsr]): CompletableFuture[Errors] = {
- val future = new CompletableFuture[Errors]()
+ @Test
+ def testAlterPartitionErrorsAfterUncleanElection(): Unit = {
+ // - Start 3 brokers with unclean election enabled
+ // - Create a topic with two non-controller replicas: A and B
+ // - Shutdown A to bring ISR to [B]
+ // - Shutdown B to make partition offline
+ // - Restart A to force unclean election with ISR [A]
+ // - Verify AlterPartition handling in this state
+
+ servers = makeServers(numConfigs = 3, uncleanLeaderElectionEnable = true)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val controller = getController().kafkaController
- val callback: AlterPartitionCallback = {
- case Left(_: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
- future.completeExceptionally(new AssertionError(s"Should have seen
top-level error"))
- case Right(error: Errors) =>
- future.complete(error)
+
+ val tp = new TopicPartition("t", 0)
+ val replicas = servers.map(_.config.nodeId).filter(_ !=
controllerId).take(2).toList
+ val assignment = Map(tp.partition -> replicas)
+
+ val replica1 = replicas.head
+ val replica2 = replicas.last
Review Comment:
Minor but with Scala you can deconstruct a `List` with `val replica1 ::
replica2 :: Nil = replicas`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]