This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new e26772ef61 MINOR: convert some more junit tests to support KRaft (#12456) e26772ef61 is described below commit e26772ef616d1095efb7e48baa44842df8aeb058 Author: Colin Patrick McCabe <cmcc...@apache.org> AuthorDate: Fri Jul 29 13:36:40 2022 -0700 MINOR: convert some more junit tests to support KRaft (#12456) * MINOR: convert some more junit tests to support KRaft Introduce TestUtils#waitUntilLeaderIsElectedOrChangedWithAdmin, a ZK-free alternative to TestUtils#waitUntilLeaderIsElectedOrChanged. Convert PlaintextProducerSendTest, SslProducerSendTest, TransactionsWithMaxInFlightOneTest, AddPartitionsToTxnRequestServerTest and KafkaMetricsReporterTest to support KRaft Reviewers: dengziming <dengziming1...@gmail.com>, David Arthur <mum...@gmail.com> --- .../kafka/api/BaseProducerSendTest.scala | 102 ++++++++++++--------- .../kafka/api/PlaintextProducerSendTest.scala | 51 ++++++----- .../api/TransactionsWithMaxInFlightOneTest.scala | 24 ++--- .../kafka/server/QuorumTestHarness.scala | 40 +++++--- .../AddPartitionsToTxnRequestServerTest.scala | 16 ++-- .../kafka/server/KafkaMetricsReporterTest.scala | 55 ++++++----- .../test/scala/unit/kafka/utils/TestUtils.scala | 60 +++++++++++- 7 files changed, 226 insertions(+), 122 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 61870b073d..ce3cd32afd 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -19,22 +19,24 @@ package kafka.api import java.time.Duration import java.nio.charset.StandardCharsets -import java.util.Properties +import java.util.{Collections, Properties} import java.util.concurrent.TimeUnit - import kafka.integration.KafkaServerTestHarness import kafka.log.LogConfig import kafka.server.KafkaConfig -import kafka.utils.TestUtils +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.admin.{Admin, NewPartitions} import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer._ import org.apache.kafka.common.errors.TimeoutException -import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.network.{ListenerName, Mode} import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.{KafkaException, TopicPartition} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.jdk.CollectionConverters._ import scala.collection.mutable.Buffer @@ -42,16 +44,17 @@ import scala.concurrent.ExecutionException abstract class BaseProducerSendTest extends KafkaServerTestHarness { - def generateConfigs = { + def generateConfigs: scala.collection.Seq[KafkaConfig] = { val overridingProps = new Properties() val numServers = 2 overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString) - TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol), + TestUtils.createBrokerConfigs(numServers, zkConnectOrNull, false, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps)) } private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _ private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() + protected var admin: Admin = null protected val topic = "topic" private val numRecords = 100 @@ -59,6 +62,15 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) + + admin = TestUtils.createAdminClient(brokers, listenerName, + TestUtils.securityConfigs(Mode.CLIENT, + securityProtocol, + trustStoreFile, + "adminClient", + TestUtils.SslCertificateCn, + clientSaslProperties)) + consumer = TestUtils.createConsumer( bootstrapServers(listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)), securityProtocol = SecurityProtocol.PLAINTEXT @@ -70,6 +82,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { consumer.close() // Ensure that all producers are closed since unclosed producers impact other tests when Kafka server ports are reused producers.foreach(_.close()) + admin.close() super.tearDown() } @@ -105,8 +118,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { * 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected. * 2. Last message of the non-blocking send should return the correct offset metadata */ - @Test - def testSendOffset(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testSendOffset(quorum: String): Unit = { val producer = createProducer() val partition = 0 @@ -134,7 +148,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { try { // create topic - createTopic(topic, 1, 2) + TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2) // send a normal record val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes(StandardCharsets.UTF_8), @@ -166,8 +180,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { } } - @Test - def testSendCompressedMessageWithCreateTime(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testSendCompressedMessageWithCreateTime(quorum: String): Unit = { val producer = createProducer( compressionType = "gzip", lingerMs = Int.MaxValue, @@ -175,8 +190,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME) } - @Test - def testSendNonCompressedMessageWithCreateTime(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testSendNonCompressedMessageWithCreateTime(quorum: String): Unit = { val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue) sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME) } @@ -186,7 +202,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { timeoutMs: Long = 20000L): Unit = { val partition = 0 try { - createTopic(topic, 1, 2) + TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2) val futures = for (i <- 1 to numRecords) yield { val record = new ProducerRecord(topic, partition, s"key$i".getBytes(StandardCharsets.UTF_8), @@ -241,7 +257,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "LogAppendTime") else topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "CreateTime") - createTopic(topic, 1, 2, topicProps) + TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps) val recordAndFutures = for (i <- 1 to numRecords) yield { val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes(StandardCharsets.UTF_8), @@ -267,13 +283,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { * * After close() returns, all messages should be sent with correct returned offset metadata */ - @Test - def testClose(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testClose(quorum: String): Unit = { val producer = createProducer() try { // create topic - createTopic(topic, 1, 2) + TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2) // non-blocking send a list of records val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes(StandardCharsets.UTF_8), @@ -300,12 +317,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { * * The specified partition-id should be respected */ - @Test - def testSendToPartition(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testSendToPartition(quorum: String): Unit = { val producer = createProducer() try { - createTopic(topic, 2, 2) + TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2) val partition = 1 val now = System.currentTimeMillis() @@ -345,14 +363,15 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { * Producer will attempt to send messages to the partition specified in each record, and should * succeed as long as the partition is included in the metadata. */ - @Test - def testSendBeforeAndAfterPartitionExpansion(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testSendBeforeAndAfterPartitionExpansion(quorum: String): Unit = { val producer = createProducer(maxBlockMs = 5 * 1000L) // create topic - createTopic(topic, 1, 2) - val partition0 = 0 + TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2) + val partition0 = 0 var futures0 = (1 to numRecords).map { i => producer.send(new ProducerRecord(topic, partition0, null, ("value" + i).getBytes(StandardCharsets.UTF_8))) }.map(_.get(30, TimeUnit.SECONDS)) @@ -369,13 +388,11 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { val e = assertThrows(classOf[ExecutionException], () => producer.send(new ProducerRecord(topic, partition1, null, "value".getBytes(StandardCharsets.UTF_8))).get()) assertEquals(classOf[TimeoutException], e.getCause.getClass) - val existingAssignment = zkClient.getFullReplicaAssignmentForTopics(Set(topic)).map { - case (topicPartition, assignment) => topicPartition.partition -> assignment - } - adminZkClient.addPartitions(topic, existingAssignment, adminZkClient.getBrokerMetadatas(), 2) + admin.createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo(2))).all().get() + // read metadata from a broker and verify the new topic partitions exist - TestUtils.waitForPartitionMetadata(servers, topic, 0) - TestUtils.waitForPartitionMetadata(servers, topic, 1) + TestUtils.waitForPartitionMetadata(brokers, topic, 0) + TestUtils.waitForPartitionMetadata(brokers, topic, 1) // send records to the newly added partition after confirming that metadata have been updated. val futures1 = (1 to numRecords).map { i => @@ -404,11 +421,12 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { /** * Test that flush immediately sends all accumulated requests. */ - @Test - def testFlush(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testFlush(quorum: String): Unit = { val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue) try { - createTopic(topic, 2, 2) + TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2) val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes(StandardCharsets.UTF_8)) for (_ <- 0 until 50) { @@ -425,9 +443,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { /** * Test close with zero timeout from caller thread */ - @Test - def testCloseWithZeroTimeoutFromCallerThread(): Unit = { - createTopic(topic, 2, 2) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCloseWithZeroTimeoutFromCallerThread(quorum: String): Unit = { + TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2) val partition = 0 consumer.assign(List(new TopicPartition(topic, partition)).asJava) val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, @@ -450,9 +469,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { /** * Test close with zero and non-zero timeout from sender thread */ - @Test - def testCloseWithZeroTimeoutFromSenderThread(): Unit = { - createTopic(topic, 1, 2) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCloseWithZeroTimeoutFromSenderThread(quorum: String): Unit = { + TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2) val partition = 0 consumer.assign(List(new TopicPartition(topic, partition)).asJava) val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes(StandardCharsets.UTF_8)) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala index 06ff201e0b..c25eb184b3 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -19,22 +19,23 @@ package kafka.api import java.util.Properties import java.util.concurrent.{ExecutionException, Future, TimeUnit} - import kafka.log.LogConfig import kafka.server.Defaults -import kafka.utils.TestUtils +import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.producer.{BufferExhaustedException, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException} import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType} import org.apache.kafka.common.serialization.ByteArraySerializer import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource class PlaintextProducerSendTest extends BaseProducerSendTest { - @Test - def testWrongSerializer(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testWrongSerializer(quorum: String): Unit = { val producerProps = new Properties() producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") @@ -44,8 +45,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { assertThrows(classOf[SerializationException], () => producer.send(record)) } - @Test - def testBatchSizeZero(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testBatchSizeZero(quorum: String): Unit = { val producer = createProducer( lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue, @@ -53,8 +55,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { sendAndVerify(producer) } - @Test - def testSendCompressedMessageWithLogAppendTime(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testSendCompressedMessageWithLogAppendTime(quorum: String): Unit = { val producer = createProducer( compressionType = "gzip", lingerMs = Int.MaxValue, @@ -62,8 +65,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME) } - @Test - def testSendNonCompressedMessageWithLogAppendTime(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testSendNonCompressedMessageWithLogAppendTime(quorum: String): Unit = { val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue) sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME) } @@ -73,8 +77,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { * * The topic should be created upon sending the first message */ - @Test - def testAutoCreateTopic(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAutoCreateTopic(quorum: String): Unit = { val producer = createProducer() try { // Send a message to auto-create the topic @@ -82,18 +87,18 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { assertEquals(0L, producer.send(record).get.offset, "Should have offset 0") // double check that the topic is created with leader elected - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - + TestUtils.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic, 0) } finally { producer.close() } } - @Test - def testSendWithInvalidCreateTime(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testSendWithInvalidCreateTime(quorum: String): Unit = { val topicProps = new Properties() topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000") - createTopic(topic, 1, 2, topicProps) + TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps) val producer = createProducer() try { @@ -118,8 +123,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { // Test that producer with max.block.ms=0 can be used to send in non-blocking mode // where requests are failed immediately without blocking if metadata is not available // or buffer is full. - @Test - def testNonBlockingProducer(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testNonBlockingProducer(quorum: String): Unit = { def send(producer: KafkaProducer[Array[Byte],Array[Byte]]): Future[RecordMetadata] = { producer.send(new ProducerRecord(topic, 0, "key".getBytes, new Array[Byte](1000))) @@ -173,8 +179,9 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { verifySendSuccess(future2) // previous batch should be completed and sent now } - @Test - def testSendRecordBatchWithMaxRequestSizeAndHigher(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testSendRecordBatchWithMaxRequestSizeAndHigher(quorum: String): Unit = { val producerProps = new Properties() producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) val producer = registerProducer(new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer)) diff --git a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala index eacc58e76c..5dd82b6b22 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala @@ -18,15 +18,16 @@ package kafka.api import java.util.Properties - import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig -import kafka.utils.TestUtils +import kafka.utils.{TestInfoUtils, TestUtils} import kafka.utils.TestUtils.consumeRecords import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer.KafkaProducer import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.collection.Seq import scala.collection.mutable.Buffer @@ -37,7 +38,7 @@ import scala.jdk.CollectionConverters._ * A single broker is used to verify edge cases where different requests are queued on the same connection. */ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness { - val numServers = 1 + val numBrokers = 1 val topic1 = "topic1" val topic2 = "topic2" @@ -47,7 +48,7 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness { val transactionalConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() override def generateConfigs: Seq[KafkaConfig] = { - TestUtils.createBrokerConfigs(numServers, zkConnect).map(KafkaConfig.fromProps(_, serverProps())) + TestUtils.createBrokerConfigs(numBrokers, zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps())) } @BeforeEach @@ -55,8 +56,8 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness { super.setUp(testInfo) val topicConfig = new Properties() topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 1.toString) - createTopic(topic1, numPartitions, numServers, topicConfig) - createTopic(topic2, numPartitions, numServers, topicConfig) + createTopic(topic1, numPartitions, numBrokers, topicConfig) + createTopic(topic2, numPartitions, numBrokers, topicConfig) createTransactionalProducer("transactional-producer") createReadCommittedConsumer("transactional-group") @@ -69,10 +70,11 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness { super.tearDown() } - @Test - def testTransactionalProducerSingleBrokerMaxInFlightOne(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testTransactionalProducerSingleBrokerMaxInFlightOne(quorum: String): Unit = { // We want to test with one broker to verify multiple requests queued on a connection - assertEquals(1, servers.size) + assertEquals(1, brokers.size) val producer = transactionalProducers.head val consumer = transactionalConsumers.head @@ -124,7 +126,7 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness { } private def createTransactionalProducer(transactionalId: String): KafkaProducer[Array[Byte], Array[Byte]] = { - val producer = TestUtils.createTransactionalProducer(transactionalId, servers, maxInFlight = 1) + val producer = TestUtils.createTransactionalProducer(transactionalId, brokers, maxInFlight = 1) transactionalProducers += producer producer } diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index b82f86a8cb..a2393cdccb 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -47,9 +47,12 @@ import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ trait QuorumImplementation { - def createBroker(config: KafkaConfig, - time: Time, - startup: Boolean): KafkaBroker + def createBroker( + config: KafkaConfig, + time: Time = Time.SYSTEM, + startup: Boolean = true, + threadNamePrefix: Option[String] = None, + ): KafkaBroker def shutdown(): Unit } @@ -61,10 +64,13 @@ class ZooKeeperQuorumImplementation( val adminZkClient: AdminZkClient, val log: Logging ) extends QuorumImplementation { - override def createBroker(config: KafkaConfig, - time: Time, - startup: Boolean): KafkaBroker = { - val server = new KafkaServer(config, time, None, false) + override def createBroker( + config: KafkaConfig, + time: Time, + startup: Boolean, + threadNamePrefix: Option[String], + ): KafkaBroker = { + val server = new KafkaServer(config, time, threadNamePrefix, false) if (startup) server.startup() server } @@ -81,9 +87,12 @@ class KRaftQuorumImplementation(val raftManager: KafkaRaftManager[ApiMessageAndV val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]], val clusterId: String, val log: Logging) extends QuorumImplementation { - override def createBroker(config: KafkaConfig, - time: Time, - startup: Boolean): KafkaBroker = { + override def createBroker( + config: KafkaConfig, + time: Time, + startup: Boolean, + threadNamePrefix: Option[String], + ): KafkaBroker = { val broker = new BrokerServer(config = config, metaProps = new MetaProperties(clusterId, config.nodeId), raftManager = raftManager, @@ -219,10 +228,13 @@ abstract class QuorumTestHarness extends Logging { } } - def createBroker(config: KafkaConfig, - time: Time = Time.SYSTEM, - startup: Boolean = true): KafkaBroker = { - implementation.createBroker(config, time, startup) + def createBroker( + config: KafkaConfig, + time: Time = Time.SYSTEM, + startup: Boolean = true, + threadNamePrefix: Option[String] = None + ): KafkaBroker = { + implementation.createBroker(config, time, startup, threadNamePrefix) } def shutdownZooKeeper(): Unit = asZk().shutdown() diff --git a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala index 0a98d2626c..74320e62b4 100644 --- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala @@ -17,13 +17,16 @@ package kafka.server -import java.util.Properties +import kafka.utils.TestInfoUtils +import java.util.Properties import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{BeforeEach, Test, TestInfo} +import org.junit.jupiter.api.{BeforeEach, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.jdk.CollectionConverters._ @@ -37,11 +40,12 @@ class AddPartitionsToTxnRequestServerTest extends BaseRequestTest { @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) - createTopic(topic1, numPartitions, servers.size, new Properties()) + createTopic(topic1, numPartitions, brokers.size, new Properties()) } - @Test - def shouldReceiveOperationNotAttemptedWhenOtherPartitionHasError(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def shouldReceiveOperationNotAttemptedWhenOtherPartitionHasError(quorum: String): Unit = { // The basic idea is that we have one unknown topic and one created topic. We should get the 'UNKNOWN_TOPIC_OR_PARTITION' // error for the unknown topic and the 'OPERATION_NOT_ATTEMPTED' error for the known and authorized topic. val nonExistentTopic = new TopicPartition("unknownTopic", 0) @@ -58,7 +62,7 @@ class AddPartitionsToTxnRequestServerTest extends BaseRequestTest { List(createdTopicPartition, nonExistentTopic).asJava) .build() - val leaderId = servers.head.config.brokerId + val leaderId = brokers.head.config.brokerId val response = connectAndReceive[AddPartitionsToTxnResponse](request, brokerSocketServer(leaderId)) assertEquals(2, response.errors.size) diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala index 7e5d791db2..1adf544819 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala @@ -17,15 +17,13 @@ package kafka.server import java.util - import java.util.concurrent.atomic.AtomicReference - -import kafka.utils.{CoreUtils, TestUtils} -import kafka.server.QuorumTestHarness +import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils} import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsReporter} -import org.junit.jupiter.api.Assertions.{assertEquals} -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource object KafkaMetricsReporterTest { @@ -43,52 +41,63 @@ object KafkaMetricsReporterTest { override def contextChange(metricsContext: MetricsContext): Unit = { //read jmxPrefix - MockMetricsReporter.JMXPREFIX.set(metricsContext.contextLabels().get("_namespace").toString) - MockMetricsReporter.CLUSTERID.set(metricsContext.contextLabels().get("kafka.cluster.id").toString) - MockMetricsReporter.BROKERID.set(metricsContext.contextLabels().get("kafka.broker.id").toString) + MockMetricsReporter.JMXPREFIX.set(contextLabelOrNull("_namespace", metricsContext)) + MockMetricsReporter.CLUSTERID.set(contextLabelOrNull("kafka.cluster.id", metricsContext)) + MockMetricsReporter.BROKERID.set(contextLabelOrNull("kafka.broker.id", metricsContext)) + MockMetricsReporter.NODEID.set(contextLabelOrNull("kafka.node.id", metricsContext)) } - override def configure(configs: util.Map[String, _]): Unit = {} + private def contextLabelOrNull(name: String, metricsContext: MetricsContext): String = { + Option(metricsContext.contextLabels().get(name)).flatMap(v => Option(v.toString())).getOrElse(null) + } + override def configure(configs: util.Map[String, _]): Unit = {} } object MockMetricsReporter { val JMXPREFIX: AtomicReference[String] = new AtomicReference[String] val BROKERID : AtomicReference[String] = new AtomicReference[String] + val NODEID : AtomicReference[String] = new AtomicReference[String] val CLUSTERID : AtomicReference[String] = new AtomicReference[String] } } class KafkaMetricsReporterTest extends QuorumTestHarness { - var server: KafkaServer = null + var broker: KafkaBroker = null var config: KafkaConfig = null @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) - val props = TestUtils.createBrokerConfig(1, zkConnect) + val props = TestUtils.createBrokerConfig(1, zkConnectOrNull) props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricsReporterTest$MockMetricsReporter") props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true") - props.setProperty(KafkaConfig.BrokerIdProp, "-1") + props.setProperty(KafkaConfig.BrokerIdProp, "1") config = KafkaConfig.fromProps(props) - server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName)) - server.startup() + broker = createBroker(config, threadNamePrefix = Option(this.getClass.getName)) + broker.startup() } - @Test - def testMetricsContextNamespacePresent(): Unit = { - assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID) - assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID) - assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX) - assertEquals("kafka.server", KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get()) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testMetricsContextNamespacePresent(quorum: String): Unit = { + assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID.get()) + if (isKRaftTest()) { + assertNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get()) + assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get()) + } else { + assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get()) + assertNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get()) + } + assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get()) - server.shutdown() + broker.shutdown() TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } @AfterEach override def tearDown(): Unit = { - server.shutdown() + broker.shutdown() CoreUtils.delete(config.logDirs) super.tearDown() } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c49a7bdde0..a1063466c4 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -47,7 +47,7 @@ import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.consumer.internals.AbstractCoordinator import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} -import org.apache.kafka.common.TopicIdPartition +import org.apache.kafka.common.{KafkaFuture, Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter} import org.apache.kafka.common.config.{ConfigException, ConfigResource} import org.apache.kafka.common.config.ConfigResource.Type.TOPIC @@ -67,7 +67,6 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerd import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer} import org.apache.kafka.common.utils.Utils._ import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.common.{KafkaFuture, TopicPartition, Uuid} import org.apache.kafka.controller.QuorumController import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer} import org.apache.kafka.server.common.MetadataVersion @@ -871,8 +870,59 @@ object TestUtils extends Logging { * LeaderDuringDelete). * @throws AssertionError if the expected condition is not true within the timeout. */ - def waitUntilLeaderIsElectedOrChanged(zkClient: KafkaZkClient, topic: String, partition: Int, timeoutMs: Long = 30000L, - oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Int = { + def waitUntilLeaderIsElectedOrChanged( + zkClient: KafkaZkClient, + topic: String, + partition: Int, + timeoutMs: Long = 30000L, + oldLeaderOpt: Option[Int] = None, + newLeaderOpt: Option[Int] = None + ): Int = { + def getPartitionLeader(topic: String, partition: Int): Option[Int] = { + zkClient.getLeaderForPartition(new TopicPartition(topic, partition)) + } + doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition, timeoutMs, oldLeaderOpt, newLeaderOpt) + } + + /** + * If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the leader of a partition is elected. + * If oldLeaderOpt is defined, it waits until the new leader is different from the old leader. + * If newLeaderOpt is defined, it waits until the new leader becomes the expected new leader. + * + * @return The new leader (note that negative values are used to indicate conditions like NoLeader and + * LeaderDuringDelete). + * @throws AssertionError if the expected condition is not true within the timeout. + */ + def waitUntilLeaderIsElectedOrChangedWithAdmin( + admin: Admin, + topic: String, + partition: Int, + timeoutMs: Long = 30000L, + oldLeaderOpt: Option[Int] = None, + newLeaderOpt: Option[Int] = None + ): Int = { + def getPartitionLeader(topic: String, partition: Int): Option[Int] = { + admin.describeTopics(Collections.singletonList(topic)).allTopicNames().get().get(topic).partitions().asScala. + find(_.partition() == partition). + flatMap { p => + if (p.leader().id() == Node.noNode().id()) { + None + } else { + Some(p.leader().id()) + } + } + } + doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition, timeoutMs, oldLeaderOpt, newLeaderOpt) + } + + private def doWaitUntilLeaderIsElectedOrChanged( + getPartitionLeader: (String, Int) => Option[Int], + topic: String, + partition: Int, + timeoutMs: Long, + oldLeaderOpt: Option[Int], + newLeaderOpt: Option[Int] + ): Int = { require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader") val startTime = System.currentTimeMillis() val topicPartition = new TopicPartition(topic, partition) @@ -884,7 +934,7 @@ object TestUtils extends Logging { var electedOrChangedLeader: Option[Int] = None while (electedOrChangedLeader.isEmpty && System.currentTimeMillis() < startTime + timeoutMs) { // check if leader is elected - leader = zkClient.getLeaderForPartition(topicPartition) + leader = getPartitionLeader(topic, partition) leader match { case Some(l) => (newLeaderOpt, oldLeaderOpt) match { case (Some(newLeader), _) if newLeader == l =>