This is an automated email from the ASF dual-hosted git repository.

rndgstn pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push:
     new 1455962d57f KAFKA-15366: Modify LogDirFailureTest for KRaft (#14977)
1455962d57f is described below

commit 1455962d57f255dfabcc204cffca6263b77d2954
Author: Viktor Somogyi-Vass <viktorsomo...@gmail.com>
AuthorDate: Wed Dec 20 03:02:49 2023 +0100

    KAFKA-15366: Modify LogDirFailureTest for KRaft (#14977)

    Reviewers: Omnia G.H Ibrahim <o.g.h.ibra...@gmail.com>, Ron Dagostino <rdagost...@confluent.io>, Igor Soarez <soa...@apple.com>
---
 .../unit/kafka/server/LogDirFailureTest.scala      | 82 ++++++++++++++--------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  3 +-
 2 files changed, 53 insertions(+), 32 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index a403088d58b..c63f4664096 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -21,8 +21,8 @@ import java.util.Collections
 import java.util.concurrent.{ExecutionException, TimeUnit}
 import kafka.api.IntegrationTestHarness
 import kafka.controller.{OfflineReplica, PartitionAndReplica}
-import kafka.utils.TestUtils.{Checkpoint, LogDirFailureType, Roll}
-import kafka.utils.{CoreUtils, Exit, TestUtils}
+import kafka.utils.TestUtils.{waitUntilTrue, Checkpoint, LogDirFailureType, Roll}
+import kafka.utils.{CoreUtils, Exit, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
@@ -30,6 +30,8 @@ import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderOrFollowe
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.ParameterizedTest
 
 import java.nio.file.Files
 import scala.annotation.nowarn
@@ -56,20 +58,22 @@ class LogDirFailureTest extends IntegrationTestHarness {
     createTopic(topic, partitionNum, brokerCount)
   }
 
-  @Test
-  def testProduceErrorFromFailureOnLogRoll(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testProduceErrorFromFailureOnLogRoll(quorum: String): Unit = {
     testProduceErrorsFromLogDirFailureOnLeader(Roll)
   }
 
-  @Test
-  def testIOExceptionDuringLogRoll(): Unit = {
-    testProduceAfterLogDirFailureOnLeader(Roll)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIOExceptionDuringLogRoll(quorum: String): Unit = {
+    testProduceAfterLogDirFailureOnLeader(Roll, quorum)
   }
 
   // Broker should halt on any log directory failure if inter-broker protocol < 1.0
   @nowarn("cat=deprecation")
   @Test
-  def brokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure(): Unit = {
+  def testZkBrokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure(): Unit = {
     @volatile var statusCodeOption: Option[Int] = None
     Exit.setHaltProcedure { (statusCode, _) =>
       statusCodeOption = Some(statusCode)
@@ -97,18 +101,21 @@ class LogDirFailureTest extends IntegrationTestHarness {
     }
   }
 
-  @Test
-  def testProduceErrorFromFailureOnCheckpoint(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testProduceErrorFromFailureOnCheckpoint(quorum: String): Unit = {
     testProduceErrorsFromLogDirFailureOnLeader(Checkpoint)
   }
 
-  @Test
-  def testIOExceptionDuringCheckpoint(): Unit = {
-    testProduceAfterLogDirFailureOnLeader(Checkpoint)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIOExceptionDuringCheckpoint(quorum: String): Unit = {
+    testProduceAfterLogDirFailureOnLeader(Checkpoint, quorum)
   }
 
-  @Test
-  def testReplicaFetcherThreadAfterLogDirFailureOnFollower(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testReplicaFetcherThreadAfterLogDirFailureOnFollower(quorum: String): Unit = {
     this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
     this.producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
     val producer = createProducer()
@@ -116,9 +123,9 @@ class LogDirFailureTest extends IntegrationTestHarness {
 
     val partitionInfo = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get
     val leaderServerId = partitionInfo.leader().id()
-    val leaderServer = servers.find(_.config.brokerId == leaderServerId).get
+    val leaderServer = brokers.find(_.config.brokerId == leaderServerId).get
     val followerServerId = partitionInfo.replicas().map(_.id()).find(_ != leaderServerId).get
-    val followerServer = servers.find(_.config.brokerId == followerServerId).get
+    val followerServer = brokers.find(_.config.brokerId == followerServerId).get
 
     followerServer.replicaManager.markPartitionOffline(partition)
     // Send a message to another partition whose leader is the same as partition 0
@@ -149,7 +156,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
     val record = new ProducerRecord(topic, 0, s"key".getBytes, s"value".getBytes)
 
     val leaderServerId = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id()
-    val leaderServer = servers.find(_.config.brokerId == leaderServerId).get
+    val leaderServer = brokers.find(_.config.brokerId == leaderServerId).get
 
     TestUtils.causeLogDirFailure(failureType, leaderServer, partition)
 
@@ -160,7 +167,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
       e.getCause.isInstanceOf[NotLeaderOrFollowerException])
   }
 
-  def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType): Unit = {
+  def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType, quorum: String): Unit = {
     val consumer = createConsumer()
     subscribeAndWaitForAssignment(topic, consumer)
 
@@ -169,20 +176,20 @@ class LogDirFailureTest extends IntegrationTestHarness {
     val partition = new TopicPartition(topic, 0)
     val record = new ProducerRecord(topic, 0, s"key".getBytes, s"value".getBytes)
 
-    val leaderServerId = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id()
-    val leaderServer = servers.find(_.config.brokerId == leaderServerId).get
+    val originalLeaderServerId = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id()
+    val originalLeaderServer = brokers.find(_.config.brokerId == originalLeaderServerId).get
 
     // The first send() should succeed
     producer.send(record).get()
    TestUtils.consumeRecords(consumer, 1)
 
-    TestUtils.causeLogDirFailure(failureType, leaderServer, partition)
+    val failedLogDir = TestUtils.causeLogDirFailure(failureType, originalLeaderServer, partition)
 
     TestUtils.waitUntilTrue(() => {
       // ProduceResponse may contain KafkaStorageException and trigger metadata update
       producer.send(record)
-      producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id() != leaderServerId
-    }, "Expected new leader for the partition", 6000L)
+      producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id() != originalLeaderServerId
+    }, "Expected new leader for the partition")
 
     // Block on send to ensure that new leader accepts a message.
     producer.send(record).get(6000L, TimeUnit.MILLISECONDS)
@@ -190,13 +197,26 @@ class LogDirFailureTest extends IntegrationTestHarness {
 
     // Consumer should receive some messages
     TestUtils.pollUntilAtLeastNumRecords(consumer, 1)
 
-    // There should be no remaining LogDirEventNotification znode
-    assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty)
-
-    // The controller should have marked the replica on the original leader as offline
-    val controllerServer = servers.find(_.kafkaController.isActive).get
-    val offlineReplicas = controllerServer.kafkaController.controllerContext.replicasInState(topic, OfflineReplica)
-    assertTrue(offlineReplicas.contains(PartitionAndReplica(new TopicPartition(topic, 0), leaderServerId)))
+    if (quorum == "kraft") {
+      waitUntilTrue(() => {
+        // get the broker with broker.nodeId == originalLeaderServerId
+        val brokerWithDirFail = brokers.find(_.config.nodeId == originalLeaderServerId).map(_.asInstanceOf[BrokerServer])
+        // check if the broker has the offline log dir
+        val hasOfflineDir = brokerWithDirFail.exists(_.logDirFailureChannel.hasOfflineLogDir(failedLogDir.toPath.toString))
+        // check if the broker has the offline replica
+        hasOfflineDir && brokerWithDirFail.exists(broker =>
+          broker.replicaManager.metadataCache
+            .getClusterMetadata(broker.clusterId, broker.config.interBrokerListenerName)
+            .partition(new TopicPartition(topic, 0)).offlineReplicas().map(_.id()).contains(originalLeaderServerId))
+      }, "Expected to find an offline log dir")
+    } else {
+      // There should be no remaining LogDirEventNotification znode
+      assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty)
+      // The controller should have marked the replica on the original leader as offline
+      val controllerServer = servers.find(_.kafkaController.isActive).get
+      val offlineReplicas = controllerServer.kafkaController.controllerContext.replicasInState(topic, OfflineReplica)
+      assertTrue(offlineReplicas.contains(PartitionAndReplica(new TopicPartition(topic, 0), originalLeaderServerId)))
+    }
   }
 
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7887753a9d1..9eb284729bd 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1641,7 +1641,7 @@ object TestUtils extends Logging {
 
   }
 
-  def causeLogDirFailure(failureType: LogDirFailureType, leaderBroker: KafkaBroker, partition: TopicPartition): Unit = {
+  def causeLogDirFailure(failureType: LogDirFailureType, leaderBroker: KafkaBroker, partition: TopicPartition): File = {
     // Make log directory of the partition on the leader broker inaccessible by replacing it with a file
     val localLog = leaderBroker.replicaManager.localLogOrException(partition)
     val logDir = localLog.dir.getParentFile
@@ -1658,6 +1658,7 @@ object TestUtils extends Logging {
     // Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline
     waitUntilTrue(() => !leaderBroker.logManager.isLogDirOnline(logDir.getAbsolutePath), "Expected log directory offline", 3000L)
     assertTrue(leaderBroker.replicaManager.localLog(partition).isEmpty)
+    logDir
   }
 
   /**
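
Note: the core of this patch is converting plain @Test methods into JUnit 5 parameterized tests that run once per quorum type. Below is a minimal, self-contained sketch of that pattern, assuming nothing from Kafka's test harness; the class name and the literal name template are illustrative stand-ins (the real tests use TestInfoUtils.TestWithParameterizedQuorumName as the template and let IntegrationTestHarness read the "quorum" parameter via TestInfo to decide whether to start a ZooKeeper- or KRaft-based cluster).

    import org.junit.jupiter.api.Assertions.assertTrue
    import org.junit.jupiter.params.ParameterizedTest
    import org.junit.jupiter.params.provider.ValueSource

    // Illustrative stand-in for the harness-backed test class in the patch.
    class QuorumParameterizationSketch {

      // The name template labels each run; "{0}" expands to the current
      // @ValueSource entry, e.g. ".quorum=zk" and ".quorum=kraft".
      @ParameterizedTest(name = "{displayName}.quorum={0}")
      @ValueSource(strings = Array("zk", "kraft"))
      def testRunsOncePerQuorum(quorum: String): Unit = {
        // JUnit invokes this method twice: once with "zk", once with "kraft".
        assertTrue(quorum == "zk" || quorum == "kraft")
      }
    }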
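
The other enabling change is that TestUtils.causeLogDirFailure now returns the File of the log directory it took offline instead of Unit, which is what lets the KRaft branch of testProduceAfterLogDirFailureOnLeader compare that exact path against the broker's LogDirFailureChannel. A hypothetical caller under that new contract (the helper object and method name are made up for illustration; only the TestUtils call and types come from the patch):

    import java.io.File
    import kafka.server.KafkaBroker
    import kafka.utils.TestUtils
    import kafka.utils.TestUtils.Roll
    import org.apache.kafka.common.TopicPartition

    // Hypothetical helper: knock partition 0's log dir offline on the given
    // leader broker and hand back the path string that KRaft-mode assertions
    // would compare against logDirFailureChannel.hasOfflineLogDir(...).
    object LogDirFailureUsageSketch {
      def failPartitionZeroLogDir(leaderBroker: KafkaBroker, topic: String): String = {
        val partition = new TopicPartition(topic, 0)
        // Returns the parent dir of the partition's log, now replaced by a file.
        val failedLogDir: File = TestUtils.causeLogDirFailure(Roll, leaderBroker, partition)
        failedLogDir.toPath.toString
      }
    }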