chia7712 commented on code in PR #16652:
URL: https://github.com/apache/kafka/pull/16652#discussion_r1687852197
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
Review Comment:
we don't need to declare the type in scala world.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 1},
+ "Add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 3},
+ "Add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
Review Comment:
we don't need this assert as line#125 is good enough
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 1},
Review Comment:
`() => client.describeUserScramCredentials().all().get().size() == 1`
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 1},
+ "Add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 3},
+ "Add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() => {
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def queryProducerDetail() =
client.describeProducers(Collections.singletonList(tp))
+ .partitionResult(tp).get().activeProducers().asScala
+
+ // send common msg
+ appendCommonRecords(1)
+ val producerIterator = queryProducerDetail()
+ assertEquals(1, producerIterator.size)
+ val producerState = producerIterator.last
+ assertEquals(0, producerState.producerEpoch())
+ assertFalse(producerState.coordinatorEpoch().isPresent)
+ assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+ // send committed transaction msg
+ appendTransactionRecords("foo", 2, commit = true)
+ // consume 3 records to ensure transaction finished
+ consumeToExpectedNumber(3)
+ val transactionProducerIterator = queryProducerDetail()
+ assertEquals(2, transactionProducerIterator.size)
+ val containsCoordinatorEpochIterator = transactionProducerIterator
+ .filter(producer => producer.coordinatorEpoch().isPresent)
+ assertEquals(1, containsCoordinatorEpochIterator.size)
+ val transactionProducerState = containsCoordinatorEpochIterator.last
+
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+ // send ongoing transaction msg
+ val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+ try {
+ val transactionNoneCommitProducerIterator = queryProducerDetail()
+ assertEquals(3, transactionNoneCommitProducerIterator.size)
+ val containsOngoingIterator = transactionNoneCommitProducerIterator
+ .filter(producer => producer.currentTransactionStartOffset().isPresent)
+ assertEquals(1, containsOngoingIterator.size)
+ val ongoingTransactionProducerState = containsOngoingIterator.last
+ // we send (1 common msg) + (2 transaction msg) + (1 transaction marker
msg), so transactionStartOffset is 4
+ assertEquals(4,
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+ } finally ongoingProducer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactions(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ var transactionId = "foo"
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def transactionState(): TransactionState = {
+
client.describeTransactions(Collections.singleton(transactionId)).description(transactionId).get().state()
+ }
+
+ def findCoordinatorIdByTransactionId(transactionId: String): Int = {
+ // calculate the transaction partition id
+ val transactionPartitionId = Utils.abs(transactionId.hashCode) % 50
+ val transactionTopic =
client.describeTopics(Collections.singleton(Topic.TRANSACTION_STATE_TOPIC_NAME))
+ val partitionList =
transactionTopic.allTopicNames().get().get(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions()
+ val partition = partitionList.asScala.filter(tp => tp.partition() ==
transactionPartitionId).head
+ partition.leader().id()
+ }
+
+ // normal commit case
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+ assertInstanceOf(classOf[TransactionalIdNotFoundException],
exception.getCause)
+
+ producer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ producer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ assertEquals(TransactionState.ONGOING, transactionState())
+ producer.commitTransaction()
+
+
+ val transactionResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionResult.coordinatorId())
+ assertEquals(0, transactionResult.producerId())
+ assertEquals(0, transactionResult.producerEpoch())
+ assertEquals(1, transactionResult.topicPartitions().size())
+ assertEquals(tp, transactionResult.topicPartitions().asScala.head)
+
+
+ val state = transactionState()
+ // Either PREPARE_COMMIT or COMPLETE_COMMIT is expected
+ assertTrue(state == TransactionState.PREPARE_COMMIT || state ==
TransactionState.COMPLETE_COMMIT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_COMMIT, transactionState())
+ } finally producer.close()
+
+ // abort case
+ transactionId = "foo2"
+ val abortProducer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+ abortProducer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ abortProducer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ val transactionResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionResult.coordinatorId())
+ assertEquals(0, transactionResult.topicPartitions().size())
+
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ abortProducer.flush()
+
+ val transactionSendMsgResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionSendMsgResult.coordinatorId())
+ assertEquals(1, transactionSendMsgResult.topicPartitions().size())
+ assertEquals(tp, transactionSendMsgResult.topicPartitions().asScala.head)
+
+ assertEquals(TransactionState.ONGOING, transactionState())
+ abortProducer.abortTransaction()
+ val state = transactionState()
+ assertTrue(state == TransactionState.PREPARE_ABORT || state ==
TransactionState.COMPLETE_ABORT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_ABORT, transactionState())
+ } finally abortProducer.close()
+ }
+
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactionsTimeoutMs(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ val transactionId = "foo"
+
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ val timeoutOptions = new DescribeTransactionsOptions()
+ timeoutOptions.timeoutMs(0)
+ client.describeTransactions(Collections.singleton(transactionId),
timeoutOptions)
+ .description(transactionId).get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally producer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAbortTransactionTimeout(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ val producer = TestUtils.createTransactionalProducer("foo", brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ producer.flush()
+
+ val transactionalProducer =
client.describeProducers(Collections.singletonList(tp))
+
.partitionResult(tp).get().activeProducers().asScala.minBy(_.producerId())
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ val options = new AbortTransactionOptions()
+ options.timeoutMs(0)
+ client.abortTransaction(
+ new AbortTransactionSpec(tp,
+ transactionalProducer.producerId(),
+ transactionalProducer.producerEpoch().toShort,
+ transactionalProducer.coordinatorEpoch().getAsInt),
options).all().get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally producer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListTransactions(quorum: String): Unit = {
+ val tp = new TopicPartition("topic1", 0)
+ def createTransactionList(): Unit = {
+ client = createAdminClient
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(),
1, 1.toShort))).all().get()
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer,
new ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ val producer = TestUtils.createTransactionalProducer("foo", brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+ } finally producer.close()
+
+ val producer2 = TestUtils.createTransactionalProducer("foo2", brokers)
+ try {
+ producer2.initTransactions()
+ producer2.beginTransaction()
+ producer2.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer2.flush()
+ producer2.abortTransaction()
+ } finally producer2.close()
+
+ val producer3 = TestUtils.createTransactionalProducer("foo3", brokers)
+ try {
+ producer3.initTransactions()
+ producer3.beginTransaction()
+ producer3.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer3.flush()
+ producer3.commitTransaction()
+ } finally producer3.close()
+
+ consumeToExpectedNumber(2)
+ }
+
+ createTransactionList()
+
+ val result = client.listTransactions().all().get()
+ assertEquals(3, result.size())
+
+ var options = new ListTransactionsOptions()
+ val commitStates: util.Collection[TransactionState] = new
util.ArrayList[TransactionState]()
+ commitStates.add(TransactionState.COMPLETE_COMMIT)
+ options.filterStates(commitStates)
+ assertEquals(2, client.listTransactions(options).all().get().size())
Review Comment:
```scala
assertEquals(2, client.listTransactions(new
ListTransactionsOptions().filterStates(Collections.singletonList(state)))
.all().get().size())
```
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 1},
+ "Add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 3},
+ "Add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() => {
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def queryProducerDetail() =
client.describeProducers(Collections.singletonList(tp))
+ .partitionResult(tp).get().activeProducers().asScala
+
+ // send common msg
+ appendCommonRecords(1)
+ val producerIterator = queryProducerDetail()
+ assertEquals(1, producerIterator.size)
+ val producerState = producerIterator.last
+ assertEquals(0, producerState.producerEpoch())
+ assertFalse(producerState.coordinatorEpoch().isPresent)
+ assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+ // send committed transaction msg
+ appendTransactionRecords("foo", 2, commit = true)
+ // consume 3 records to ensure transaction finished
+ consumeToExpectedNumber(3)
+ val transactionProducerIterator = queryProducerDetail()
+ assertEquals(2, transactionProducerIterator.size)
+ val containsCoordinatorEpochIterator = transactionProducerIterator
+ .filter(producer => producer.coordinatorEpoch().isPresent)
+ assertEquals(1, containsCoordinatorEpochIterator.size)
+ val transactionProducerState = containsCoordinatorEpochIterator.last
+
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+ // send ongoing transaction msg
+ val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+ try {
+ val transactionNoneCommitProducerIterator = queryProducerDetail()
+ assertEquals(3, transactionNoneCommitProducerIterator.size)
+ val containsOngoingIterator = transactionNoneCommitProducerIterator
+ .filter(producer => producer.currentTransactionStartOffset().isPresent)
+ assertEquals(1, containsOngoingIterator.size)
+ val ongoingTransactionProducerState = containsOngoingIterator.last
+ // we send (1 common msg) + (2 transaction msg) + (1 transaction marker
msg), so transactionStartOffset is 4
+ assertEquals(4,
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+ } finally ongoingProducer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactions(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ var transactionId = "foo"
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def transactionState(): TransactionState = {
+
client.describeTransactions(Collections.singleton(transactionId)).description(transactionId).get().state()
+ }
+
+ def findCoordinatorIdByTransactionId(transactionId: String): Int = {
+ // calculate the transaction partition id
+ val transactionPartitionId = Utils.abs(transactionId.hashCode) % 50
+ val transactionTopic =
client.describeTopics(Collections.singleton(Topic.TRANSACTION_STATE_TOPIC_NAME))
+ val partitionList =
transactionTopic.allTopicNames().get().get(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions()
+ val partition = partitionList.asScala.filter(tp => tp.partition() ==
transactionPartitionId).head
+ partition.leader().id()
+ }
+
+ // normal commit case
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+ assertInstanceOf(classOf[TransactionalIdNotFoundException],
exception.getCause)
+
+ producer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ producer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ assertEquals(TransactionState.ONGOING, transactionState())
+ producer.commitTransaction()
+
+
+ val transactionResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionResult.coordinatorId())
+ assertEquals(0, transactionResult.producerId())
+ assertEquals(0, transactionResult.producerEpoch())
+ assertEquals(1, transactionResult.topicPartitions().size())
+ assertEquals(tp, transactionResult.topicPartitions().asScala.head)
+
+
+ val state = transactionState()
+ // Either PREPARE_COMMIT or COMPLETE_COMMIT is expected
+ assertTrue(state == TransactionState.PREPARE_COMMIT || state ==
TransactionState.COMPLETE_COMMIT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_COMMIT, transactionState())
+ } finally producer.close()
+
+ // abort case
+ transactionId = "foo2"
+ val abortProducer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+ abortProducer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ abortProducer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ val transactionResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionResult.coordinatorId())
+ assertEquals(0, transactionResult.topicPartitions().size())
+
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ abortProducer.flush()
+
+ val transactionSendMsgResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionSendMsgResult.coordinatorId())
+ assertEquals(1, transactionSendMsgResult.topicPartitions().size())
+ assertEquals(tp, transactionSendMsgResult.topicPartitions().asScala.head)
+
+ assertEquals(TransactionState.ONGOING, transactionState())
+ abortProducer.abortTransaction()
+ val state = transactionState()
+ assertTrue(state == TransactionState.PREPARE_ABORT || state ==
TransactionState.COMPLETE_ABORT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_ABORT, transactionState())
+ } finally abortProducer.close()
+ }
+
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactionsTimeoutMs(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ val transactionId = "foo"
+
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ val timeoutOptions = new DescribeTransactionsOptions()
+ timeoutOptions.timeoutMs(0)
+ client.describeTransactions(Collections.singleton(transactionId),
timeoutOptions)
+ .description(transactionId).get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally producer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAbortTransactionTimeout(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ val producer = TestUtils.createTransactionalProducer("foo", brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ producer.flush()
+
+ val transactionalProducer =
client.describeProducers(Collections.singletonList(tp))
+
.partitionResult(tp).get().activeProducers().asScala.minBy(_.producerId())
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ val options = new AbortTransactionOptions()
+ options.timeoutMs(0)
+ client.abortTransaction(
+ new AbortTransactionSpec(tp,
+ transactionalProducer.producerId(),
+ transactionalProducer.producerEpoch().toShort,
+ transactionalProducer.coordinatorEpoch().getAsInt),
options).all().get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally producer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListTransactions(quorum: String): Unit = {
+ val tp = new TopicPartition("topic1", 0)
+ def createTransactionList(): Unit = {
+ client = createAdminClient
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(),
1, 1.toShort))).all().get()
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer,
new ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ val producer = TestUtils.createTransactionalProducer("foo", brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+ } finally producer.close()
+
+ val producer2 = TestUtils.createTransactionalProducer("foo2", brokers)
+ try {
+ producer2.initTransactions()
+ producer2.beginTransaction()
+ producer2.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer2.flush()
+ producer2.abortTransaction()
+ } finally producer2.close()
+
+ val producer3 = TestUtils.createTransactionalProducer("foo3", brokers)
+ try {
+ producer3.initTransactions()
+ producer3.beginTransaction()
+ producer3.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer3.flush()
+ producer3.commitTransaction()
+ } finally producer3.close()
+
+ consumeToExpectedNumber(2)
+ }
+
+ createTransactionList()
+
+ val result = client.listTransactions().all().get()
+ assertEquals(3, result.size())
+
+ var options = new ListTransactionsOptions()
+ val commitStates: util.Collection[TransactionState] = new
util.ArrayList[TransactionState]()
+ commitStates.add(TransactionState.COMPLETE_COMMIT)
+ options.filterStates(commitStates)
+ assertEquals(2, client.listTransactions(options).all().get().size())
+
+ options = new ListTransactionsOptions()
Review Comment:
ditto
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 1},
+ "Add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 3},
+ "Add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() => {
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def queryProducerDetail() =
client.describeProducers(Collections.singletonList(tp))
+ .partitionResult(tp).get().activeProducers().asScala
+
+ // send common msg
+ appendCommonRecords(1)
+ val producerIterator = queryProducerDetail()
+ assertEquals(1, producerIterator.size)
+ val producerState = producerIterator.last
+ assertEquals(0, producerState.producerEpoch())
+ assertFalse(producerState.coordinatorEpoch().isPresent)
+ assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+ // send committed transaction msg
+ appendTransactionRecords("foo", 2, commit = true)
+ // consume 3 records to ensure transaction finished
+ consumeToExpectedNumber(3)
+ val transactionProducerIterator = queryProducerDetail()
+ assertEquals(2, transactionProducerIterator.size)
+ val containsCoordinatorEpochIterator = transactionProducerIterator
+ .filter(producer => producer.coordinatorEpoch().isPresent)
+ assertEquals(1, containsCoordinatorEpochIterator.size)
+ val transactionProducerState = containsCoordinatorEpochIterator.last
+
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+ // send ongoing transaction msg
+ val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+ try {
+ val transactionNoneCommitProducerIterator = queryProducerDetail()
+ assertEquals(3, transactionNoneCommitProducerIterator.size)
+ val containsOngoingIterator = transactionNoneCommitProducerIterator
+ .filter(producer => producer.currentTransactionStartOffset().isPresent)
+ assertEquals(1, containsOngoingIterator.size)
+ val ongoingTransactionProducerState = containsOngoingIterator.last
+ // we send (1 common msg) + (2 transaction msg) + (1 transaction marker
msg), so transactionStartOffset is 4
+ assertEquals(4,
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+ } finally ongoingProducer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactions(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ var transactionId = "foo"
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def transactionState(): TransactionState = {
+
client.describeTransactions(Collections.singleton(transactionId)).description(transactionId).get().state()
+ }
+
+ def findCoordinatorIdByTransactionId(transactionId: String): Int = {
+ // calculate the transaction partition id
+ val transactionPartitionId = Utils.abs(transactionId.hashCode) % 50
+ val transactionTopic =
client.describeTopics(Collections.singleton(Topic.TRANSACTION_STATE_TOPIC_NAME))
+ val partitionList =
transactionTopic.allTopicNames().get().get(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions()
+ val partition = partitionList.asScala.filter(tp => tp.partition() ==
transactionPartitionId).head
+ partition.leader().id()
+ }
+
+ // normal commit case
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+ assertInstanceOf(classOf[TransactionalIdNotFoundException],
exception.getCause)
+
+ producer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ producer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ assertEquals(TransactionState.ONGOING, transactionState())
+ producer.commitTransaction()
+
+
+ val transactionResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionResult.coordinatorId())
+ assertEquals(0, transactionResult.producerId())
+ assertEquals(0, transactionResult.producerEpoch())
+ assertEquals(1, transactionResult.topicPartitions().size())
+ assertEquals(tp, transactionResult.topicPartitions().asScala.head)
+
+
+ val state = transactionState()
+ // Either PREPARE_COMMIT or COMPLETE_COMMIT is expected
+ assertTrue(state == TransactionState.PREPARE_COMMIT || state ==
TransactionState.COMPLETE_COMMIT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_COMMIT, transactionState())
+ } finally producer.close()
+
+ // abort case
+ transactionId = "foo2"
+ val abortProducer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+ abortProducer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ abortProducer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ val transactionResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionResult.coordinatorId())
+ assertEquals(0, transactionResult.topicPartitions().size())
+
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ abortProducer.flush()
+
+ val transactionSendMsgResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionSendMsgResult.coordinatorId())
+ assertEquals(1, transactionSendMsgResult.topicPartitions().size())
+ assertEquals(tp, transactionSendMsgResult.topicPartitions().asScala.head)
+
+ assertEquals(TransactionState.ONGOING, transactionState())
+ abortProducer.abortTransaction()
+ val state = transactionState()
+ assertTrue(state == TransactionState.PREPARE_ABORT || state ==
TransactionState.COMPLETE_ABORT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_ABORT, transactionState())
+ } finally abortProducer.close()
+ }
+
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactionsTimeoutMs(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ val transactionId = "foo"
+
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ val timeoutOptions = new DescribeTransactionsOptions()
+ timeoutOptions.timeoutMs(0)
+ client.describeTransactions(Collections.singleton(transactionId),
timeoutOptions)
+ .description(transactionId).get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally producer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAbortTransactionTimeout(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ val producer = TestUtils.createTransactionalProducer("foo", brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ producer.flush()
+
+ val transactionalProducer =
client.describeProducers(Collections.singletonList(tp))
+
.partitionResult(tp).get().activeProducers().asScala.minBy(_.producerId())
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ val options = new AbortTransactionOptions()
+ options.timeoutMs(0)
+ client.abortTransaction(
+ new AbortTransactionSpec(tp,
+ transactionalProducer.producerId(),
+ transactionalProducer.producerEpoch().toShort,
+ transactionalProducer.coordinatorEpoch().getAsInt),
options).all().get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally producer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListTransactions(quorum: String): Unit = {
+ val tp = new TopicPartition("topic1", 0)
+ def createTransactionList(): Unit = {
+ client = createAdminClient
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(),
1, 1.toShort))).all().get()
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer,
new ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ val producer = TestUtils.createTransactionalProducer("foo", brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+ } finally producer.close()
+
+ val producer2 = TestUtils.createTransactionalProducer("foo2", brokers)
+ try {
+ producer2.initTransactions()
+ producer2.beginTransaction()
+ producer2.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer2.flush()
+ producer2.abortTransaction()
+ } finally producer2.close()
+
+ val producer3 = TestUtils.createTransactionalProducer("foo3", brokers)
+ try {
+ producer3.initTransactions()
+ producer3.beginTransaction()
+ producer3.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer3.flush()
+ producer3.commitTransaction()
+ } finally producer3.close()
+
+ consumeToExpectedNumber(2)
+ }
+
+ createTransactionList()
+
+ val result = client.listTransactions().all().get()
+ assertEquals(3, result.size())
+
+ var options = new ListTransactionsOptions()
+ val commitStates: util.Collection[TransactionState] = new
util.ArrayList[TransactionState]()
+ commitStates.add(TransactionState.COMPLETE_COMMIT)
+ options.filterStates(commitStates)
+ assertEquals(2, client.listTransactions(options).all().get().size())
+
+ options = new ListTransactionsOptions()
+ val abortStates: util.Collection[TransactionState] = new
util.ArrayList[TransactionState]()
+ abortStates.add(TransactionState.COMPLETE_ABORT)
+ options.filterStates(abortStates)
+ assertEquals(1, client.listTransactions(options).all().get().size())
+
+ options = new ListTransactionsOptions()
Review Comment:
ditto
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 1},
+ "Add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 3},
+ "Add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() => {
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def queryProducerDetail() =
client.describeProducers(Collections.singletonList(tp))
+ .partitionResult(tp).get().activeProducers().asScala
+
+ // send common msg
+ appendCommonRecords(1)
+ val producerIterator = queryProducerDetail()
+ assertEquals(1, producerIterator.size)
+ val producerState = producerIterator.last
+ assertEquals(0, producerState.producerEpoch())
+ assertFalse(producerState.coordinatorEpoch().isPresent)
+ assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+ // send committed transaction msg
+ appendTransactionRecords("foo", 2, commit = true)
+ // consume 3 records to ensure transaction finished
+ consumeToExpectedNumber(3)
+ val transactionProducerIterator = queryProducerDetail()
+ assertEquals(2, transactionProducerIterator.size)
+ val containsCoordinatorEpochIterator = transactionProducerIterator
+ .filter(producer => producer.coordinatorEpoch().isPresent)
+ assertEquals(1, containsCoordinatorEpochIterator.size)
+ val transactionProducerState = containsCoordinatorEpochIterator.last
+
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+ // send ongoing transaction msg
+ val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+ try {
+ val transactionNoneCommitProducerIterator = queryProducerDetail()
+ assertEquals(3, transactionNoneCommitProducerIterator.size)
+ val containsOngoingIterator = transactionNoneCommitProducerIterator
+ .filter(producer => producer.currentTransactionStartOffset().isPresent)
+ assertEquals(1, containsOngoingIterator.size)
+ val ongoingTransactionProducerState = containsOngoingIterator.last
+ // we send (1 common msg) + (2 transaction msg) + (1 transaction marker
msg), so transactionStartOffset is 4
+ assertEquals(4,
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+ } finally ongoingProducer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactions(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ var transactionId = "foo"
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def transactionState(): TransactionState = {
+
client.describeTransactions(Collections.singleton(transactionId)).description(transactionId).get().state()
+ }
+
+ def findCoordinatorIdByTransactionId(transactionId: String): Int = {
+ // calculate the transaction partition id
+ val transactionPartitionId = Utils.abs(transactionId.hashCode) % 50
+ val transactionTopic =
client.describeTopics(Collections.singleton(Topic.TRANSACTION_STATE_TOPIC_NAME))
+ val partitionList =
transactionTopic.allTopicNames().get().get(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions()
+ val partition = partitionList.asScala.filter(tp => tp.partition() ==
transactionPartitionId).head
+ partition.leader().id()
+ }
+
+ // normal commit case
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+ assertInstanceOf(classOf[TransactionalIdNotFoundException],
exception.getCause)
+
+ producer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ producer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ assertEquals(TransactionState.ONGOING, transactionState())
+ producer.commitTransaction()
+
+
+ val transactionResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionResult.coordinatorId())
+ assertEquals(0, transactionResult.producerId())
+ assertEquals(0, transactionResult.producerEpoch())
+ assertEquals(1, transactionResult.topicPartitions().size())
+ assertEquals(tp, transactionResult.topicPartitions().asScala.head)
+
+
+ val state = transactionState()
+ // Either PREPARE_COMMIT or COMPLETE_COMMIT is expected
+ assertTrue(state == TransactionState.PREPARE_COMMIT || state ==
TransactionState.COMPLETE_COMMIT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_COMMIT, transactionState())
+ } finally producer.close()
+
+ // abort case
+ transactionId = "foo2"
+ val abortProducer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+ abortProducer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ abortProducer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ val transactionResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionResult.coordinatorId())
+ assertEquals(0, transactionResult.topicPartitions().size())
+
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ abortProducer.flush()
+
+ val transactionSendMsgResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionSendMsgResult.coordinatorId())
+ assertEquals(1, transactionSendMsgResult.topicPartitions().size())
Review Comment:
ditto
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 1},
+ "Add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 3},
+ "Add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() => {
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
Review Comment:
This will be a infinite loop if the test gets something wrong. Could you
consider adding "timeout" to this method?
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
Review Comment:
`Collections.singletonList`
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 1},
+ "Add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 3},
+ "Add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() => {
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
Review Comment:
Could you add a test to verify timeout?
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 1},
+ "Add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 3},
+ "Add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
Review Comment:
`Collections.singletonList`
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 1},
+ "Add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 3},
Review Comment:
`() => client.describeUserScramCredentials().all().get().size() == 3`
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 1},
+ "Add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 3},
+ "Add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() => {
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def queryProducerDetail() =
client.describeProducers(Collections.singletonList(tp))
+ .partitionResult(tp).get().activeProducers().asScala
+
+ // send common msg
+ appendCommonRecords(1)
+ val producerIterator = queryProducerDetail()
+ assertEquals(1, producerIterator.size)
+ val producerState = producerIterator.last
+ assertEquals(0, producerState.producerEpoch())
+ assertFalse(producerState.coordinatorEpoch().isPresent)
+ assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+ // send committed transaction msg
+ appendTransactionRecords("foo", 2, commit = true)
+ // consume 3 records to ensure transaction finished
+ consumeToExpectedNumber(3)
+ val transactionProducerIterator = queryProducerDetail()
+ assertEquals(2, transactionProducerIterator.size)
+ val containsCoordinatorEpochIterator = transactionProducerIterator
+ .filter(producer => producer.coordinatorEpoch().isPresent)
+ assertEquals(1, containsCoordinatorEpochIterator.size)
+ val transactionProducerState = containsCoordinatorEpochIterator.last
+
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+ // send ongoing transaction msg
+ val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+ try {
+ val transactionNoneCommitProducerIterator = queryProducerDetail()
+ assertEquals(3, transactionNoneCommitProducerIterator.size)
+ val containsOngoingIterator = transactionNoneCommitProducerIterator
+ .filter(producer => producer.currentTransactionStartOffset().isPresent)
+ assertEquals(1, containsOngoingIterator.size)
+ val ongoingTransactionProducerState = containsOngoingIterator.last
+ // we send (1 common msg) + (2 transaction msg) + (1 transaction marker
msg), so transactionStartOffset is 4
+ assertEquals(4,
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+ } finally ongoingProducer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactions(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ var transactionId = "foo"
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def transactionState(): TransactionState = {
+
client.describeTransactions(Collections.singleton(transactionId)).description(transactionId).get().state()
+ }
+
+ def findCoordinatorIdByTransactionId(transactionId: String): Int = {
+ // calculate the transaction partition id
+ val transactionPartitionId = Utils.abs(transactionId.hashCode) % 50
+ val transactionTopic =
client.describeTopics(Collections.singleton(Topic.TRANSACTION_STATE_TOPIC_NAME))
+ val partitionList =
transactionTopic.allTopicNames().get().get(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions()
+ val partition = partitionList.asScala.filter(tp => tp.partition() ==
transactionPartitionId).head
+ partition.leader().id()
+ }
+
+ // normal commit case
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+ assertInstanceOf(classOf[TransactionalIdNotFoundException],
exception.getCause)
+
+ producer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ producer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ assertEquals(TransactionState.ONGOING, transactionState())
+ producer.commitTransaction()
+
+
+ val transactionResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionResult.coordinatorId())
+ assertEquals(0, transactionResult.producerId())
+ assertEquals(0, transactionResult.producerEpoch())
+ assertEquals(1, transactionResult.topicPartitions().size())
Review Comment:
`assertEquals(Collections.singleton(tp),
transactionResult.topicPartitions())`
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 1},
+ "Add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 3},
+ "Add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() => {
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def queryProducerDetail() =
client.describeProducers(Collections.singletonList(tp))
+ .partitionResult(tp).get().activeProducers().asScala
+
+ // send common msg
+ appendCommonRecords(1)
+ val producerIterator = queryProducerDetail()
+ assertEquals(1, producerIterator.size)
+ val producerState = producerIterator.last
+ assertEquals(0, producerState.producerEpoch())
+ assertFalse(producerState.coordinatorEpoch().isPresent)
+ assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+ // send committed transaction msg
+ appendTransactionRecords("foo", 2, commit = true)
+ // consume 3 records to ensure transaction finished
+ consumeToExpectedNumber(3)
+ val transactionProducerIterator = queryProducerDetail()
+ assertEquals(2, transactionProducerIterator.size)
+ val containsCoordinatorEpochIterator = transactionProducerIterator
+ .filter(producer => producer.coordinatorEpoch().isPresent)
+ assertEquals(1, containsCoordinatorEpochIterator.size)
+ val transactionProducerState = containsCoordinatorEpochIterator.last
+
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+ // send ongoing transaction msg
+ val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+ try {
+ val transactionNoneCommitProducerIterator = queryProducerDetail()
+ assertEquals(3, transactionNoneCommitProducerIterator.size)
+ val containsOngoingIterator = transactionNoneCommitProducerIterator
+ .filter(producer => producer.currentTransactionStartOffset().isPresent)
+ assertEquals(1, containsOngoingIterator.size)
+ val ongoingTransactionProducerState = containsOngoingIterator.last
+ // we send (1 common msg) + (2 transaction msg) + (1 transaction marker
msg), so transactionStartOffset is 4
+ assertEquals(4,
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+ } finally ongoingProducer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactions(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ var transactionId = "foo"
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
Review Comment:
Could you share this helper with other test case?
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 1},
+ "Add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 3},
+ "Add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() => {
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
Review Comment:
Also, `describeUserScramCredentials(List<String> users)` needs IT
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName: String = "tom"
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 1},
+ "Add one user scram credential timeout")
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
{client.describeUserScramCredentials().all().get().size() == 3},
+ "Add user scram credential timeout")
+ assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+ // alter user info
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() => {
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ val userScramCredential = userTomResult.get(targetUserName)
+ val credentialInfos = userScramCredential.credentialInfos()
+ credentialInfos.size() == 2
+ }, "Alter user scram credential timeout")
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), i.toString.getBytes,
i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def queryProducerDetail() =
client.describeProducers(Collections.singletonList(tp))
+ .partitionResult(tp).get().activeProducers().asScala
+
+ // send common msg
+ appendCommonRecords(1)
+ val producerIterator = queryProducerDetail()
+ assertEquals(1, producerIterator.size)
+ val producerState = producerIterator.last
+ assertEquals(0, producerState.producerEpoch())
+ assertFalse(producerState.coordinatorEpoch().isPresent)
+ assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+ // send committed transaction msg
+ appendTransactionRecords("foo", 2, commit = true)
+ // consume 3 records to ensure transaction finished
+ consumeToExpectedNumber(3)
+ val transactionProducerIterator = queryProducerDetail()
+ assertEquals(2, transactionProducerIterator.size)
+ val containsCoordinatorEpochIterator = transactionProducerIterator
+ .filter(producer => producer.coordinatorEpoch().isPresent)
+ assertEquals(1, containsCoordinatorEpochIterator.size)
+ val transactionProducerState = containsCoordinatorEpochIterator.last
+
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+ // send ongoing transaction msg
+ val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+ try {
+ val transactionNoneCommitProducerIterator = queryProducerDetail()
+ assertEquals(3, transactionNoneCommitProducerIterator.size)
+ val containsOngoingIterator = transactionNoneCommitProducerIterator
+ .filter(producer => producer.currentTransactionStartOffset().isPresent)
+ assertEquals(1, containsOngoingIterator.size)
+ val ongoingTransactionProducerState = containsOngoingIterator.last
+ // we send (1 common msg) + (2 transaction msg) + (1 transaction marker
msg), so transactionStartOffset is 4
+ assertEquals(4,
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+ } finally ongoingProducer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactions(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ var transactionId = "foo"
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ def transactionState(): TransactionState = {
+
client.describeTransactions(Collections.singleton(transactionId)).description(transactionId).get().state()
+ }
+
+ def findCoordinatorIdByTransactionId(transactionId: String): Int = {
+ // calculate the transaction partition id
+ val transactionPartitionId = Utils.abs(transactionId.hashCode) % 50
+ val transactionTopic =
client.describeTopics(Collections.singleton(Topic.TRANSACTION_STATE_TOPIC_NAME))
+ val partitionList =
transactionTopic.allTopicNames().get().get(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions()
+ val partition = partitionList.asScala.filter(tp => tp.partition() ==
transactionPartitionId).head
+ partition.leader().id()
+ }
+
+ // normal commit case
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+ assertInstanceOf(classOf[TransactionalIdNotFoundException],
exception.getCause)
+
+ producer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ producer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ assertEquals(TransactionState.ONGOING, transactionState())
+ producer.commitTransaction()
+
+
+ val transactionResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionResult.coordinatorId())
+ assertEquals(0, transactionResult.producerId())
+ assertEquals(0, transactionResult.producerEpoch())
+ assertEquals(1, transactionResult.topicPartitions().size())
+ assertEquals(tp, transactionResult.topicPartitions().asScala.head)
+
+
+ val state = transactionState()
+ // Either PREPARE_COMMIT or COMPLETE_COMMIT is expected
+ assertTrue(state == TransactionState.PREPARE_COMMIT || state ==
TransactionState.COMPLETE_COMMIT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_COMMIT, transactionState())
+ } finally producer.close()
+
+ // abort case
+ transactionId = "foo2"
+ val abortProducer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ // init, the transaction is not begin, so
TransactionalIdNotFoundException is expected
+ val exception = assertThrows(classOf[ExecutionException], () =>
transactionState())
+
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+ abortProducer.initTransactions()
+ assertEquals(TransactionState.EMPTY, transactionState())
+ abortProducer.beginTransaction()
+ assertEquals(TransactionState.EMPTY, transactionState())
+
+ val transactionResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionResult.coordinatorId())
+ assertEquals(0, transactionResult.topicPartitions().size())
+
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ abortProducer.flush()
+
+ val transactionSendMsgResult =
client.describeTransactions(Collections.singleton(transactionId))
+ .description(transactionId).get()
+ assertEquals(findCoordinatorIdByTransactionId(transactionId),
transactionSendMsgResult.coordinatorId())
+ assertEquals(1, transactionSendMsgResult.topicPartitions().size())
+ assertEquals(tp, transactionSendMsgResult.topicPartitions().asScala.head)
+
+ assertEquals(TransactionState.ONGOING, transactionState())
+ abortProducer.abortTransaction()
+ val state = transactionState()
+ assertTrue(state == TransactionState.PREPARE_ABORT || state ==
TransactionState.COMPLETE_ABORT)
+ // producer commit transaction, but maybe transaction coordinator has
not been submitted mark msg
+ // so we start up a consumer and consume the expected number of msg, to
ensure transaction committed
+ consumeToExpectedNumber(1)
+ assertEquals(TransactionState.COMPLETE_ABORT, transactionState())
+ } finally abortProducer.close()
+ }
+
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactionsTimeoutMs(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ val transactionId = "foo"
+
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ val timeoutOptions = new DescribeTransactionsOptions()
+ timeoutOptions.timeoutMs(0)
+ client.describeTransactions(Collections.singleton(transactionId),
timeoutOptions)
+ .description(transactionId).get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally producer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAbortTransactionTimeout(quorum: String): Unit = {
+ client = createAdminClient
+ val tp = new TopicPartition("topic1", 0)
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1,
1.toShort))).all().get()
+
+ val producer = TestUtils.createTransactionalProducer("foo", brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+ producer.flush()
+
+ val transactionalProducer =
client.describeProducers(Collections.singletonList(tp))
+
.partitionResult(tp).get().activeProducers().asScala.minBy(_.producerId())
+
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ val options = new AbortTransactionOptions()
+ options.timeoutMs(0)
+ client.abortTransaction(
+ new AbortTransactionSpec(tp,
+ transactionalProducer.producerId(),
+ transactionalProducer.producerEpoch().toShort,
+ transactionalProducer.coordinatorEpoch().getAsInt),
options).all().get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ } finally producer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListTransactions(quorum: String): Unit = {
+ val tp = new TopicPartition("topic1", 0)
+ def createTransactionList(): Unit = {
+ client = createAdminClient
+ client.createTopics(Collections.singletonList(new NewTopic(tp.topic(),
1, 1.toShort))).all().get()
+
+ def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer,
new ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(tp))
+ consumer.seekToBeginning(Collections.singleton(tp))
+ var consumeNum = 0
+ while (consumeNum < expectedNumber) {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ }
+ } finally consumer.close()
+ }
+
+ val producer = TestUtils.createTransactionalProducer("foo", brokers)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer.flush()
+ producer.commitTransaction()
+ } finally producer.close()
+
+ val producer2 = TestUtils.createTransactionalProducer("foo2", brokers)
+ try {
+ producer2.initTransactions()
+ producer2.beginTransaction()
+ producer2.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer2.flush()
+ producer2.abortTransaction()
+ } finally producer2.close()
+
+ val producer3 = TestUtils.createTransactionalProducer("foo3", brokers)
+ try {
+ producer3.initTransactions()
+ producer3.beginTransaction()
+ producer3.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+ producer3.flush()
+ producer3.commitTransaction()
+ } finally producer3.close()
+
+ consumeToExpectedNumber(2)
+ }
+
+ createTransactionList()
+
+ val result = client.listTransactions().all().get()
+ assertEquals(3, result.size())
+
+ var options = new ListTransactionsOptions()
+ val commitStates: util.Collection[TransactionState] = new
util.ArrayList[TransactionState]()
+ commitStates.add(TransactionState.COMPLETE_COMMIT)
+ options.filterStates(commitStates)
+ assertEquals(2, client.listTransactions(options).all().get().size())
+
+ options = new ListTransactionsOptions()
+ val abortStates: util.Collection[TransactionState] = new
util.ArrayList[TransactionState]()
+ abortStates.add(TransactionState.COMPLETE_ABORT)
+ options.filterStates(abortStates)
+ assertEquals(1, client.listTransactions(options).all().get().size())
+
+ options = new ListTransactionsOptions()
+ val producerIds: util.Collection[java.lang.Long] = new
util.ArrayList[java.lang.Long]()
+ producerIds.add(0L)
+ options.filterProducerIds(producerIds)
+ assertEquals(1, client.listTransactions(options).all().get().size())
+
+ // ensure all transaction's txnStartTimestamp >= 500
+ options = new ListTransactionsOptions()
+ Thread.sleep(501)
+ options.filterOnDuration(500)
+ assertEquals(3, client.listTransactions(options).all().get().size())
Review Comment:
```scala
assertEquals(3, client.listTransactions(new
ListTransactionsOptions().filterOnDuration(500)).all().get().size())
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]