chia7712 commented on code in PR #16652:
URL: https://github.com/apache/kafka/pull/16652#discussion_r1687852197


##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"

Review Comment:
   we don't need to declare the type in scala world.



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 1},
+      "Add one user scram credential timeout")
+    val result = client.describeUserScramCredentials().all().get()
+    result.forEach((userName, scramDescription) => {
+      assertEquals(targetUserName, userName)
+      assertEquals(targetUserName, scramDescription.name())
+      val credentialInfos = scramDescription.credentialInfos()
+      assertEquals(1, credentialInfos.size())
+      assertEquals(ScramMechanism.SCRAM_SHA_256, 
credentialInfos.get(0).mechanism())
+      assertEquals(4096, credentialInfos.get(0).iterations())
+    })
+
+    // add other users
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion("tom2", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+      new UserScramCredentialUpsertion("tom3", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all().get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 3},
+      "Add user scram credential timeout")
+    assertEquals(3, client.describeUserScramCredentials().all().get().size())

Review Comment:
   we don't need this assert as line#125 is good enough



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 1},

Review Comment:
   `() => client.describeUserScramCredentials().all().get().size() == 1`



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 1},
+      "Add one user scram credential timeout")
+    val result = client.describeUserScramCredentials().all().get()
+    result.forEach((userName, scramDescription) => {
+      assertEquals(targetUserName, userName)
+      assertEquals(targetUserName, scramDescription.name())
+      val credentialInfos = scramDescription.credentialInfos()
+      assertEquals(1, credentialInfos.size())
+      assertEquals(ScramMechanism.SCRAM_SHA_256, 
credentialInfos.get(0).mechanism())
+      assertEquals(4096, credentialInfos.get(0).iterations())
+    })
+
+    // add other users
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion("tom2", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+      new UserScramCredentialUpsertion("tom3", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all().get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 3},
+      "Add user scram credential timeout")
+    assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+    // alter user info
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => {
+      val userTomResult = client.describeUserScramCredentials().all().get()
+      val userScramCredential = userTomResult.get(targetUserName)
+      val credentialInfos = userScramCredential.credentialInfos()
+      credentialInfos.size() == 2
+    }, "Alter user scram credential timeout")
+    val userTomResult = client.describeUserScramCredentials().all().get()
+    assertEquals(3, userTomResult.size())
+    val userScramCredential = userTomResult.get(targetUserName)
+    assertEquals(targetUserName, userScramCredential.name())
+    val credentialInfos = userScramCredential.credentialInfos()
+    assertEquals(2, credentialInfos.size())
+    val credentialList = credentialInfos.asScala.sortBy(s => 
s.mechanism().`type`())
+    assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+    assertEquals(4096, credentialList.head.iterations())
+    assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+    assertEquals(8192, credentialList(1).iterations())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeProducers(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    def appendCommonRecords = (records: Int) => {
+      val producer = new 
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        plaintextBootstrapServers(brokers).asInstanceOf[Object]), new 
ByteArraySerializer, new ByteArraySerializer)
+      try {
+        (0 until records).foreach(i =>
+          producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+            tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      } finally producer.close()
+    }
+
+    def appendTransactionRecords(transactionId: String, records: Int, commit: 
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+      val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+      producer.initTransactions()
+      producer.beginTransaction()
+      (0 until records).foreach(i =>
+        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      producer.flush()
+      if (commit) {
+        producer.commitTransaction()
+        producer.close()
+      }
+
+      producer
+    }
+
+    def consumeToExpectedNumber = (expectedNumber: Int) => {
+      val configs = new util.HashMap[String, Object]()
+      configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+      configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+      val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+      try {
+        consumer.assign(Collections.singleton(tp))
+        consumer.seekToBeginning(Collections.singleton(tp))
+        var consumeNum = 0
+        while (consumeNum < expectedNumber) {
+          val records = consumer.poll(time.Duration.ofMillis(100))
+          consumeNum += records.count()
+        }
+      } finally consumer.close()
+    }
+
+    def queryProducerDetail() = 
client.describeProducers(Collections.singletonList(tp))
+      .partitionResult(tp).get().activeProducers().asScala
+
+    // send common msg
+    appendCommonRecords(1)
+    val producerIterator = queryProducerDetail()
+    assertEquals(1, producerIterator.size)
+    val producerState = producerIterator.last
+    assertEquals(0, producerState.producerEpoch())
+    assertFalse(producerState.coordinatorEpoch().isPresent)
+    assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+    // send committed transaction msg
+    appendTransactionRecords("foo", 2, commit = true)
+    // consume 3 records to ensure transaction finished
+    consumeToExpectedNumber(3)
+    val transactionProducerIterator = queryProducerDetail()
+    assertEquals(2, transactionProducerIterator.size)
+    val containsCoordinatorEpochIterator = transactionProducerIterator
+      .filter(producer => producer.coordinatorEpoch().isPresent)
+    assertEquals(1, containsCoordinatorEpochIterator.size)
+    val transactionProducerState = containsCoordinatorEpochIterator.last
+    
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+    // send ongoing transaction msg
+    val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+    try {
+      val transactionNoneCommitProducerIterator = queryProducerDetail()
+      assertEquals(3, transactionNoneCommitProducerIterator.size)
+      val containsOngoingIterator = transactionNoneCommitProducerIterator
+        .filter(producer => producer.currentTransactionStartOffset().isPresent)
+      assertEquals(1, containsOngoingIterator.size)
+      val ongoingTransactionProducerState = containsOngoingIterator.last
+      // we send (1 common msg) + (2 transaction msg) + (1 transaction marker 
msg), so transactionStartOffset is 4
+      assertEquals(4, 
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+    } finally ongoingProducer.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeTransactions(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    var transactionId = "foo"
+
+    def consumeToExpectedNumber = (expectedNumber: Int) => {
+      val configs = new util.HashMap[String, Object]()
+      configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+      configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+      val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+      try {
+        consumer.assign(Collections.singleton(tp))
+        consumer.seekToBeginning(Collections.singleton(tp))
+        var consumeNum = 0
+        while (consumeNum < expectedNumber) {
+          val records = consumer.poll(time.Duration.ofMillis(100))
+          consumeNum += records.count()
+        }
+      } finally consumer.close()
+    }
+
+    def transactionState(): TransactionState = {
+      
client.describeTransactions(Collections.singleton(transactionId)).description(transactionId).get().state()
+    }
+
+    def findCoordinatorIdByTransactionId(transactionId: String): Int = {
+      // calculate the transaction partition id
+      val transactionPartitionId = Utils.abs(transactionId.hashCode) % 50
+      val transactionTopic = 
client.describeTopics(Collections.singleton(Topic.TRANSACTION_STATE_TOPIC_NAME))
+      val partitionList = 
transactionTopic.allTopicNames().get().get(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions()
+      val partition = partitionList.asScala.filter(tp => tp.partition() == 
transactionPartitionId).head
+      partition.leader().id()
+    }
+
+    // normal commit case
+    val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+    try {
+      // init, the transaction is not begin, so 
TransactionalIdNotFoundException is expected
+      val exception = assertThrows(classOf[ExecutionException], () => 
transactionState())
+      assertInstanceOf(classOf[TransactionalIdNotFoundException], 
exception.getCause)
+
+      producer.initTransactions()
+      assertEquals(TransactionState.EMPTY, transactionState())
+      producer.beginTransaction()
+      assertEquals(TransactionState.EMPTY, transactionState())
+
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      producer.flush()
+      assertEquals(TransactionState.ONGOING, transactionState())
+      producer.commitTransaction()
+
+
+      val transactionResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionResult.coordinatorId())
+      assertEquals(0, transactionResult.producerId())
+      assertEquals(0, transactionResult.producerEpoch())
+      assertEquals(1, transactionResult.topicPartitions().size())
+      assertEquals(tp, transactionResult.topicPartitions().asScala.head)
+
+
+      val state = transactionState()
+      // Either PREPARE_COMMIT or COMPLETE_COMMIT is expected
+      assertTrue(state == TransactionState.PREPARE_COMMIT || state == 
TransactionState.COMPLETE_COMMIT)
+      // producer commit transaction, but maybe transaction coordinator has 
not been submitted mark msg
+      // so we start up a consumer and consume the expected number of msg, to 
ensure transaction committed
+      consumeToExpectedNumber(1)
+      assertEquals(TransactionState.COMPLETE_COMMIT, transactionState())
+    } finally producer.close()
+
+    // abort case
+    transactionId = "foo2"
+    val abortProducer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+    try {
+      // init, the transaction is not begin, so 
TransactionalIdNotFoundException is expected
+      val exception = assertThrows(classOf[ExecutionException], () => 
transactionState())
+      
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+      abortProducer.initTransactions()
+      assertEquals(TransactionState.EMPTY, transactionState())
+      abortProducer.beginTransaction()
+      assertEquals(TransactionState.EMPTY, transactionState())
+
+      val transactionResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionResult.coordinatorId())
+      assertEquals(0, transactionResult.topicPartitions().size())
+
+      abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+      abortProducer.flush()
+
+      val transactionSendMsgResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionSendMsgResult.coordinatorId())
+      assertEquals(1, transactionSendMsgResult.topicPartitions().size())
+      assertEquals(tp, transactionSendMsgResult.topicPartitions().asScala.head)
+
+      assertEquals(TransactionState.ONGOING, transactionState())
+      abortProducer.abortTransaction()
+      val state = transactionState()
+      assertTrue(state == TransactionState.PREPARE_ABORT || state == 
TransactionState.COMPLETE_ABORT)
+      // producer commit transaction, but maybe transaction coordinator has 
not been submitted mark msg
+      // so we start up a consumer and consume the expected number of msg, to 
ensure transaction committed
+      consumeToExpectedNumber(1)
+      assertEquals(TransactionState.COMPLETE_ABORT, transactionState())
+    } finally abortProducer.close()
+  }
+
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeTransactionsTimeoutMs(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    val transactionId = "foo"
+
+    val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+    try {
+      producer.initTransactions()
+      producer.beginTransaction()
+
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      producer.flush()
+      producer.commitTransaction()
+
+
+      val exception = assertThrows(classOf[ExecutionException], () => {
+        val timeoutOptions = new DescribeTransactionsOptions()
+        timeoutOptions.timeoutMs(0)
+        client.describeTransactions(Collections.singleton(transactionId), 
timeoutOptions)
+          .description(transactionId).get()
+      })
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally producer.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAbortTransactionTimeout(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    val producer = TestUtils.createTransactionalProducer("foo", brokers)
+    try {
+      producer.initTransactions()
+      producer.beginTransaction()
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      producer.flush()
+      producer.commitTransaction()
+
+      producer.beginTransaction()
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+      producer.flush()
+
+      val transactionalProducer = 
client.describeProducers(Collections.singletonList(tp))
+        
.partitionResult(tp).get().activeProducers().asScala.minBy(_.producerId())
+
+      val exception = assertThrows(classOf[ExecutionException], () => {
+        val options = new AbortTransactionOptions()
+        options.timeoutMs(0)
+        client.abortTransaction(
+          new AbortTransactionSpec(tp,
+            transactionalProducer.producerId(),
+            transactionalProducer.producerEpoch().toShort,
+            transactionalProducer.coordinatorEpoch().getAsInt), 
options).all().get()
+      })
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally producer.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListTransactions(quorum: String): Unit = {
+    val tp = new TopicPartition("topic1", 0)
+    def createTransactionList(): Unit = {
+      client = createAdminClient
+      client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 
1, 1.toShort))).all().get()
+
+      def consumeToExpectedNumber = (expectedNumber: Int) => {
+        val configs = new util.HashMap[String, Object]()
+        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+        configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+        val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, 
new ByteArrayDeserializer)
+        try {
+          consumer.assign(Collections.singleton(tp))
+          consumer.seekToBeginning(Collections.singleton(tp))
+          var consumeNum = 0
+          while (consumeNum < expectedNumber) {
+            val records = consumer.poll(time.Duration.ofMillis(100))
+            consumeNum += records.count()
+          }
+        } finally consumer.close()
+      }
+
+      val producer = TestUtils.createTransactionalProducer("foo", brokers)
+      try {
+        producer.initTransactions()
+        producer.beginTransaction()
+        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+        producer.flush()
+        producer.commitTransaction()
+      } finally producer.close()
+
+      val producer2 = TestUtils.createTransactionalProducer("foo2", brokers)
+      try {
+        producer2.initTransactions()
+        producer2.beginTransaction()
+        producer2.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+        producer2.flush()
+        producer2.abortTransaction()
+      } finally producer2.close()
+
+      val producer3 = TestUtils.createTransactionalProducer("foo3", brokers)
+      try {
+        producer3.initTransactions()
+        producer3.beginTransaction()
+        producer3.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+        producer3.flush()
+        producer3.commitTransaction()
+      } finally producer3.close()
+
+      consumeToExpectedNumber(2)
+    }
+
+    createTransactionList()
+
+    val result = client.listTransactions().all().get()
+    assertEquals(3, result.size())
+
+    var options = new ListTransactionsOptions()
+    val commitStates: util.Collection[TransactionState] = new 
util.ArrayList[TransactionState]()
+    commitStates.add(TransactionState.COMPLETE_COMMIT)
+    options.filterStates(commitStates)
+    assertEquals(2, client.listTransactions(options).all().get().size())

Review Comment:
   ```scala
         assertEquals(2, client.listTransactions(new 
ListTransactionsOptions().filterStates(Collections.singletonList(state)))
           .all().get().size())
   ```



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 1},
+      "Add one user scram credential timeout")
+    val result = client.describeUserScramCredentials().all().get()
+    result.forEach((userName, scramDescription) => {
+      assertEquals(targetUserName, userName)
+      assertEquals(targetUserName, scramDescription.name())
+      val credentialInfos = scramDescription.credentialInfos()
+      assertEquals(1, credentialInfos.size())
+      assertEquals(ScramMechanism.SCRAM_SHA_256, 
credentialInfos.get(0).mechanism())
+      assertEquals(4096, credentialInfos.get(0).iterations())
+    })
+
+    // add other users
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion("tom2", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+      new UserScramCredentialUpsertion("tom3", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all().get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 3},
+      "Add user scram credential timeout")
+    assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+    // alter user info
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => {
+      val userTomResult = client.describeUserScramCredentials().all().get()
+      val userScramCredential = userTomResult.get(targetUserName)
+      val credentialInfos = userScramCredential.credentialInfos()
+      credentialInfos.size() == 2
+    }, "Alter user scram credential timeout")
+    val userTomResult = client.describeUserScramCredentials().all().get()
+    assertEquals(3, userTomResult.size())
+    val userScramCredential = userTomResult.get(targetUserName)
+    assertEquals(targetUserName, userScramCredential.name())
+    val credentialInfos = userScramCredential.credentialInfos()
+    assertEquals(2, credentialInfos.size())
+    val credentialList = credentialInfos.asScala.sortBy(s => 
s.mechanism().`type`())
+    assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+    assertEquals(4096, credentialList.head.iterations())
+    assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+    assertEquals(8192, credentialList(1).iterations())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeProducers(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    def appendCommonRecords = (records: Int) => {
+      val producer = new 
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        plaintextBootstrapServers(brokers).asInstanceOf[Object]), new 
ByteArraySerializer, new ByteArraySerializer)
+      try {
+        (0 until records).foreach(i =>
+          producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+            tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      } finally producer.close()
+    }
+
+    def appendTransactionRecords(transactionId: String, records: Int, commit: 
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+      val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+      producer.initTransactions()
+      producer.beginTransaction()
+      (0 until records).foreach(i =>
+        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      producer.flush()
+      if (commit) {
+        producer.commitTransaction()
+        producer.close()
+      }
+
+      producer
+    }
+
+    def consumeToExpectedNumber = (expectedNumber: Int) => {
+      val configs = new util.HashMap[String, Object]()
+      configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+      configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+      val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+      try {
+        consumer.assign(Collections.singleton(tp))
+        consumer.seekToBeginning(Collections.singleton(tp))
+        var consumeNum = 0
+        while (consumeNum < expectedNumber) {
+          val records = consumer.poll(time.Duration.ofMillis(100))
+          consumeNum += records.count()
+        }
+      } finally consumer.close()
+    }
+
+    def queryProducerDetail() = 
client.describeProducers(Collections.singletonList(tp))
+      .partitionResult(tp).get().activeProducers().asScala
+
+    // send common msg
+    appendCommonRecords(1)
+    val producerIterator = queryProducerDetail()
+    assertEquals(1, producerIterator.size)
+    val producerState = producerIterator.last
+    assertEquals(0, producerState.producerEpoch())
+    assertFalse(producerState.coordinatorEpoch().isPresent)
+    assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+    // send committed transaction msg
+    appendTransactionRecords("foo", 2, commit = true)
+    // consume 3 records to ensure transaction finished
+    consumeToExpectedNumber(3)
+    val transactionProducerIterator = queryProducerDetail()
+    assertEquals(2, transactionProducerIterator.size)
+    val containsCoordinatorEpochIterator = transactionProducerIterator
+      .filter(producer => producer.coordinatorEpoch().isPresent)
+    assertEquals(1, containsCoordinatorEpochIterator.size)
+    val transactionProducerState = containsCoordinatorEpochIterator.last
+    
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+    // send ongoing transaction msg
+    val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+    try {
+      val transactionNoneCommitProducerIterator = queryProducerDetail()
+      assertEquals(3, transactionNoneCommitProducerIterator.size)
+      val containsOngoingIterator = transactionNoneCommitProducerIterator
+        .filter(producer => producer.currentTransactionStartOffset().isPresent)
+      assertEquals(1, containsOngoingIterator.size)
+      val ongoingTransactionProducerState = containsOngoingIterator.last
+      // we send (1 common msg) + (2 transaction msg) + (1 transaction marker 
msg), so transactionStartOffset is 4
+      assertEquals(4, 
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+    } finally ongoingProducer.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeTransactions(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    var transactionId = "foo"
+
+    def consumeToExpectedNumber = (expectedNumber: Int) => {
+      val configs = new util.HashMap[String, Object]()
+      configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+      configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+      val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+      try {
+        consumer.assign(Collections.singleton(tp))
+        consumer.seekToBeginning(Collections.singleton(tp))
+        var consumeNum = 0
+        while (consumeNum < expectedNumber) {
+          val records = consumer.poll(time.Duration.ofMillis(100))
+          consumeNum += records.count()
+        }
+      } finally consumer.close()
+    }
+
+    def transactionState(): TransactionState = {
+      
client.describeTransactions(Collections.singleton(transactionId)).description(transactionId).get().state()
+    }
+
+    def findCoordinatorIdByTransactionId(transactionId: String): Int = {
+      // calculate the transaction partition id
+      val transactionPartitionId = Utils.abs(transactionId.hashCode) % 50
+      val transactionTopic = 
client.describeTopics(Collections.singleton(Topic.TRANSACTION_STATE_TOPIC_NAME))
+      val partitionList = 
transactionTopic.allTopicNames().get().get(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions()
+      val partition = partitionList.asScala.filter(tp => tp.partition() == 
transactionPartitionId).head
+      partition.leader().id()
+    }
+
+    // normal commit case
+    val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+    try {
+      // init, the transaction is not begin, so 
TransactionalIdNotFoundException is expected
+      val exception = assertThrows(classOf[ExecutionException], () => 
transactionState())
+      assertInstanceOf(classOf[TransactionalIdNotFoundException], 
exception.getCause)
+
+      producer.initTransactions()
+      assertEquals(TransactionState.EMPTY, transactionState())
+      producer.beginTransaction()
+      assertEquals(TransactionState.EMPTY, transactionState())
+
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      producer.flush()
+      assertEquals(TransactionState.ONGOING, transactionState())
+      producer.commitTransaction()
+
+
+      val transactionResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionResult.coordinatorId())
+      assertEquals(0, transactionResult.producerId())
+      assertEquals(0, transactionResult.producerEpoch())
+      assertEquals(1, transactionResult.topicPartitions().size())
+      assertEquals(tp, transactionResult.topicPartitions().asScala.head)
+
+
+      val state = transactionState()
+      // Either PREPARE_COMMIT or COMPLETE_COMMIT is expected
+      assertTrue(state == TransactionState.PREPARE_COMMIT || state == 
TransactionState.COMPLETE_COMMIT)
+      // producer commit transaction, but maybe transaction coordinator has 
not been submitted mark msg
+      // so we start up a consumer and consume the expected number of msg, to 
ensure transaction committed
+      consumeToExpectedNumber(1)
+      assertEquals(TransactionState.COMPLETE_COMMIT, transactionState())
+    } finally producer.close()
+
+    // abort case
+    transactionId = "foo2"
+    val abortProducer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+    try {
+      // init, the transaction is not begin, so 
TransactionalIdNotFoundException is expected
+      val exception = assertThrows(classOf[ExecutionException], () => 
transactionState())
+      
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+      abortProducer.initTransactions()
+      assertEquals(TransactionState.EMPTY, transactionState())
+      abortProducer.beginTransaction()
+      assertEquals(TransactionState.EMPTY, transactionState())
+
+      val transactionResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionResult.coordinatorId())
+      assertEquals(0, transactionResult.topicPartitions().size())
+
+      abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+      abortProducer.flush()
+
+      val transactionSendMsgResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionSendMsgResult.coordinatorId())
+      assertEquals(1, transactionSendMsgResult.topicPartitions().size())
+      assertEquals(tp, transactionSendMsgResult.topicPartitions().asScala.head)
+
+      assertEquals(TransactionState.ONGOING, transactionState())
+      abortProducer.abortTransaction()
+      val state = transactionState()
+      assertTrue(state == TransactionState.PREPARE_ABORT || state == 
TransactionState.COMPLETE_ABORT)
+      // producer commit transaction, but maybe transaction coordinator has 
not been submitted mark msg
+      // so we start up a consumer and consume the expected number of msg, to 
ensure transaction committed
+      consumeToExpectedNumber(1)
+      assertEquals(TransactionState.COMPLETE_ABORT, transactionState())
+    } finally abortProducer.close()
+  }
+
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeTransactionsTimeoutMs(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    val transactionId = "foo"
+
+    val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+    try {
+      producer.initTransactions()
+      producer.beginTransaction()
+
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      producer.flush()
+      producer.commitTransaction()
+
+
+      val exception = assertThrows(classOf[ExecutionException], () => {
+        val timeoutOptions = new DescribeTransactionsOptions()
+        timeoutOptions.timeoutMs(0)
+        client.describeTransactions(Collections.singleton(transactionId), 
timeoutOptions)
+          .description(transactionId).get()
+      })
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally producer.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAbortTransactionTimeout(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    val producer = TestUtils.createTransactionalProducer("foo", brokers)
+    try {
+      producer.initTransactions()
+      producer.beginTransaction()
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      producer.flush()
+      producer.commitTransaction()
+
+      producer.beginTransaction()
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+      producer.flush()
+
+      val transactionalProducer = 
client.describeProducers(Collections.singletonList(tp))
+        
.partitionResult(tp).get().activeProducers().asScala.minBy(_.producerId())
+
+      val exception = assertThrows(classOf[ExecutionException], () => {
+        val options = new AbortTransactionOptions()
+        options.timeoutMs(0)
+        client.abortTransaction(
+          new AbortTransactionSpec(tp,
+            transactionalProducer.producerId(),
+            transactionalProducer.producerEpoch().toShort,
+            transactionalProducer.coordinatorEpoch().getAsInt), 
options).all().get()
+      })
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally producer.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListTransactions(quorum: String): Unit = {
+    val tp = new TopicPartition("topic1", 0)
+    def createTransactionList(): Unit = {
+      client = createAdminClient
+      client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 
1, 1.toShort))).all().get()
+
+      def consumeToExpectedNumber = (expectedNumber: Int) => {
+        val configs = new util.HashMap[String, Object]()
+        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+        configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+        val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, 
new ByteArrayDeserializer)
+        try {
+          consumer.assign(Collections.singleton(tp))
+          consumer.seekToBeginning(Collections.singleton(tp))
+          var consumeNum = 0
+          while (consumeNum < expectedNumber) {
+            val records = consumer.poll(time.Duration.ofMillis(100))
+            consumeNum += records.count()
+          }
+        } finally consumer.close()
+      }
+
+      val producer = TestUtils.createTransactionalProducer("foo", brokers)
+      try {
+        producer.initTransactions()
+        producer.beginTransaction()
+        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+        producer.flush()
+        producer.commitTransaction()
+      } finally producer.close()
+
+      val producer2 = TestUtils.createTransactionalProducer("foo2", brokers)
+      try {
+        producer2.initTransactions()
+        producer2.beginTransaction()
+        producer2.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+        producer2.flush()
+        producer2.abortTransaction()
+      } finally producer2.close()
+
+      val producer3 = TestUtils.createTransactionalProducer("foo3", brokers)
+      try {
+        producer3.initTransactions()
+        producer3.beginTransaction()
+        producer3.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+        producer3.flush()
+        producer3.commitTransaction()
+      } finally producer3.close()
+
+      consumeToExpectedNumber(2)
+    }
+
+    createTransactionList()
+
+    val result = client.listTransactions().all().get()
+    assertEquals(3, result.size())
+
+    var options = new ListTransactionsOptions()
+    val commitStates: util.Collection[TransactionState] = new 
util.ArrayList[TransactionState]()
+    commitStates.add(TransactionState.COMPLETE_COMMIT)
+    options.filterStates(commitStates)
+    assertEquals(2, client.listTransactions(options).all().get().size())
+
+    options = new ListTransactionsOptions()

Review Comment:
   ditto



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 1},
+      "Add one user scram credential timeout")
+    val result = client.describeUserScramCredentials().all().get()
+    result.forEach((userName, scramDescription) => {
+      assertEquals(targetUserName, userName)
+      assertEquals(targetUserName, scramDescription.name())
+      val credentialInfos = scramDescription.credentialInfos()
+      assertEquals(1, credentialInfos.size())
+      assertEquals(ScramMechanism.SCRAM_SHA_256, 
credentialInfos.get(0).mechanism())
+      assertEquals(4096, credentialInfos.get(0).iterations())
+    })
+
+    // add other users
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion("tom2", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+      new UserScramCredentialUpsertion("tom3", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all().get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 3},
+      "Add user scram credential timeout")
+    assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+    // alter user info
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => {
+      val userTomResult = client.describeUserScramCredentials().all().get()
+      val userScramCredential = userTomResult.get(targetUserName)
+      val credentialInfos = userScramCredential.credentialInfos()
+      credentialInfos.size() == 2
+    }, "Alter user scram credential timeout")
+    val userTomResult = client.describeUserScramCredentials().all().get()
+    assertEquals(3, userTomResult.size())
+    val userScramCredential = userTomResult.get(targetUserName)
+    assertEquals(targetUserName, userScramCredential.name())
+    val credentialInfos = userScramCredential.credentialInfos()
+    assertEquals(2, credentialInfos.size())
+    val credentialList = credentialInfos.asScala.sortBy(s => 
s.mechanism().`type`())
+    assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+    assertEquals(4096, credentialList.head.iterations())
+    assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+    assertEquals(8192, credentialList(1).iterations())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeProducers(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    def appendCommonRecords = (records: Int) => {
+      val producer = new 
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        plaintextBootstrapServers(brokers).asInstanceOf[Object]), new 
ByteArraySerializer, new ByteArraySerializer)
+      try {
+        (0 until records).foreach(i =>
+          producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+            tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      } finally producer.close()
+    }
+
+    def appendTransactionRecords(transactionId: String, records: Int, commit: 
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+      val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+      producer.initTransactions()
+      producer.beginTransaction()
+      (0 until records).foreach(i =>
+        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      producer.flush()
+      if (commit) {
+        producer.commitTransaction()
+        producer.close()
+      }
+
+      producer
+    }
+
+    def consumeToExpectedNumber = (expectedNumber: Int) => {
+      val configs = new util.HashMap[String, Object]()
+      configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+      configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+      val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+      try {
+        consumer.assign(Collections.singleton(tp))
+        consumer.seekToBeginning(Collections.singleton(tp))
+        var consumeNum = 0
+        while (consumeNum < expectedNumber) {
+          val records = consumer.poll(time.Duration.ofMillis(100))
+          consumeNum += records.count()
+        }
+      } finally consumer.close()
+    }
+
+    def queryProducerDetail() = 
client.describeProducers(Collections.singletonList(tp))
+      .partitionResult(tp).get().activeProducers().asScala
+
+    // send common msg
+    appendCommonRecords(1)
+    val producerIterator = queryProducerDetail()
+    assertEquals(1, producerIterator.size)
+    val producerState = producerIterator.last
+    assertEquals(0, producerState.producerEpoch())
+    assertFalse(producerState.coordinatorEpoch().isPresent)
+    assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+    // send committed transaction msg
+    appendTransactionRecords("foo", 2, commit = true)
+    // consume 3 records to ensure transaction finished
+    consumeToExpectedNumber(3)
+    val transactionProducerIterator = queryProducerDetail()
+    assertEquals(2, transactionProducerIterator.size)
+    val containsCoordinatorEpochIterator = transactionProducerIterator
+      .filter(producer => producer.coordinatorEpoch().isPresent)
+    assertEquals(1, containsCoordinatorEpochIterator.size)
+    val transactionProducerState = containsCoordinatorEpochIterator.last
+    
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+    // send ongoing transaction msg
+    val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+    try {
+      val transactionNoneCommitProducerIterator = queryProducerDetail()
+      assertEquals(3, transactionNoneCommitProducerIterator.size)
+      val containsOngoingIterator = transactionNoneCommitProducerIterator
+        .filter(producer => producer.currentTransactionStartOffset().isPresent)
+      assertEquals(1, containsOngoingIterator.size)
+      val ongoingTransactionProducerState = containsOngoingIterator.last
+      // we send (1 common msg) + (2 transaction msg) + (1 transaction marker 
msg), so transactionStartOffset is 4
+      assertEquals(4, 
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+    } finally ongoingProducer.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeTransactions(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    var transactionId = "foo"
+
+    def consumeToExpectedNumber = (expectedNumber: Int) => {
+      val configs = new util.HashMap[String, Object]()
+      configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+      configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+      val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+      try {
+        consumer.assign(Collections.singleton(tp))
+        consumer.seekToBeginning(Collections.singleton(tp))
+        var consumeNum = 0
+        while (consumeNum < expectedNumber) {
+          val records = consumer.poll(time.Duration.ofMillis(100))
+          consumeNum += records.count()
+        }
+      } finally consumer.close()
+    }
+
+    def transactionState(): TransactionState = {
+      
client.describeTransactions(Collections.singleton(transactionId)).description(transactionId).get().state()
+    }
+
+    def findCoordinatorIdByTransactionId(transactionId: String): Int = {
+      // calculate the transaction partition id
+      val transactionPartitionId = Utils.abs(transactionId.hashCode) % 50
+      val transactionTopic = 
client.describeTopics(Collections.singleton(Topic.TRANSACTION_STATE_TOPIC_NAME))
+      val partitionList = 
transactionTopic.allTopicNames().get().get(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions()
+      val partition = partitionList.asScala.filter(tp => tp.partition() == 
transactionPartitionId).head
+      partition.leader().id()
+    }
+
+    // normal commit case
+    val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+    try {
+      // init, the transaction is not begin, so 
TransactionalIdNotFoundException is expected
+      val exception = assertThrows(classOf[ExecutionException], () => 
transactionState())
+      assertInstanceOf(classOf[TransactionalIdNotFoundException], 
exception.getCause)
+
+      producer.initTransactions()
+      assertEquals(TransactionState.EMPTY, transactionState())
+      producer.beginTransaction()
+      assertEquals(TransactionState.EMPTY, transactionState())
+
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      producer.flush()
+      assertEquals(TransactionState.ONGOING, transactionState())
+      producer.commitTransaction()
+
+
+      val transactionResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionResult.coordinatorId())
+      assertEquals(0, transactionResult.producerId())
+      assertEquals(0, transactionResult.producerEpoch())
+      assertEquals(1, transactionResult.topicPartitions().size())
+      assertEquals(tp, transactionResult.topicPartitions().asScala.head)
+
+
+      val state = transactionState()
+      // Either PREPARE_COMMIT or COMPLETE_COMMIT is expected
+      assertTrue(state == TransactionState.PREPARE_COMMIT || state == 
TransactionState.COMPLETE_COMMIT)
+      // producer commit transaction, but maybe transaction coordinator has 
not been submitted mark msg
+      // so we start up a consumer and consume the expected number of msg, to 
ensure transaction committed
+      consumeToExpectedNumber(1)
+      assertEquals(TransactionState.COMPLETE_COMMIT, transactionState())
+    } finally producer.close()
+
+    // abort case
+    transactionId = "foo2"
+    val abortProducer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+    try {
+      // init, the transaction is not begin, so 
TransactionalIdNotFoundException is expected
+      val exception = assertThrows(classOf[ExecutionException], () => 
transactionState())
+      
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+      abortProducer.initTransactions()
+      assertEquals(TransactionState.EMPTY, transactionState())
+      abortProducer.beginTransaction()
+      assertEquals(TransactionState.EMPTY, transactionState())
+
+      val transactionResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionResult.coordinatorId())
+      assertEquals(0, transactionResult.topicPartitions().size())
+
+      abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+      abortProducer.flush()
+
+      val transactionSendMsgResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionSendMsgResult.coordinatorId())
+      assertEquals(1, transactionSendMsgResult.topicPartitions().size())
+      assertEquals(tp, transactionSendMsgResult.topicPartitions().asScala.head)
+
+      assertEquals(TransactionState.ONGOING, transactionState())
+      abortProducer.abortTransaction()
+      val state = transactionState()
+      assertTrue(state == TransactionState.PREPARE_ABORT || state == 
TransactionState.COMPLETE_ABORT)
+      // producer commit transaction, but maybe transaction coordinator has 
not been submitted mark msg
+      // so we start up a consumer and consume the expected number of msg, to 
ensure transaction committed
+      consumeToExpectedNumber(1)
+      assertEquals(TransactionState.COMPLETE_ABORT, transactionState())
+    } finally abortProducer.close()
+  }
+
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeTransactionsTimeoutMs(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    val transactionId = "foo"
+
+    val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+    try {
+      producer.initTransactions()
+      producer.beginTransaction()
+
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      producer.flush()
+      producer.commitTransaction()
+
+
+      val exception = assertThrows(classOf[ExecutionException], () => {
+        val timeoutOptions = new DescribeTransactionsOptions()
+        timeoutOptions.timeoutMs(0)
+        client.describeTransactions(Collections.singleton(transactionId), 
timeoutOptions)
+          .description(transactionId).get()
+      })
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally producer.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAbortTransactionTimeout(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    val producer = TestUtils.createTransactionalProducer("foo", brokers)
+    try {
+      producer.initTransactions()
+      producer.beginTransaction()
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      producer.flush()
+      producer.commitTransaction()
+
+      producer.beginTransaction()
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+      producer.flush()
+
+      val transactionalProducer = 
client.describeProducers(Collections.singletonList(tp))
+        
.partitionResult(tp).get().activeProducers().asScala.minBy(_.producerId())
+
+      val exception = assertThrows(classOf[ExecutionException], () => {
+        val options = new AbortTransactionOptions()
+        options.timeoutMs(0)
+        client.abortTransaction(
+          new AbortTransactionSpec(tp,
+            transactionalProducer.producerId(),
+            transactionalProducer.producerEpoch().toShort,
+            transactionalProducer.coordinatorEpoch().getAsInt), 
options).all().get()
+      })
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally producer.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListTransactions(quorum: String): Unit = {
+    val tp = new TopicPartition("topic1", 0)
+    def createTransactionList(): Unit = {
+      client = createAdminClient
+      client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 
1, 1.toShort))).all().get()
+
+      def consumeToExpectedNumber = (expectedNumber: Int) => {
+        val configs = new util.HashMap[String, Object]()
+        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+        configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+        val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, 
new ByteArrayDeserializer)
+        try {
+          consumer.assign(Collections.singleton(tp))
+          consumer.seekToBeginning(Collections.singleton(tp))
+          var consumeNum = 0
+          while (consumeNum < expectedNumber) {
+            val records = consumer.poll(time.Duration.ofMillis(100))
+            consumeNum += records.count()
+          }
+        } finally consumer.close()
+      }
+
+      val producer = TestUtils.createTransactionalProducer("foo", brokers)
+      try {
+        producer.initTransactions()
+        producer.beginTransaction()
+        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+        producer.flush()
+        producer.commitTransaction()
+      } finally producer.close()
+
+      val producer2 = TestUtils.createTransactionalProducer("foo2", brokers)
+      try {
+        producer2.initTransactions()
+        producer2.beginTransaction()
+        producer2.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+        producer2.flush()
+        producer2.abortTransaction()
+      } finally producer2.close()
+
+      val producer3 = TestUtils.createTransactionalProducer("foo3", brokers)
+      try {
+        producer3.initTransactions()
+        producer3.beginTransaction()
+        producer3.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+        producer3.flush()
+        producer3.commitTransaction()
+      } finally producer3.close()
+
+      consumeToExpectedNumber(2)
+    }
+
+    createTransactionList()
+
+    val result = client.listTransactions().all().get()
+    assertEquals(3, result.size())
+
+    var options = new ListTransactionsOptions()
+    val commitStates: util.Collection[TransactionState] = new 
util.ArrayList[TransactionState]()
+    commitStates.add(TransactionState.COMPLETE_COMMIT)
+    options.filterStates(commitStates)
+    assertEquals(2, client.listTransactions(options).all().get().size())
+
+    options = new ListTransactionsOptions()
+    val abortStates: util.Collection[TransactionState] = new 
util.ArrayList[TransactionState]()
+    abortStates.add(TransactionState.COMPLETE_ABORT)
+    options.filterStates(abortStates)
+    assertEquals(1, client.listTransactions(options).all().get().size())
+
+    options = new ListTransactionsOptions()

Review Comment:
   ditto



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 1},
+      "Add one user scram credential timeout")
+    val result = client.describeUserScramCredentials().all().get()
+    result.forEach((userName, scramDescription) => {
+      assertEquals(targetUserName, userName)
+      assertEquals(targetUserName, scramDescription.name())
+      val credentialInfos = scramDescription.credentialInfos()
+      assertEquals(1, credentialInfos.size())
+      assertEquals(ScramMechanism.SCRAM_SHA_256, 
credentialInfos.get(0).mechanism())
+      assertEquals(4096, credentialInfos.get(0).iterations())
+    })
+
+    // add other users
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion("tom2", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+      new UserScramCredentialUpsertion("tom3", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all().get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 3},
+      "Add user scram credential timeout")
+    assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+    // alter user info
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => {
+      val userTomResult = client.describeUserScramCredentials().all().get()
+      val userScramCredential = userTomResult.get(targetUserName)
+      val credentialInfos = userScramCredential.credentialInfos()
+      credentialInfos.size() == 2
+    }, "Alter user scram credential timeout")
+    val userTomResult = client.describeUserScramCredentials().all().get()
+    assertEquals(3, userTomResult.size())
+    val userScramCredential = userTomResult.get(targetUserName)
+    assertEquals(targetUserName, userScramCredential.name())
+    val credentialInfos = userScramCredential.credentialInfos()
+    assertEquals(2, credentialInfos.size())
+    val credentialList = credentialInfos.asScala.sortBy(s => 
s.mechanism().`type`())
+    assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+    assertEquals(4096, credentialList.head.iterations())
+    assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+    assertEquals(8192, credentialList(1).iterations())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeProducers(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    def appendCommonRecords = (records: Int) => {
+      val producer = new 
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        plaintextBootstrapServers(brokers).asInstanceOf[Object]), new 
ByteArraySerializer, new ByteArraySerializer)
+      try {
+        (0 until records).foreach(i =>
+          producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+            tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      } finally producer.close()
+    }
+
+    def appendTransactionRecords(transactionId: String, records: Int, commit: 
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+      val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+      producer.initTransactions()
+      producer.beginTransaction()
+      (0 until records).foreach(i =>
+        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      producer.flush()
+      if (commit) {
+        producer.commitTransaction()
+        producer.close()
+      }
+
+      producer
+    }
+
+    def consumeToExpectedNumber = (expectedNumber: Int) => {
+      val configs = new util.HashMap[String, Object]()
+      configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+      configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+      val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+      try {
+        consumer.assign(Collections.singleton(tp))
+        consumer.seekToBeginning(Collections.singleton(tp))
+        var consumeNum = 0
+        while (consumeNum < expectedNumber) {
+          val records = consumer.poll(time.Duration.ofMillis(100))
+          consumeNum += records.count()
+        }
+      } finally consumer.close()
+    }
+
+    def queryProducerDetail() = 
client.describeProducers(Collections.singletonList(tp))
+      .partitionResult(tp).get().activeProducers().asScala
+
+    // send common msg
+    appendCommonRecords(1)
+    val producerIterator = queryProducerDetail()
+    assertEquals(1, producerIterator.size)
+    val producerState = producerIterator.last
+    assertEquals(0, producerState.producerEpoch())
+    assertFalse(producerState.coordinatorEpoch().isPresent)
+    assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+    // send committed transaction msg
+    appendTransactionRecords("foo", 2, commit = true)
+    // consume 3 records to ensure transaction finished
+    consumeToExpectedNumber(3)
+    val transactionProducerIterator = queryProducerDetail()
+    assertEquals(2, transactionProducerIterator.size)
+    val containsCoordinatorEpochIterator = transactionProducerIterator
+      .filter(producer => producer.coordinatorEpoch().isPresent)
+    assertEquals(1, containsCoordinatorEpochIterator.size)
+    val transactionProducerState = containsCoordinatorEpochIterator.last
+    
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+    // send ongoing transaction msg
+    val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+    try {
+      val transactionNoneCommitProducerIterator = queryProducerDetail()
+      assertEquals(3, transactionNoneCommitProducerIterator.size)
+      val containsOngoingIterator = transactionNoneCommitProducerIterator
+        .filter(producer => producer.currentTransactionStartOffset().isPresent)
+      assertEquals(1, containsOngoingIterator.size)
+      val ongoingTransactionProducerState = containsOngoingIterator.last
+      // we send (1 common msg) + (2 transaction msg) + (1 transaction marker 
msg), so transactionStartOffset is 4
+      assertEquals(4, 
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+    } finally ongoingProducer.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeTransactions(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    var transactionId = "foo"
+
+    def consumeToExpectedNumber = (expectedNumber: Int) => {
+      val configs = new util.HashMap[String, Object]()
+      configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+      configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+      val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+      try {
+        consumer.assign(Collections.singleton(tp))
+        consumer.seekToBeginning(Collections.singleton(tp))
+        var consumeNum = 0
+        while (consumeNum < expectedNumber) {
+          val records = consumer.poll(time.Duration.ofMillis(100))
+          consumeNum += records.count()
+        }
+      } finally consumer.close()
+    }
+
+    def transactionState(): TransactionState = {
+      
client.describeTransactions(Collections.singleton(transactionId)).description(transactionId).get().state()
+    }
+
+    def findCoordinatorIdByTransactionId(transactionId: String): Int = {
+      // calculate the transaction partition id
+      val transactionPartitionId = Utils.abs(transactionId.hashCode) % 50
+      val transactionTopic = 
client.describeTopics(Collections.singleton(Topic.TRANSACTION_STATE_TOPIC_NAME))
+      val partitionList = 
transactionTopic.allTopicNames().get().get(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions()
+      val partition = partitionList.asScala.filter(tp => tp.partition() == 
transactionPartitionId).head
+      partition.leader().id()
+    }
+
+    // normal commit case
+    val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+    try {
+      // init, the transaction is not begin, so 
TransactionalIdNotFoundException is expected
+      val exception = assertThrows(classOf[ExecutionException], () => 
transactionState())
+      assertInstanceOf(classOf[TransactionalIdNotFoundException], 
exception.getCause)
+
+      producer.initTransactions()
+      assertEquals(TransactionState.EMPTY, transactionState())
+      producer.beginTransaction()
+      assertEquals(TransactionState.EMPTY, transactionState())
+
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      producer.flush()
+      assertEquals(TransactionState.ONGOING, transactionState())
+      producer.commitTransaction()
+
+
+      val transactionResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionResult.coordinatorId())
+      assertEquals(0, transactionResult.producerId())
+      assertEquals(0, transactionResult.producerEpoch())
+      assertEquals(1, transactionResult.topicPartitions().size())
+      assertEquals(tp, transactionResult.topicPartitions().asScala.head)
+
+
+      val state = transactionState()
+      // Either PREPARE_COMMIT or COMPLETE_COMMIT is expected
+      assertTrue(state == TransactionState.PREPARE_COMMIT || state == 
TransactionState.COMPLETE_COMMIT)
+      // producer commit transaction, but maybe transaction coordinator has 
not been submitted mark msg
+      // so we start up a consumer and consume the expected number of msg, to 
ensure transaction committed
+      consumeToExpectedNumber(1)
+      assertEquals(TransactionState.COMPLETE_COMMIT, transactionState())
+    } finally producer.close()
+
+    // abort case
+    transactionId = "foo2"
+    val abortProducer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+    try {
+      // init, the transaction is not begin, so 
TransactionalIdNotFoundException is expected
+      val exception = assertThrows(classOf[ExecutionException], () => 
transactionState())
+      
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+      abortProducer.initTransactions()
+      assertEquals(TransactionState.EMPTY, transactionState())
+      abortProducer.beginTransaction()
+      assertEquals(TransactionState.EMPTY, transactionState())
+
+      val transactionResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionResult.coordinatorId())
+      assertEquals(0, transactionResult.topicPartitions().size())
+
+      abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+      abortProducer.flush()
+
+      val transactionSendMsgResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionSendMsgResult.coordinatorId())
+      assertEquals(1, transactionSendMsgResult.topicPartitions().size())

Review Comment:
   ditto



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 1},
+      "Add one user scram credential timeout")
+    val result = client.describeUserScramCredentials().all().get()
+    result.forEach((userName, scramDescription) => {
+      assertEquals(targetUserName, userName)
+      assertEquals(targetUserName, scramDescription.name())
+      val credentialInfos = scramDescription.credentialInfos()
+      assertEquals(1, credentialInfos.size())
+      assertEquals(ScramMechanism.SCRAM_SHA_256, 
credentialInfos.get(0).mechanism())
+      assertEquals(4096, credentialInfos.get(0).iterations())
+    })
+
+    // add other users
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion("tom2", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+      new UserScramCredentialUpsertion("tom3", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all().get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 3},
+      "Add user scram credential timeout")
+    assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+    // alter user info
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => {
+      val userTomResult = client.describeUserScramCredentials().all().get()
+      val userScramCredential = userTomResult.get(targetUserName)
+      val credentialInfos = userScramCredential.credentialInfos()
+      credentialInfos.size() == 2
+    }, "Alter user scram credential timeout")
+    val userTomResult = client.describeUserScramCredentials().all().get()
+    assertEquals(3, userTomResult.size())
+    val userScramCredential = userTomResult.get(targetUserName)
+    assertEquals(targetUserName, userScramCredential.name())
+    val credentialInfos = userScramCredential.credentialInfos()
+    assertEquals(2, credentialInfos.size())
+    val credentialList = credentialInfos.asScala.sortBy(s => 
s.mechanism().`type`())
+    assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+    assertEquals(4096, credentialList.head.iterations())
+    assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+    assertEquals(8192, credentialList(1).iterations())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeProducers(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    def appendCommonRecords = (records: Int) => {
+      val producer = new 
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        plaintextBootstrapServers(brokers).asInstanceOf[Object]), new 
ByteArraySerializer, new ByteArraySerializer)
+      try {
+        (0 until records).foreach(i =>
+          producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+            tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      } finally producer.close()
+    }
+
+    def appendTransactionRecords(transactionId: String, records: Int, commit: 
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+      val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+      producer.initTransactions()
+      producer.beginTransaction()
+      (0 until records).foreach(i =>
+        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      producer.flush()
+      if (commit) {
+        producer.commitTransaction()
+        producer.close()
+      }
+
+      producer
+    }
+
+    def consumeToExpectedNumber = (expectedNumber: Int) => {
+      val configs = new util.HashMap[String, Object]()
+      configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+      configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+      val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+      try {
+        consumer.assign(Collections.singleton(tp))
+        consumer.seekToBeginning(Collections.singleton(tp))
+        var consumeNum = 0
+        while (consumeNum < expectedNumber) {

Review Comment:
   This will be a infinite loop if the test gets something wrong. Could you 
consider adding "timeout" to this method?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"
+    client.alterUserScramCredentials(util.Arrays.asList(

Review Comment:
   `Collections.singletonList`



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 1},
+      "Add one user scram credential timeout")
+    val result = client.describeUserScramCredentials().all().get()
+    result.forEach((userName, scramDescription) => {
+      assertEquals(targetUserName, userName)
+      assertEquals(targetUserName, scramDescription.name())
+      val credentialInfos = scramDescription.credentialInfos()
+      assertEquals(1, credentialInfos.size())
+      assertEquals(ScramMechanism.SCRAM_SHA_256, 
credentialInfos.get(0).mechanism())
+      assertEquals(4096, credentialInfos.get(0).iterations())
+    })
+
+    // add other users
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion("tom2", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+      new UserScramCredentialUpsertion("tom3", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all().get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 3},
+      "Add user scram credential timeout")
+    assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+    // alter user info
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => {
+      val userTomResult = client.describeUserScramCredentials().all().get()
+      val userScramCredential = userTomResult.get(targetUserName)
+      val credentialInfos = userScramCredential.credentialInfos()
+      credentialInfos.size() == 2
+    }, "Alter user scram credential timeout")
+    val userTomResult = client.describeUserScramCredentials().all().get()
+    assertEquals(3, userTomResult.size())
+    val userScramCredential = userTomResult.get(targetUserName)
+    assertEquals(targetUserName, userScramCredential.name())
+    val credentialInfos = userScramCredential.credentialInfos()
+    assertEquals(2, credentialInfos.size())
+    val credentialList = credentialInfos.asScala.sortBy(s => 
s.mechanism().`type`())
+    assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+    assertEquals(4096, credentialList.head.iterations())
+    assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+    assertEquals(8192, credentialList(1).iterations())
+  }

Review Comment:
   Could you add a test to verify timeout?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 1},
+      "Add one user scram credential timeout")
+    val result = client.describeUserScramCredentials().all().get()
+    result.forEach((userName, scramDescription) => {
+      assertEquals(targetUserName, userName)
+      assertEquals(targetUserName, scramDescription.name())
+      val credentialInfos = scramDescription.credentialInfos()
+      assertEquals(1, credentialInfos.size())
+      assertEquals(ScramMechanism.SCRAM_SHA_256, 
credentialInfos.get(0).mechanism())
+      assertEquals(4096, credentialInfos.get(0).iterations())
+    })
+
+    // add other users
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion("tom2", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+      new UserScramCredentialUpsertion("tom3", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all().get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 3},
+      "Add user scram credential timeout")
+    assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+    // alter user info
+    client.alterUserScramCredentials(util.Arrays.asList(

Review Comment:
   `Collections.singletonList`



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 1},
+      "Add one user scram credential timeout")
+    val result = client.describeUserScramCredentials().all().get()
+    result.forEach((userName, scramDescription) => {
+      assertEquals(targetUserName, userName)
+      assertEquals(targetUserName, scramDescription.name())
+      val credentialInfos = scramDescription.credentialInfos()
+      assertEquals(1, credentialInfos.size())
+      assertEquals(ScramMechanism.SCRAM_SHA_256, 
credentialInfos.get(0).mechanism())
+      assertEquals(4096, credentialInfos.get(0).iterations())
+    })
+
+    // add other users
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion("tom2", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+      new UserScramCredentialUpsertion("tom3", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all().get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 3},

Review Comment:
   `() => client.describeUserScramCredentials().all().get().size() == 3`



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 1},
+      "Add one user scram credential timeout")
+    val result = client.describeUserScramCredentials().all().get()
+    result.forEach((userName, scramDescription) => {
+      assertEquals(targetUserName, userName)
+      assertEquals(targetUserName, scramDescription.name())
+      val credentialInfos = scramDescription.credentialInfos()
+      assertEquals(1, credentialInfos.size())
+      assertEquals(ScramMechanism.SCRAM_SHA_256, 
credentialInfos.get(0).mechanism())
+      assertEquals(4096, credentialInfos.get(0).iterations())
+    })
+
+    // add other users
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion("tom2", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+      new UserScramCredentialUpsertion("tom3", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all().get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 3},
+      "Add user scram credential timeout")
+    assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+    // alter user info
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => {
+      val userTomResult = client.describeUserScramCredentials().all().get()
+      val userScramCredential = userTomResult.get(targetUserName)
+      val credentialInfos = userScramCredential.credentialInfos()
+      credentialInfos.size() == 2
+    }, "Alter user scram credential timeout")
+    val userTomResult = client.describeUserScramCredentials().all().get()
+    assertEquals(3, userTomResult.size())
+    val userScramCredential = userTomResult.get(targetUserName)
+    assertEquals(targetUserName, userScramCredential.name())
+    val credentialInfos = userScramCredential.credentialInfos()
+    assertEquals(2, credentialInfos.size())
+    val credentialList = credentialInfos.asScala.sortBy(s => 
s.mechanism().`type`())
+    assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+    assertEquals(4096, credentialList.head.iterations())
+    assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+    assertEquals(8192, credentialList(1).iterations())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeProducers(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    def appendCommonRecords = (records: Int) => {
+      val producer = new 
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        plaintextBootstrapServers(brokers).asInstanceOf[Object]), new 
ByteArraySerializer, new ByteArraySerializer)
+      try {
+        (0 until records).foreach(i =>
+          producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+            tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      } finally producer.close()
+    }
+
+    def appendTransactionRecords(transactionId: String, records: Int, commit: 
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+      val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+      producer.initTransactions()
+      producer.beginTransaction()
+      (0 until records).foreach(i =>
+        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      producer.flush()
+      if (commit) {
+        producer.commitTransaction()
+        producer.close()
+      }
+
+      producer
+    }
+
+    def consumeToExpectedNumber = (expectedNumber: Int) => {
+      val configs = new util.HashMap[String, Object]()
+      configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+      configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+      val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+      try {
+        consumer.assign(Collections.singleton(tp))
+        consumer.seekToBeginning(Collections.singleton(tp))
+        var consumeNum = 0
+        while (consumeNum < expectedNumber) {
+          val records = consumer.poll(time.Duration.ofMillis(100))
+          consumeNum += records.count()
+        }
+      } finally consumer.close()
+    }
+
+    def queryProducerDetail() = 
client.describeProducers(Collections.singletonList(tp))
+      .partitionResult(tp).get().activeProducers().asScala
+
+    // send common msg
+    appendCommonRecords(1)
+    val producerIterator = queryProducerDetail()
+    assertEquals(1, producerIterator.size)
+    val producerState = producerIterator.last
+    assertEquals(0, producerState.producerEpoch())
+    assertFalse(producerState.coordinatorEpoch().isPresent)
+    assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+    // send committed transaction msg
+    appendTransactionRecords("foo", 2, commit = true)
+    // consume 3 records to ensure transaction finished
+    consumeToExpectedNumber(3)
+    val transactionProducerIterator = queryProducerDetail()
+    assertEquals(2, transactionProducerIterator.size)
+    val containsCoordinatorEpochIterator = transactionProducerIterator
+      .filter(producer => producer.coordinatorEpoch().isPresent)
+    assertEquals(1, containsCoordinatorEpochIterator.size)
+    val transactionProducerState = containsCoordinatorEpochIterator.last
+    
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+    // send ongoing transaction msg
+    val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+    try {
+      val transactionNoneCommitProducerIterator = queryProducerDetail()
+      assertEquals(3, transactionNoneCommitProducerIterator.size)
+      val containsOngoingIterator = transactionNoneCommitProducerIterator
+        .filter(producer => producer.currentTransactionStartOffset().isPresent)
+      assertEquals(1, containsOngoingIterator.size)
+      val ongoingTransactionProducerState = containsOngoingIterator.last
+      // we send (1 common msg) + (2 transaction msg) + (1 transaction marker 
msg), so transactionStartOffset is 4
+      assertEquals(4, 
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+    } finally ongoingProducer.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeTransactions(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    var transactionId = "foo"
+
+    def consumeToExpectedNumber = (expectedNumber: Int) => {
+      val configs = new util.HashMap[String, Object]()
+      configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+      configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+      val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+      try {
+        consumer.assign(Collections.singleton(tp))
+        consumer.seekToBeginning(Collections.singleton(tp))
+        var consumeNum = 0
+        while (consumeNum < expectedNumber) {
+          val records = consumer.poll(time.Duration.ofMillis(100))
+          consumeNum += records.count()
+        }
+      } finally consumer.close()
+    }
+
+    def transactionState(): TransactionState = {
+      
client.describeTransactions(Collections.singleton(transactionId)).description(transactionId).get().state()
+    }
+
+    def findCoordinatorIdByTransactionId(transactionId: String): Int = {
+      // calculate the transaction partition id
+      val transactionPartitionId = Utils.abs(transactionId.hashCode) % 50
+      val transactionTopic = 
client.describeTopics(Collections.singleton(Topic.TRANSACTION_STATE_TOPIC_NAME))
+      val partitionList = 
transactionTopic.allTopicNames().get().get(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions()
+      val partition = partitionList.asScala.filter(tp => tp.partition() == 
transactionPartitionId).head
+      partition.leader().id()
+    }
+
+    // normal commit case
+    val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+    try {
+      // init, the transaction is not begin, so 
TransactionalIdNotFoundException is expected
+      val exception = assertThrows(classOf[ExecutionException], () => 
transactionState())
+      assertInstanceOf(classOf[TransactionalIdNotFoundException], 
exception.getCause)
+
+      producer.initTransactions()
+      assertEquals(TransactionState.EMPTY, transactionState())
+      producer.beginTransaction()
+      assertEquals(TransactionState.EMPTY, transactionState())
+
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      producer.flush()
+      assertEquals(TransactionState.ONGOING, transactionState())
+      producer.commitTransaction()
+
+
+      val transactionResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionResult.coordinatorId())
+      assertEquals(0, transactionResult.producerId())
+      assertEquals(0, transactionResult.producerEpoch())
+      assertEquals(1, transactionResult.topicPartitions().size())

Review Comment:
   `assertEquals(Collections.singleton(tp), 
transactionResult.topicPartitions())`



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 1},
+      "Add one user scram credential timeout")
+    val result = client.describeUserScramCredentials().all().get()
+    result.forEach((userName, scramDescription) => {
+      assertEquals(targetUserName, userName)
+      assertEquals(targetUserName, scramDescription.name())
+      val credentialInfos = scramDescription.credentialInfos()
+      assertEquals(1, credentialInfos.size())
+      assertEquals(ScramMechanism.SCRAM_SHA_256, 
credentialInfos.get(0).mechanism())
+      assertEquals(4096, credentialInfos.get(0).iterations())
+    })
+
+    // add other users
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion("tom2", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+      new UserScramCredentialUpsertion("tom3", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all().get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 3},
+      "Add user scram credential timeout")
+    assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+    // alter user info
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => {
+      val userTomResult = client.describeUserScramCredentials().all().get()
+      val userScramCredential = userTomResult.get(targetUserName)
+      val credentialInfos = userScramCredential.credentialInfos()
+      credentialInfos.size() == 2
+    }, "Alter user scram credential timeout")
+    val userTomResult = client.describeUserScramCredentials().all().get()
+    assertEquals(3, userTomResult.size())
+    val userScramCredential = userTomResult.get(targetUserName)
+    assertEquals(targetUserName, userScramCredential.name())
+    val credentialInfos = userScramCredential.credentialInfos()
+    assertEquals(2, credentialInfos.size())
+    val credentialList = credentialInfos.asScala.sortBy(s => 
s.mechanism().`type`())
+    assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+    assertEquals(4096, credentialList.head.iterations())
+    assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+    assertEquals(8192, credentialList(1).iterations())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeProducers(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    def appendCommonRecords = (records: Int) => {
+      val producer = new 
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        plaintextBootstrapServers(brokers).asInstanceOf[Object]), new 
ByteArraySerializer, new ByteArraySerializer)
+      try {
+        (0 until records).foreach(i =>
+          producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+            tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      } finally producer.close()
+    }
+
+    def appendTransactionRecords(transactionId: String, records: Int, commit: 
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+      val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+      producer.initTransactions()
+      producer.beginTransaction()
+      (0 until records).foreach(i =>
+        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      producer.flush()
+      if (commit) {
+        producer.commitTransaction()
+        producer.close()
+      }
+
+      producer
+    }
+
+    def consumeToExpectedNumber = (expectedNumber: Int) => {
+      val configs = new util.HashMap[String, Object]()
+      configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+      configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+      val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+      try {
+        consumer.assign(Collections.singleton(tp))
+        consumer.seekToBeginning(Collections.singleton(tp))
+        var consumeNum = 0
+        while (consumeNum < expectedNumber) {
+          val records = consumer.poll(time.Duration.ofMillis(100))
+          consumeNum += records.count()
+        }
+      } finally consumer.close()
+    }
+
+    def queryProducerDetail() = 
client.describeProducers(Collections.singletonList(tp))
+      .partitionResult(tp).get().activeProducers().asScala
+
+    // send common msg
+    appendCommonRecords(1)
+    val producerIterator = queryProducerDetail()
+    assertEquals(1, producerIterator.size)
+    val producerState = producerIterator.last
+    assertEquals(0, producerState.producerEpoch())
+    assertFalse(producerState.coordinatorEpoch().isPresent)
+    assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+    // send committed transaction msg
+    appendTransactionRecords("foo", 2, commit = true)
+    // consume 3 records to ensure transaction finished
+    consumeToExpectedNumber(3)
+    val transactionProducerIterator = queryProducerDetail()
+    assertEquals(2, transactionProducerIterator.size)
+    val containsCoordinatorEpochIterator = transactionProducerIterator
+      .filter(producer => producer.coordinatorEpoch().isPresent)
+    assertEquals(1, containsCoordinatorEpochIterator.size)
+    val transactionProducerState = containsCoordinatorEpochIterator.last
+    
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+    // send ongoing transaction msg
+    val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+    try {
+      val transactionNoneCommitProducerIterator = queryProducerDetail()
+      assertEquals(3, transactionNoneCommitProducerIterator.size)
+      val containsOngoingIterator = transactionNoneCommitProducerIterator
+        .filter(producer => producer.currentTransactionStartOffset().isPresent)
+      assertEquals(1, containsOngoingIterator.size)
+      val ongoingTransactionProducerState = containsOngoingIterator.last
+      // we send (1 common msg) + (2 transaction msg) + (1 transaction marker 
msg), so transactionStartOffset is 4
+      assertEquals(4, 
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+    } finally ongoingProducer.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeTransactions(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    var transactionId = "foo"
+
+    def consumeToExpectedNumber = (expectedNumber: Int) => {

Review Comment:
   Could you share this helper with other test case? 



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 1},
+      "Add one user scram credential timeout")
+    val result = client.describeUserScramCredentials().all().get()
+    result.forEach((userName, scramDescription) => {
+      assertEquals(targetUserName, userName)
+      assertEquals(targetUserName, scramDescription.name())
+      val credentialInfos = scramDescription.credentialInfos()
+      assertEquals(1, credentialInfos.size())
+      assertEquals(ScramMechanism.SCRAM_SHA_256, 
credentialInfos.get(0).mechanism())
+      assertEquals(4096, credentialInfos.get(0).iterations())
+    })
+
+    // add other users
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion("tom2", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+      new UserScramCredentialUpsertion("tom3", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all().get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 3},
+      "Add user scram credential timeout")
+    assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+    // alter user info
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => {
+      val userTomResult = client.describeUserScramCredentials().all().get()
+      val userScramCredential = userTomResult.get(targetUserName)
+      val credentialInfos = userScramCredential.credentialInfos()
+      credentialInfos.size() == 2
+    }, "Alter user scram credential timeout")
+    val userTomResult = client.describeUserScramCredentials().all().get()
+    assertEquals(3, userTomResult.size())
+    val userScramCredential = userTomResult.get(targetUserName)
+    assertEquals(targetUserName, userScramCredential.name())
+    val credentialInfos = userScramCredential.credentialInfos()
+    assertEquals(2, credentialInfos.size())
+    val credentialList = credentialInfos.asScala.sortBy(s => 
s.mechanism().`type`())
+    assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+    assertEquals(4096, credentialList.head.iterations())
+    assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+    assertEquals(8192, credentialList(1).iterations())
+  }

