chickenchickenlove commented on code in PR #21469:
URL: https://github.com/apache/kafka/pull/21469#discussion_r2871106781
##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -695,6 +696,122 @@ class TransactionsTest extends IntegrationTestHarness {
assertThrows(classOf[IllegalStateException], () =>
producer.initTransactions())
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testRecoveryFromEpochOverflow(groupProtocol: String): Unit = {
+ // We could encounter a bug (see
https://issues.apache.org/jira/browse/KAFKA-20090)
+ // that only reproduces when epoch gets to Short.MaxValue - 1 and
transaction is
+ // aborted on timeout.
+ val transactionalId = "test-overflow"
+ var producer = createTransactionalProducer(transactionalId,
transactionTimeoutMs = 500)
+ val abortedRecord = new ProducerRecord[Array[Byte], Array[Byte]](topic1,
0, "key".getBytes, "aborted".getBytes)
+
+ // Create a transaction, produce one record, and abort
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(abortedRecord)
+ producer.abortTransaction()
+ producer.close()
+
+ // Find the transaction coordinator partition for this transactional ID
+ val adminClient = createAdminClient()
+ try {
+ val txnDescription =
adminClient.describeTransactions(java.util.List.of(transactionalId))
+ .description(transactionalId).get()
+ val coordinatorId = txnDescription.coordinatorId()
+
+ // Access the transaction coordinator and update the epoch to
Short.MaxValue - 2
+ val coordinatorBroker = brokers.find(_.config.brokerId ==
coordinatorId).get
+ val txnCoordinator =
coordinatorBroker.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
+
+ // Get the transaction metadata and update the epoch close to
Short.MaxValue
+ // to trigger the overflow scenario. We'll set it high enough that
subsequent
+ // operations will cause it to reach Short.MaxValue - 1 before the
timeout.
+
txnCoordinator.transactionManager.getTransactionState(transactionalId).foreach
{ txnMetadataOpt =>
+ txnMetadataOpt.foreach { epochAndMetadata =>
+ epochAndMetadata.transactionMetadata.inLock(() => {
+
epochAndMetadata.transactionMetadata.setProducerEpoch((Short.MaxValue -
2).toShort)
+ null // inLock expects a Supplier that returns a value
+ })
+ }
+ }
+ } finally {
+ adminClient.close()
+ }
+
+ // Re-initialize the producer which will bump epoch
+ producer = createTransactionalProducer(transactionalId,
transactionTimeoutMs = 500)
+ producer.initTransactions()
+
+ // Start a transaction
+ producer.beginTransaction()
+ // Produce one record and wait for it to complete
+ producer.send(abortedRecord).get()
+ producer.flush()
+
+ // Check and assert that epoch of the transaction is Short.MaxValue - 1
(before timeout)
+ val adminClient2 = createAdminClient()
+ try {
+ val coordinatorId2 =
adminClient2.describeTransactions(java.util.List.of(transactionalId))
+ .description(transactionalId).get().coordinatorId()
+ val coordinatorBroker2 = brokers.find(_.config.brokerId ==
coordinatorId2).get
+ val txnCoordinator2 =
coordinatorBroker2.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
+
+
txnCoordinator2.transactionManager.getTransactionState(transactionalId).foreach
{ txnMetadataOpt =>
+ txnMetadataOpt.foreach { epochAndMetadata =>
+ val currentEpoch =
epochAndMetadata.transactionMetadata.producerEpoch()
+ assertEquals((Short.MaxValue - 1).toShort, currentEpoch,
+ s"Expected epoch to be ${Short.MaxValue - 1}, but got
$currentEpoch")
+ }
+ }
+
+ // Wait until state is complete abort
+ waitUntilTrue(() => {
+ val listResult = adminClient2.listTransactions()
+ val txns = listResult.all().get().asScala
+ txns.exists(txn =>
+ txn.transactionalId() == transactionalId &&
+ txn.state() == TransactionState.COMPLETE_ABORT
+ )
+ }, "Transaction was not aborted on timeout")
+ } finally {
+ adminClient2.close()
+ }
+
+ // Abort, this should be treated as retry of the abort caused by timeout
+ producer.abortTransaction()
Review Comment:
If the intent is abort-only recovery at `Short.MaxValue`, could we add tests
to pin that contract?
1. A client commit is rejected (currently with `UNKNOWN_SERVER_ERROR`, if
that error code is intentional).
2. A client abort recovers, and a fence/timeout-triggered abort also recovers.
This would make the intended behavior explicit and prevent regressions.
(It could be over-engineering. If so, please ignore this comment! 🙇‍♂️)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]