This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cdd9c62c553 KAFKA-15711: KRaft support in LogRecoveryTest (#14693)
cdd9c62c553 is described below

commit cdd9c62c5533e67a9057a47005fa1d296b903a11
Author: Gantigmaa Selenge <39860586+tinasele...@users.noreply.github.com>
AuthorDate: Wed Jan 31 10:34:42 2024 +0000

    KAFKA-15711: KRaft support in LogRecoveryTest (#14693)
    
    
    Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Zihao Lin
---
 .../scala/unit/kafka/server/LogRecoveryTest.scala  | 59 +++++++++++++---------
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 16 ++++++
 2 files changed, 51 insertions(+), 24 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 37c5a097bc0..53110040885 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -20,17 +20,22 @@ import java.util.Properties
 
 import scala.collection.Seq
 
-import kafka.utils.TestUtils
+import kafka.utils.{TestUtils, TestInfoUtils}
 import TestUtils._
 import kafka.server.QuorumTestHarness
 import java.io.File
 
 import kafka.server.checkpoints.OffsetCheckpointFile
+import org.apache.kafka.clients.admin.Admin
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 class LogRecoveryTest extends QuorumTestHarness {
 
@@ -49,18 +54,19 @@ class LogRecoveryTest extends QuorumTestHarness {
   val partitionId = 0
   val topicPartition = new TopicPartition(topic, partitionId)
 
-  var server1: KafkaServer = _
-  var server2: KafkaServer = _
+  var server1: KafkaBroker = _
+  var server2: KafkaBroker = _
 
   def configProps1 = configs.head
   def configProps2 = configs.last
 
   val message = "hello"
 
+  var admin: Admin = _
   var producer: KafkaProducer[Integer, String] = _
   def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.head, ReplicaManager.HighWatermarkFilename))
   def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.head, ReplicaManager.HighWatermarkFilename))
-  var servers = Seq.empty[KafkaServer]
+  var servers = Seq.empty[KafkaBroker]
 
   // Some tests restart the brokers then produce more data. But since test brokers use random ports, we need
   // to use a new producer that knows the new ports
@@ -78,15 +84,15 @@ class LogRecoveryTest extends QuorumTestHarness {
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
 
-    configs = TestUtils.createBrokerConfigs(2, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+    configs = TestUtils.createBrokerConfigs(2, zkConnectOrNull, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
 
     // start both servers
-    server1 = TestUtils.createServer(configProps1)
-    server2 = TestUtils.createServer(configProps2)
+    server1 = createBroker(configProps1)
+    server2 = createBroker(configProps2)
     servers = List(server1, server2)
 
-    // create topic with 1 partition, 2 replicas, one on each broker
-    createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0,1)), servers = servers)
+    admin = createAdminClient(servers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+    createTopicWithAdmin(admin, topic, servers, controllerServers, replicaAssignment = Map(0 -> Seq(0, 1)))
 
     // create the producer
     updateProducer()
@@ -95,12 +101,14 @@ class LogRecoveryTest extends QuorumTestHarness {
   @AfterEach
   override def tearDown(): Unit = {
     producer.close()
+    if (admin != null) admin.close()
     TestUtils.shutdownServers(servers)
     super.tearDown()
   }
 
-  @Test
-  def testHWCheckpointNoFailuresSingleLogSegment(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testHWCheckpointNoFailuresSingleLogSegment(quorum: String): Unit = {
     val numMessages = 2L
     sendMessages(numMessages.toInt)
 
@@ -116,9 +124,10 @@ class LogRecoveryTest extends QuorumTestHarness {
     assertEquals(numMessages, followerHW)
   }
 
-  @Test
-  def testHWCheckpointWithFailuresSingleLogSegment(): Unit = {
-    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testHWCheckpointWithFailuresSingleLogSegment(quorum: String): Unit = {
+    var leader = getLeaderIdForPartition(servers, topicPartition)
 
     assertEquals(0L, hwFile1.read().getOrElse(topicPartition, 0L))
 
@@ -131,7 +140,7 @@ class LogRecoveryTest extends QuorumTestHarness {
     assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L))
 
     // check if leader moves to the other server
-    leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader))
+    leader = awaitLeaderChange(servers, topicPartition, leader)
     assertEquals(1, leader, "Leader must move to broker 1")
 
     // bring the preferred replica back
@@ -139,7 +148,7 @@ class LogRecoveryTest extends QuorumTestHarness {
     // Update producer with new server settings
     updateProducer()
 
-    leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+    leader = getLeaderIdForPartition(servers, topicPartition)
     assertTrue(leader == 0 || leader == 1,
       "Leader must remain on broker 1, in case of ZooKeeper session expiration 
it can move to broker 0")
 
@@ -159,7 +168,7 @@ class LogRecoveryTest extends QuorumTestHarness {
 
     server2.startup()
     updateProducer()
-    leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader))
+    leader = awaitLeaderChange(servers, topicPartition, leader)
     assertTrue(leader == 0 || leader == 1,
       "Leader must remain on broker 0, in case of ZooKeeper session expiration 
it can move to broker 1")
 
@@ -176,8 +185,9 @@ class LogRecoveryTest extends QuorumTestHarness {
     assertEquals(hw, hwFile2.read().getOrElse(topicPartition, 0L))
   }
 
-  @Test
-  def testHWCheckpointNoFailuresMultipleLogSegments(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testHWCheckpointNoFailuresMultipleLogSegments(quorum: String): Unit = {
     sendMessages(20)
     val hw = 20L
     // give some time for follower 1 to record leader HW of 600
@@ -192,9 +202,10 @@ class LogRecoveryTest extends QuorumTestHarness {
     assertEquals(hw, followerHW)
   }
 
-  @Test
-  def testHWCheckpointWithFailuresMultipleLogSegments(): Unit = {
-    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testHWCheckpointWithFailuresMultipleLogSegments(quorum: String): Unit = {
+    var leader = getLeaderIdForPartition(servers, topicPartition)
 
     sendMessages(2)
     var hw = 2L
@@ -212,7 +223,7 @@ class LogRecoveryTest extends QuorumTestHarness {
     server2.startup()
     updateProducer()
     // check if leader moves to the other server
-    leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader))
+    leader = awaitLeaderChange(servers, topicPartition, leader)
     assertEquals(1, leader, "Leader must move to broker 1")
 
     assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3e179f5d04c..16a7e95c0bf 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1359,6 +1359,22 @@ object TestUtils extends Logging {
     newLeaderExists.get
   }
 
+  def getLeaderIdForPartition[B <: KafkaBroker](
+      brokers: Seq[B],
+      tp: TopicPartition,
+      timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
+    def leaderExists: Option[Int] = {
+      brokers.find { broker =>
+          broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined)
+      }.map(_.config.brokerId)
+    }
+
+    waitUntilTrue(() => leaderExists.isDefined,
+      s"Did not find a leader for partition $tp after $timeout ms", waitTimeMs 
= timeout)
+
+    leaderExists.get
+  }
+
   def waitUntilLeaderIsKnown[B <: KafkaBroker](
       brokers: Seq[B],
       tp: TopicPartition,
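
For readers following this change, below is a minimal, hypothetical sketch (not part of the commit) of the test pattern it adopts: a JUnit 5 parameterized test that runs once per quorum type ("zk" and "kraft") and resolves the partition leader through the new broker-based TestUtils.getLeaderIdForPartition helper rather than a ZooKeeper lookup. The class name, topic name, and broker setup are illustrative assumptions only.

    import kafka.server.{KafkaBroker, QuorumTestHarness}
    import kafka.utils.{TestInfoUtils, TestUtils}
    import org.apache.kafka.common.TopicPartition
    import org.junit.jupiter.api.Assertions.assertTrue
    import org.junit.jupiter.params.ParameterizedTest
    import org.junit.jupiter.params.provider.ValueSource

    class ExampleLeaderTest extends QuorumTestHarness {
      // Illustrative fields; a real test would populate `brokers` in setUp(),
      // e.g. via createBroker(...), as LogRecoveryTest now does.
      var brokers: Seq[KafkaBroker] = Seq.empty
      val tp = new TopicPartition("example-topic", 0)

      @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
      @ValueSource(strings = Array("zk", "kraft"))
      def testLeaderIsElected(quorum: String): Unit = {
        // Asks each broker's ReplicaManager for a local leader log, so it
        // works under both ZooKeeper and KRaft quorums.
        val leaderId = TestUtils.getLeaderIdForPartition(brokers, tp)
        assertTrue(leaderId >= 0, s"expected an elected leader for $tp")
      }
    }

This helper replaces the waitUntilLeaderIsElectedOrChanged(zkClient, ...) calls in the diff above, while awaitLeaderChange covers the cases that waited for leadership to move to a different broker.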