Review Comment:
   Also, `describeUserScramCredentials(List<String> users)` needs IT



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,6 +95,438 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUserScramCredentials(quorum: String): Unit = {
+    client = createAdminClient
+
+    // add a new user
+    val targetUserName: String = "tom"
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 1},
+      "Add one user scram credential timeout")
+    val result = client.describeUserScramCredentials().all().get()
+    result.forEach((userName, scramDescription) => {
+      assertEquals(targetUserName, userName)
+      assertEquals(targetUserName, scramDescription.name())
+      val credentialInfos = scramDescription.credentialInfos()
+      assertEquals(1, credentialInfos.size())
+      assertEquals(ScramMechanism.SCRAM_SHA_256, 
credentialInfos.get(0).mechanism())
+      assertEquals(4096, credentialInfos.get(0).iterations())
+    })
+
+    // add other users
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion("tom2", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456"),
+      new UserScramCredentialUpsertion("tom3", new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "123456")
+    )).all().get
+    TestUtils.waitUntilTrue(() => 
{client.describeUserScramCredentials().all().get().size() == 3},
+      "Add user scram credential timeout")
+    assertEquals(3, client.describeUserScramCredentials().all().get().size())
+
+    // alter user info
+    client.alterUserScramCredentials(util.Arrays.asList(
+      new UserScramCredentialUpsertion(targetUserName, new 
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 8192), "123456")
+    )).all.get
+    TestUtils.waitUntilTrue(() => {
+      val userTomResult = client.describeUserScramCredentials().all().get()
+      val userScramCredential = userTomResult.get(targetUserName)
+      val credentialInfos = userScramCredential.credentialInfos()
+      credentialInfos.size() == 2
+    }, "Alter user scram credential timeout")
+    val userTomResult = client.describeUserScramCredentials().all().get()
+    assertEquals(3, userTomResult.size())
+    val userScramCredential = userTomResult.get(targetUserName)
+    assertEquals(targetUserName, userScramCredential.name())
+    val credentialInfos = userScramCredential.credentialInfos()
+    assertEquals(2, credentialInfos.size())
+    val credentialList = credentialInfos.asScala.sortBy(s => 
s.mechanism().`type`())
+    assertEquals(ScramMechanism.SCRAM_SHA_256, credentialList.head.mechanism())
+    assertEquals(4096, credentialList.head.iterations())
+    assertEquals(ScramMechanism.SCRAM_SHA_512, credentialList(1).mechanism())
+    assertEquals(8192, credentialList(1).iterations())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeProducers(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    def appendCommonRecords = (records: Int) => {
+      val producer = new 
KafkaProducer(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        plaintextBootstrapServers(brokers).asInstanceOf[Object]), new 
ByteArraySerializer, new ByteArraySerializer)
+      try {
+        (0 until records).foreach(i =>
+          producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+            tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      } finally producer.close()
+    }
+
+    def appendTransactionRecords(transactionId: String, records: Int, commit: 
Boolean): KafkaProducer[Array[Byte], Array[Byte]] = {
+      val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+      producer.initTransactions()
+      producer.beginTransaction()
+      (0 until records).foreach(i =>
+        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), i.toString.getBytes, 
i.toString.getBytes())))
+      producer.flush()
+      if (commit) {
+        producer.commitTransaction()
+        producer.close()
+      }
+
+      producer
+    }
+
+    def consumeToExpectedNumber = (expectedNumber: Int) => {
+      val configs = new util.HashMap[String, Object]()
+      configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+      configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+      val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+      try {
+        consumer.assign(Collections.singleton(tp))
+        consumer.seekToBeginning(Collections.singleton(tp))
+        var consumeNum = 0
+        while (consumeNum < expectedNumber) {
+          val records = consumer.poll(time.Duration.ofMillis(100))
+          consumeNum += records.count()
+        }
+      } finally consumer.close()
+    }
+
+    def queryProducerDetail() = 
client.describeProducers(Collections.singletonList(tp))
+      .partitionResult(tp).get().activeProducers().asScala
+
+    // send common msg
+    appendCommonRecords(1)
+    val producerIterator = queryProducerDetail()
+    assertEquals(1, producerIterator.size)
+    val producerState = producerIterator.last
+    assertEquals(0, producerState.producerEpoch())
+    assertFalse(producerState.coordinatorEpoch().isPresent)
+    assertFalse(producerState.currentTransactionStartOffset().isPresent)
+
+
+    // send committed transaction msg
+    appendTransactionRecords("foo", 2, commit = true)
+    // consume 3 records to ensure transaction finished
+    consumeToExpectedNumber(3)
+    val transactionProducerIterator = queryProducerDetail()
+    assertEquals(2, transactionProducerIterator.size)
+    val containsCoordinatorEpochIterator = transactionProducerIterator
+      .filter(producer => producer.coordinatorEpoch().isPresent)
+    assertEquals(1, containsCoordinatorEpochIterator.size)
+    val transactionProducerState = containsCoordinatorEpochIterator.last
+    
assertFalse(transactionProducerState.currentTransactionStartOffset().isPresent)
+
+
+    // send ongoing transaction msg
+    val ongoingProducer = appendTransactionRecords("foo3", 3, commit = false)
+    try {
+      val transactionNoneCommitProducerIterator = queryProducerDetail()
+      assertEquals(3, transactionNoneCommitProducerIterator.size)
+      val containsOngoingIterator = transactionNoneCommitProducerIterator
+        .filter(producer => producer.currentTransactionStartOffset().isPresent)
+      assertEquals(1, containsOngoingIterator.size)
+      val ongoingTransactionProducerState = containsOngoingIterator.last
+      // we send (1 common msg) + (2 transaction msg) + (1 transaction marker 
msg), so transactionStartOffset is 4
+      assertEquals(4, 
ongoingTransactionProducerState.currentTransactionStartOffset().getAsLong)
+    } finally ongoingProducer.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeTransactions(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    var transactionId = "foo"
+
+    def consumeToExpectedNumber = (expectedNumber: Int) => {
+      val configs = new util.HashMap[String, Object]()
+      configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+      configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+      val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+      try {
+        consumer.assign(Collections.singleton(tp))
+        consumer.seekToBeginning(Collections.singleton(tp))
+        var consumeNum = 0
+        while (consumeNum < expectedNumber) {
+          val records = consumer.poll(time.Duration.ofMillis(100))
+          consumeNum += records.count()
+        }
+      } finally consumer.close()
+    }
+
+    def transactionState(): TransactionState = {
+      
client.describeTransactions(Collections.singleton(transactionId)).description(transactionId).get().state()
+    }
+
+    def findCoordinatorIdByTransactionId(transactionId: String): Int = {
+      // calculate the transaction partition id
+      val transactionPartitionId = Utils.abs(transactionId.hashCode) % 50
+      val transactionTopic = 
client.describeTopics(Collections.singleton(Topic.TRANSACTION_STATE_TOPIC_NAME))
+      val partitionList = 
transactionTopic.allTopicNames().get().get(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions()
+      val partition = partitionList.asScala.filter(tp => tp.partition() == 
transactionPartitionId).head
+      partition.leader().id()
+    }
+
+    // normal commit case
+    val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+    try {
+      // init, the transaction is not begin, so 
TransactionalIdNotFoundException is expected
+      val exception = assertThrows(classOf[ExecutionException], () => 
transactionState())
+      assertInstanceOf(classOf[TransactionalIdNotFoundException], 
exception.getCause)
+
+      producer.initTransactions()
+      assertEquals(TransactionState.EMPTY, transactionState())
+      producer.beginTransaction()
+      assertEquals(TransactionState.EMPTY, transactionState())
+
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      producer.flush()
+      assertEquals(TransactionState.ONGOING, transactionState())
+      producer.commitTransaction()
+
+
+      val transactionResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionResult.coordinatorId())
+      assertEquals(0, transactionResult.producerId())
+      assertEquals(0, transactionResult.producerEpoch())
+      assertEquals(1, transactionResult.topicPartitions().size())
+      assertEquals(tp, transactionResult.topicPartitions().asScala.head)
+
+
+      val state = transactionState()
+      // Either PREPARE_COMMIT or COMPLETE_COMMIT is expected
+      assertTrue(state == TransactionState.PREPARE_COMMIT || state == 
TransactionState.COMPLETE_COMMIT)
+      // producer commit transaction, but maybe transaction coordinator has 
not been submitted mark msg
+      // so we start up a consumer and consume the expected number of msg, to 
ensure transaction committed
+      consumeToExpectedNumber(1)
+      assertEquals(TransactionState.COMPLETE_COMMIT, transactionState())
+    } finally producer.close()
+
+    // abort case
+    transactionId = "foo2"
+    val abortProducer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+    try {
+      // init, the transaction is not begin, so 
TransactionalIdNotFoundException is expected
+      val exception = assertThrows(classOf[ExecutionException], () => 
transactionState())
+      
assertTrue(exception.getCause.isInstanceOf[TransactionalIdNotFoundException])
+
+      abortProducer.initTransactions()
+      assertEquals(TransactionState.EMPTY, transactionState())
+      abortProducer.beginTransaction()
+      assertEquals(TransactionState.EMPTY, transactionState())
+
+      val transactionResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionResult.coordinatorId())
+      assertEquals(0, transactionResult.topicPartitions().size())
+
+      abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      abortProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+      abortProducer.flush()
+
+      val transactionSendMsgResult = 
client.describeTransactions(Collections.singleton(transactionId))
+        .description(transactionId).get()
+      assertEquals(findCoordinatorIdByTransactionId(transactionId), 
transactionSendMsgResult.coordinatorId())
+      assertEquals(1, transactionSendMsgResult.topicPartitions().size())
+      assertEquals(tp, transactionSendMsgResult.topicPartitions().asScala.head)
+
+      assertEquals(TransactionState.ONGOING, transactionState())
+      abortProducer.abortTransaction()
+      val state = transactionState()
+      assertTrue(state == TransactionState.PREPARE_ABORT || state == 
TransactionState.COMPLETE_ABORT)
+      // producer commit transaction, but maybe transaction coordinator has 
not been submitted mark msg
+      // so we start up a consumer and consume the expected number of msg, to 
ensure transaction committed
+      consumeToExpectedNumber(1)
+      assertEquals(TransactionState.COMPLETE_ABORT, transactionState())
+    } finally abortProducer.close()
+  }
+
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeTransactionsTimeoutMs(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    val transactionId = "foo"
+
+    val producer = TestUtils.createTransactionalProducer(transactionId, 
brokers)
+    try {
+      producer.initTransactions()
+      producer.beginTransaction()
+
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      producer.flush()
+      producer.commitTransaction()
+
+
+      val exception = assertThrows(classOf[ExecutionException], () => {
+        val timeoutOptions = new DescribeTransactionsOptions()
+        timeoutOptions.timeoutMs(0)
+        client.describeTransactions(Collections.singleton(transactionId), 
timeoutOptions)
+          .description(transactionId).get()
+      })
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally producer.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAbortTransactionTimeout(quorum: String): Unit = {
+    client = createAdminClient
+    val tp = new TopicPartition("topic1", 0)
+    client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 1, 
1.toShort))).all().get()
+
+    val producer = TestUtils.createTransactionalProducer("foo", brokers)
+    try {
+      producer.initTransactions()
+      producer.beginTransaction()
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+      producer.flush()
+      producer.commitTransaction()
+
+      producer.beginTransaction()
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+        tp.topic(), tp.partition(), "k2".getBytes, "v2".getBytes()))
+      producer.flush()
+
+      val transactionalProducer = 
client.describeProducers(Collections.singletonList(tp))
+        
.partitionResult(tp).get().activeProducers().asScala.minBy(_.producerId())
+
+      val exception = assertThrows(classOf[ExecutionException], () => {
+        val options = new AbortTransactionOptions()
+        options.timeoutMs(0)
+        client.abortTransaction(
+          new AbortTransactionSpec(tp,
+            transactionalProducer.producerId(),
+            transactionalProducer.producerEpoch().toShort,
+            transactionalProducer.coordinatorEpoch().getAsInt), 
options).all().get()
+      })
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally producer.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListTransactions(quorum: String): Unit = {
+    val tp = new TopicPartition("topic1", 0)
+    def createTransactionList(): Unit = {
+      client = createAdminClient
+      client.createTopics(Collections.singletonList(new NewTopic(tp.topic(), 
1, 1.toShort))).all().get()
+
+      def consumeToExpectedNumber = (expectedNumber: Int) => {
+        val configs = new util.HashMap[String, Object]()
+        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
+        configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
+        val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, 
new ByteArrayDeserializer)
+        try {
+          consumer.assign(Collections.singleton(tp))
+          consumer.seekToBeginning(Collections.singleton(tp))
+          var consumeNum = 0
+          while (consumeNum < expectedNumber) {
+            val records = consumer.poll(time.Duration.ofMillis(100))
+            consumeNum += records.count()
+          }
+        } finally consumer.close()
+      }
+
+      val producer = TestUtils.createTransactionalProducer("foo", brokers)
+      try {
+        producer.initTransactions()
+        producer.beginTransaction()
+        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+        producer.flush()
+        producer.commitTransaction()
+      } finally producer.close()
+
+      val producer2 = TestUtils.createTransactionalProducer("foo2", brokers)
+      try {
+        producer2.initTransactions()
+        producer2.beginTransaction()
+        producer2.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+        producer2.flush()
+        producer2.abortTransaction()
+      } finally producer2.close()
+
+      val producer3 = TestUtils.createTransactionalProducer("foo3", brokers)
+      try {
+        producer3.initTransactions()
+        producer3.beginTransaction()
+        producer3.send(new ProducerRecord[Array[Byte], Array[Byte]](
+          tp.topic(), tp.partition(), "k1".getBytes, "v1".getBytes()))
+        producer3.flush()
+        producer3.commitTransaction()
+      } finally producer3.close()
+
+      consumeToExpectedNumber(2)
+    }
+
+    createTransactionList()
+
+    val result = client.listTransactions().all().get()
+    assertEquals(3, result.size())
+
+    var options = new ListTransactionsOptions()
+    val commitStates: util.Collection[TransactionState] = new 
util.ArrayList[TransactionState]()
+    commitStates.add(TransactionState.COMPLETE_COMMIT)
+    options.filterStates(commitStates)
+    assertEquals(2, client.listTransactions(options).all().get().size())
+
+    options = new ListTransactionsOptions()
+    val abortStates: util.Collection[TransactionState] = new 
util.ArrayList[TransactionState]()
+    abortStates.add(TransactionState.COMPLETE_ABORT)
+    options.filterStates(abortStates)
+    assertEquals(1, client.listTransactions(options).all().get().size())
+
+    options = new ListTransactionsOptions()
+    val producerIds: util.Collection[java.lang.Long] = new 
util.ArrayList[java.lang.Long]()
+    producerIds.add(0L)
+    options.filterProducerIds(producerIds)
+    assertEquals(1, client.listTransactions(options).all().get().size())
+
+    // ensure all transaction's txnStartTimestamp >= 500
+    options = new ListTransactionsOptions()
+    Thread.sleep(501)
+    options.filterOnDuration(500)
+    assertEquals(3, client.listTransactions(options).all().get().size())

Review Comment:
   ```scala
   assertEquals(3, client.listTransactions(new 
ListTransactionsOptions().filterOnDuration(500)).all().get().size())
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to