chia7712 commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r585660313



##########
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##########
@@ -1801,6 +1801,31 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     assertThrows(classOf[TransactionalIdAuthorizationException], () => 
producer.commitTransaction())
   }
 
+  @Test
+  def testListTransactionsAuthorization(): Unit = {
+    createTopic(topic)
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)), transactionalIdResource)
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)), topicResource)
+
+    // Start a transaction and write to a topic.
+    val producer = buildTransactionalProducer()
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, 
"1".getBytes)).get
+
+    // First verify that we can list the transaction
+    val listTransactionsRequest = new ListTransactionsRequest.Builder(new 
ListTransactionsRequestData()).build()
+    val authorizedResponse = 
connectAndReceive[ListTransactionsResponse](listTransactionsRequest)
+    assertEquals(Errors.NONE, 
Errors.forCode(authorizedResponse.data.errorCode))
+    assertEquals(Set(transactionalId), 
authorizedResponse.data.transactionStates.asScala.map(_.transactionalId).toSet)
+
+    // Now revoke authorization and verify that the transaction is no longer 
listable
+    removeAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)), transactionalIdResource)

Review comment:
       How about adding a test for the `DESCRIBE` permission?

##########
File path: 
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
##########
@@ -476,13 +476,78 @@ class TransactionStateManagerTest {
   }
 
   @Test
-  def shouldReturnNotCooridnatorErrorIfTransactionIdPartitionNotOwned(): Unit 
= {
+  def shouldReturnNotCoordinatorErrorIfTransactionIdPartitionNotOwned(): Unit 
= {
     transactionManager.getTransactionState(transactionalId1).fold(
       err => assertEquals(Errors.NOT_COORDINATOR, err),
       _ => fail(transactionalId1 + "'s transaction state is already in the 
cache")
     )
   }
 
+  @Test
+  def testListTransactionsWithCoordinatorLoadingInProgress(): Unit = {
+    transactionManager.addLoadingPartition(partitionId = 0, coordinatorEpoch = 
15)
+    val listResponse = transactionManager.listTransactionStates(
+      filterProducerIds = Set.empty,
+      filterStateNames = Set.empty
+    )
+    assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, 
Errors.forCode(listResponse.errorCode))
+  }
+
+  @Test
+  def testListTransactionsFiltering(): Unit = {
+    for (partitionId <- 0 until numPartitions) {
+      transactionManager.addLoadedTransactionsToCache(partitionId, 0, new 
Pool[String, TransactionMetadata]())
+    }
+
+    def putTransaction(
+      transactionalId: String,
+      producerId: Long,
+      state: TransactionState
+    ): Unit = {
+      val txnMetadata = transactionMetadata(transactionalId, producerId, state)
+      
transactionManager.putTransactionStateIfNotExists(txnMetadata).left.toOption.foreach
 { error =>
+        fail(s"Failed to insert transaction $txnMetadata due to error $error")
+      }
+    }
+
+    putTransaction(transactionalId = "t0", producerId = 0, state = Ongoing)
+    putTransaction(transactionalId = "t1", producerId = 1, state = Ongoing)
+    putTransaction(transactionalId = "t2", producerId = 2, state = 
PrepareCommit)
+    putTransaction(transactionalId = "t3", producerId = 3, state = 
PrepareAbort)
+    putTransaction(transactionalId = "t4", producerId = 4, state = 
CompleteCommit)
+    putTransaction(transactionalId = "t5", producerId = 5, state = 
CompleteAbort)
+    putTransaction(transactionalId = "t6", producerId = 6, state = 
CompleteAbort)
+    putTransaction(transactionalId = "t7", producerId = 7, state = 
PrepareEpochFence)
+    // Note that `Dead` transactions are never returned. This is a transient 
state
+    // which is used when the transaction state is in the process of being 
deleted
+    // (whether through expiration or coordinator unloading).
+    putTransaction(transactionalId = "t8", producerId = 8, state = Dead)
+
+    def assertListTransactions(
+      expectedTransactionalIds: Set[String],
+      filterProducerIds: Set[Long] = Set.empty,
+      filterStates: Set[String] = Set.empty
+    ): Unit = {
+      val listResponse = 
transactionManager.listTransactionStates(filterProducerIds, filterStates)
+      assertEquals(Errors.NONE, Errors.forCode(listResponse.errorCode))
+      assertEquals(expectedTransactionalIds, 
listResponse.transactionStates.asScala.map(_.transactionalId).toSet)
+      val expectedUnknownStates = filterStates.filter(state => 
TransactionState.fromName(state).isEmpty)
+      assertEquals(expectedUnknownStates, 
listResponse.unknownStateFilters.asScala.toSet)
+    }
+
+    assertListTransactions(Set("t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7"))
+    assertListTransactions(Set("t0", "t1"), filterStates = Set("Ongoing"))
+    assertListTransactions(Set("t0", "t1"), filterStates = Set("Ongoing", 
"UnknownState"))
+    assertListTransactions(Set("t2", "t4"), filterStates = 
Set("PrepareCommit", "CompleteCommit"))
+    assertListTransactions(Set(), filterStates = Set("UnknownState"))
+    assertListTransactions(Set("t5"), filterProducerIds = Set(5L))
+    assertListTransactions(Set("t5", "t6"), filterProducerIds = Set(5L, 6L, 
8L, 9L))
+    assertListTransactions(Set("t4"), filterProducerIds = Set(4L, 5L), 
filterStates = Set("CompleteCommit"))
+    assertListTransactions(Set("t4", "t5"), filterProducerIds = Set(4L, 5L), 
filterStates = Set("CompleteCommit", "CompleteAbort"))
+    assertListTransactions(Set(), filterProducerIds = Set(3L, 6L), 
filterStates = Set("UnknownState"))
+    assertListTransactions(Set(), filterProducerIds = Set(10L), filterStates = 
Set("CompleteCommit"))
+  }

Review comment:
       How about adding a test for the `Dead` state? Filtering on `Dead` should always return nothing.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to