This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new cdd9c62c553 KAFKA-15711: KRaft support in LogRecoveryTest (#14693) cdd9c62c553 is described below commit cdd9c62c5533e67a9057a47005fa1d296b903a11 Author: Gantigmaa Selenge <39860586+tinasele...@users.noreply.github.com> AuthorDate: Wed Jan 31 10:34:42 2024 +0000 KAFKA-15711: KRaft support in LogRecoveryTest (#14693) Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Zihao Lin --- .../scala/unit/kafka/server/LogRecoveryTest.scala | 59 +++++++++++++--------- .../test/scala/unit/kafka/utils/TestUtils.scala | 16 ++++++ 2 files changed, 51 insertions(+), 24 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 37c5a097bc0..53110040885 100755 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -20,17 +20,22 @@ import java.util.Properties import scala.collection.Seq -import kafka.utils.TestUtils +import kafka.utils.{TestUtils, TestInfoUtils} import TestUtils._ import kafka.server.QuorumTestHarness import java.io.File import kafka.server.checkpoints.OffsetCheckpointFile +import org.apache.kafka.clients.admin.Admin import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer} -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource class LogRecoveryTest extends QuorumTestHarness { @@ -49,18 +54,19 @@ class LogRecoveryTest extends QuorumTestHarness { val partitionId = 0 val topicPartition = new TopicPartition(topic, partitionId) - var server1: KafkaServer = _ - var server2: KafkaServer = _ + var server1: KafkaBroker = _ + var server2: KafkaBroker = _ def configProps1 = configs.head def configProps2 = configs.last val message = "hello" + var admin: Admin = _ var producer: KafkaProducer[Integer, String] = _ def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.head, ReplicaManager.HighWatermarkFilename)) def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.head, ReplicaManager.HighWatermarkFilename)) - var servers = Seq.empty[KafkaServer] + var servers = Seq.empty[KafkaBroker] // Some tests restart the brokers then produce more data. But since test brokers use random ports, we need // to use a new producer that knows the new ports @@ -78,15 +84,15 @@ class LogRecoveryTest extends QuorumTestHarness { override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) - configs = TestUtils.createBrokerConfigs(2, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) + configs = TestUtils.createBrokerConfigs(2, zkConnectOrNull, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) // start both servers - server1 = TestUtils.createServer(configProps1) - server2 = TestUtils.createServer(configProps2) + server1 = createBroker(configProps1) + server2 = createBroker(configProps2) servers = List(server1, server2) - // create topic with 1 partition, 2 replicas, one on each broker - createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0,1)), servers = servers) + admin = createAdminClient(servers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) + createTopicWithAdmin(admin, topic, servers, controllerServers, replicaAssignment = Map(0 -> Seq(0, 1))) // create the producer updateProducer() @@ -95,12 +101,14 @@ class LogRecoveryTest extends QuorumTestHarness { @AfterEach override def tearDown(): Unit = { producer.close() + if (admin != null) admin.close() TestUtils.shutdownServers(servers) super.tearDown() } - @Test - def testHWCheckpointNoFailuresSingleLogSegment(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testHWCheckpointNoFailuresSingleLogSegment(quorum: String): Unit = { val numMessages = 2L sendMessages(numMessages.toInt) @@ -116,9 +124,10 @@ class LogRecoveryTest extends QuorumTestHarness { assertEquals(numMessages, followerHW) } - @Test - def testHWCheckpointWithFailuresSingleLogSegment(): Unit = { - var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testHWCheckpointWithFailuresSingleLogSegment(quorum: String): Unit = { + var leader = getLeaderIdForPartition(servers, topicPartition) assertEquals(0L, hwFile1.read().getOrElse(topicPartition, 0L)) @@ -131,7 +140,7 @@ class LogRecoveryTest extends QuorumTestHarness { assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L)) // check if leader moves to the other server - leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader)) + leader = awaitLeaderChange(servers, topicPartition, leader) assertEquals(1, leader, "Leader must move to broker 1") // bring the preferred replica back @@ -139,7 +148,7 @@ class LogRecoveryTest extends QuorumTestHarness { // Update producer with new server settings updateProducer() - leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) + leader = getLeaderIdForPartition(servers, topicPartition) assertTrue(leader == 0 || leader == 1, "Leader must remain on broker 1, in case of ZooKeeper session expiration it can move to broker 0") @@ -159,7 +168,7 @@ class LogRecoveryTest extends QuorumTestHarness { server2.startup() updateProducer() - leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader)) + leader = awaitLeaderChange(servers, topicPartition, leader) assertTrue(leader == 0 || leader == 1, "Leader must remain on broker 0, in case of ZooKeeper session expiration it can move to broker 1") @@ -176,8 +185,9 @@ class LogRecoveryTest extends QuorumTestHarness { assertEquals(hw, hwFile2.read().getOrElse(topicPartition, 0L)) } - @Test - def testHWCheckpointNoFailuresMultipleLogSegments(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testHWCheckpointNoFailuresMultipleLogSegments(quorum: String): Unit = { sendMessages(20) val hw = 20L // give some time for follower 1 to record leader HW of 600 @@ -192,9 +202,10 @@ class LogRecoveryTest extends QuorumTestHarness { assertEquals(hw, followerHW) } - @Test - def testHWCheckpointWithFailuresMultipleLogSegments(): Unit = { - var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testHWCheckpointWithFailuresMultipleLogSegments(quorum: String): Unit = { + var leader = getLeaderIdForPartition(servers, topicPartition) sendMessages(2) var hw = 2L @@ -212,7 +223,7 @@ class LogRecoveryTest extends QuorumTestHarness { server2.startup() updateProducer() // check if leader moves to the other server - leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader)) + leader = awaitLeaderChange(servers, topicPartition, leader) assertEquals(1, leader, "Leader must move to broker 1") assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L)) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 3e179f5d04c..16a7e95c0bf 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1359,6 +1359,22 @@ object TestUtils extends Logging { newLeaderExists.get } + def getLeaderIdForPartition[B <: KafkaBroker]( + brokers: Seq[B], + tp: TopicPartition, + timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = { + def leaderExists: Option[Int] = { + brokers.find { broker => + broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined) + }.map(_.config.brokerId) + } + + waitUntilTrue(() => leaderExists.isDefined, + s"Did not find a leader for partition $tp after $timeout ms", waitTimeMs = timeout) + + leaderExists.get + } + def waitUntilLeaderIsKnown[B <: KafkaBroker]( brokers: Seq[B], tp: TopicPartition,