chia7712 commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r585660313
########## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ########## @@ -1801,6 +1801,31 @@ class AuthorizerIntegrationTest extends BaseRequestTest { assertThrows(classOf[TransactionalIdAuthorizationException], () => producer.commitTransaction()) } + @Test + def testListTransactionsAuthorization(): Unit = { + createTopic(topic) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource) + + // Start a transaction and write to a topic. + val producer = buildTransactionalProducer() + producer.initTransactions() + producer.beginTransaction() + producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get + + // First verify that we can list the transaction + val listTransactionsRequest = new ListTransactionsRequest.Builder(new ListTransactionsRequestData()).build() + val authorizedResponse = connectAndReceive[ListTransactionsResponse](listTransactionsRequest) + assertEquals(Errors.NONE, Errors.forCode(authorizedResponse.data.errorCode)) + assertEquals(Set(transactionalId), authorizedResponse.data.transactionStates.asScala.map(_.transactionalId).toSet) + + // Now revoke authorization and verify that the transaction is no longer listable + removeAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource) Review comment: How about adding test for `DESCRIBE` permission? ########## File path: core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala ########## @@ -476,13 +476,78 @@ class TransactionStateManagerTest { } @Test - def shouldReturnNotCooridnatorErrorIfTransactionIdPartitionNotOwned(): Unit = { + def shouldReturnNotCoordinatorErrorIfTransactionIdPartitionNotOwned(): Unit = { transactionManager.getTransactionState(transactionalId1).fold( err => assertEquals(Errors.NOT_COORDINATOR, err), _ => fail(transactionalId1 + "'s transaction state is already in the cache") ) } + @Test + def testListTransactionsWithCoordinatorLoadingInProgress(): Unit = { + transactionManager.addLoadingPartition(partitionId = 0, coordinatorEpoch = 15) + val listResponse = transactionManager.listTransactionStates( + filterProducerIds = Set.empty, + filterStateNames = Set.empty + ) + assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, Errors.forCode(listResponse.errorCode)) + } + + @Test + def testListTransactionsFiltering(): Unit = { + for (partitionId <- 0 until numPartitions) { + transactionManager.addLoadedTransactionsToCache(partitionId, 0, new Pool[String, TransactionMetadata]()) + } + + def putTransaction( + transactionalId: String, + producerId: Long, + state: TransactionState + ): Unit = { + val txnMetadata = transactionMetadata(transactionalId, producerId, state) + transactionManager.putTransactionStateIfNotExists(txnMetadata).left.toOption.foreach { error => + fail(s"Failed to insert transaction $txnMetadata due to error $error") + } + } + + putTransaction(transactionalId = "t0", producerId = 0, state = Ongoing) + putTransaction(transactionalId = "t1", producerId = 1, state = Ongoing) + putTransaction(transactionalId = "t2", producerId = 2, state = PrepareCommit) + putTransaction(transactionalId = "t3", producerId = 3, state = PrepareAbort) + putTransaction(transactionalId = "t4", producerId = 4, state = CompleteCommit) + putTransaction(transactionalId = "t5", producerId = 5, state = CompleteAbort) + putTransaction(transactionalId = "t6", producerId = 6, state = CompleteAbort) + putTransaction(transactionalId = "t7", producerId = 7, state = PrepareEpochFence) + // Note that `Dead` transactions are never returned. This is a transient state + // which is used when the transaction state is in the process of being deleted + // (whether though expiration or coordinator unloading). + putTransaction(transactionalId = "t8", producerId = 8, state = Dead) + + def assertListTransactions( + expectedTransactionalIds: Set[String], + filterProducerIds: Set[Long] = Set.empty, + filterStates: Set[String] = Set.empty + ): Unit = { + val listResponse = transactionManager.listTransactionStates(filterProducerIds, filterStates) + assertEquals(Errors.NONE, Errors.forCode(listResponse.errorCode)) + assertEquals(expectedTransactionalIds, listResponse.transactionStates.asScala.map(_.transactionalId).toSet) + val expectedUnknownStates = filterStates.filter(state => TransactionState.fromName(state).isEmpty) + assertEquals(expectedUnknownStates, listResponse.unknownStateFilters.asScala.toSet) + } + + assertListTransactions(Set("t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7")) + assertListTransactions(Set("t0", "t1"), filterStates = Set("Ongoing")) + assertListTransactions(Set("t0", "t1"), filterStates = Set("Ongoing", "UnknownState")) + assertListTransactions(Set("t2", "t4"), filterStates = Set("PrepareCommit", "CompleteCommit")) + assertListTransactions(Set(), filterStates = Set("UnknownState")) + assertListTransactions(Set("t5"), filterProducerIds = Set(5L)) + assertListTransactions(Set("t5", "t6"), filterProducerIds = Set(5L, 6L, 8L, 9L)) + assertListTransactions(Set("t4"), filterProducerIds = Set(4L, 5L), filterStates = Set("CompleteCommit")) + assertListTransactions(Set("t4", "t5"), filterProducerIds = Set(4L, 5L), filterStates = Set("CompleteCommit", "CompleteAbort")) + assertListTransactions(Set(), filterProducerIds = Set(3L, 6L), filterStates = Set("UnknownState")) + assertListTransactions(Set(), filterProducerIds = Set(10L), filterStates = Set("CompleteCommit")) + } Review comment: How about adding test for `Dead` state? It should always return nothing. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org