chia7712 commented on code in PR #16652:
URL: https://github.com/apache/kafka/pull/16652#discussion_r1697491704
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,380 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUserScramCredentials(quorum: String): Unit = {
+ client = createAdminClient
+
+ // add a new user
+ val targetUserName = "tom"
+ client.alterUserScramCredentials(Collections.singletonList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() =>
client.describeUserScramCredentials().all().get().size() == 1,
+ "Add one user scram credential timeout")
+
+ val result = client.describeUserScramCredentials().all().get()
+ result.forEach((userName, scramDescription) => {
+ assertEquals(targetUserName, userName)
+ assertEquals(targetUserName, scramDescription.name())
+ val credentialInfos = scramDescription.credentialInfos()
+ assertEquals(1, credentialInfos.size())
+ assertEquals(ScramMechanism.SCRAM_SHA_256,
credentialInfos.get(0).mechanism())
+ assertEquals(4096, credentialInfos.get(0).iterations())
+ })
+
+ // add other users
+ client.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion("tom2", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+ new UserScramCredentialUpsertion("tom3", new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+ )).all().get
+ TestUtils.waitUntilTrue(() =>
client.describeUserScramCredentials().all().get().size() == 3,
+ "Add user scram credential timeout")
+
+ // alter user info
+ client.alterUserScramCredentials(Collections.singletonList(
+ new UserScramCredentialUpsertion(targetUserName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+ )).all.get
+ TestUtils.waitUntilTrue(() => {
+
client.describeUserScramCredentials().all().get().get(targetUserName).credentialInfos().size()
== 2
+ }, "Alter user scram credential timeout")
+
+ val userTomResult = client.describeUserScramCredentials().all().get()
+ assertEquals(3, userTomResult.size())
+ val userScramCredential = userTomResult.get(targetUserName)
+ assertEquals(targetUserName, userScramCredential.name())
+ val credentialInfos = userScramCredential.credentialInfos()
+ assertEquals(2, credentialInfos.size())
+ val credentialList = credentialInfos.asScala.sortBy(s =>
s.mechanism().`type`())
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+ assertEquals(4096, credentialList.head.iterations())
+ assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+ assertEquals(8192, credentialList(1).iterations())
+
+ // test describeUserScramCredentials(List<String> users)
+ val userAndScramMap =
client.describeUserScramCredentials(Collections.singletonList("tom2")).all().get()
+ assertEquals(1, userAndScramMap.size())
+ val scram = userAndScramMap.get("tom2")
+ assertNotNull(scram)
+ val credentialInfo = scram.credentialInfos().get(0)
+ assertEquals(ScramMechanism.SCRAM_SHA_256, credentialInfo.mechanism())
+ assertEquals(4096, credentialInfo.iterations())
+
+ // test describeUserScramCredentials(List<String> users,
DescribeUserScramCredentialsOptions options)
+ val exception = assertThrows(classOf[ExecutionException], () => {
+ client.describeUserScramCredentials(Collections.singletonList("tom4"),
+ new DescribeUserScramCredentialsOptions().timeoutMs(0)).all().get()
+ })
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ }
+
+ private def consumeToExpectedNumber = (expectedNumber: Int) => {
+ val configs = new util.HashMap[String, Object]()
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
+ val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
+ try {
+ consumer.assign(Collections.singleton(topicPartition))
+ consumer.seekToBeginning(Collections.singleton(topicPartition))
+ var consumeNum = 0
+ TestUtils.waitUntilTrue(() => {
+ val records = consumer.poll(time.Duration.ofMillis(100))
+ consumeNum += records.count()
+ consumeNum >= expectedNumber
+ }, "consumeToExpectedNumber timeout")
+ } finally consumer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeProducers(quorum: String): Unit = {
+ client = createAdminClient
+ client.createTopics(Collections.singletonList(new NewTopic(topic, 1,
1.toShort))).all().get()
+
+ def appendCommonRecords = (records: Int) => {
+ val producer = new
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ plaintextBootstrapServers(brokers).asInstanceOf[Object]), new
ByteArraySerializer, new ByteArraySerializer)
+ try {
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ topic, partition, i.toString.getBytes, i.toString.getBytes())))
+ } finally producer.close()
+ }
+
+ def appendTransactionRecords(transactionId: String, records: Int, commit:
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producer = TestUtils.createTransactionalProducer(transactionId,
brokers)
+ producer.initTransactions()
+ producer.beginTransaction()
+ (0 until records).foreach(i =>
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+ topic, partition, i.toString.getBytes, i.toString.getBytes())))
+ producer.flush()
+ if (commit) {
+ producer.commitTransaction()
+ producer.close()
+ }
+
+ producer
+ }
+
+ def queryProducerDetail() = client
+ .describeProducers(Collections.singletonList(topicPartition))
+ .partitionResult(topicPartition).get().activeProducers().asScala
+
+ // send common msg
+ appendCommonRecords(1)
+ val producerIterator = queryProducerDetail()
+ assertEquals(1, producerIterator.size)
+ val producerState = producerIterator.last
+ assertEquals(0, producerState.producerEpoch())
+ assertFalse(producerState.coordinatorEpoch().isPresent)
+ assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+ // send committed transaction msg
+ appendTransactionRecords("foo", 2, commit = true)
+ // consume 3 records to ensure transaction finished
+ consumeToExpectedNumber(3)
+ val transactionProducerIterator = queryProducerDetail()
+ assertEquals(2, transactionProducerIterator.size)
+ val containsCoordinatorEpochIterator = transactionProducerIterator
+ .filter(producer => producer.coordinatorEpoch().isPresent)
+ assertEquals(1, containsCoordinatorEpochIterator.size)
+ val transactionProducerState = containsCoordinatorEpochIterator.last
+
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+ // send ongoing transaction msg
+ val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+ try {
+ val transactionNoneCommitProducerIterator = queryProducerDetail()
+ assertEquals(3, transactionNoneCommitProducerIterator.size)
+ val containsOngoingIterator = transactionNoneCommitProducerIterator
+ .filter(producer => producer.currentTransactionStartOffset().isPresent)
+ assertEquals(1, containsOngoingIterator.size)
+ val ongoingTransactionProducerState = containsOngoingIterator.last
+ // we send (1 common msg) + (2 transaction msg) + (1 transaction marker
msg), so transactionStartOffset is 4
+ assertEquals(4,
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+ } finally ongoingProducer.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTransactions(quorum: String): Unit = {
+ client = createAdminClient
+ client.createTopics(Collections.singletonList(new NewTopic(topic, 1,
1.toShort))).all().get()
+
+ var transactionId = "foo"
+
+ def transactionState(): TransactionState = {
+
client.describeTransactions(Collections.singleton(transactionId)).description(transactionId).get().state()
+ }
+
+ def findCoordinatorIdByTransactionId(transactionId: String): Int = {
+ // calculate the transaction partition id
+ val transactionPartitionId = Utils.abs(transactionId.hashCode) % 50
Review Comment:
Could you please replace the "50" by the true number of partitions?
Otherwise, it will get failed if we change the partitions of transaction topic
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]