chickenchickenlove commented on code in PR #21469:
URL: https://github.com/apache/kafka/pull/21469#discussion_r2871106781


##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -695,6 +696,122 @@ class TransactionsTest extends IntegrationTestHarness {
     assertThrows(classOf[IllegalStateException], () => 
producer.initTransactions())
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testRecoveryFromEpochOverflow(groupProtocol: String): Unit = {
+    // We could encounter a bug (see 
https://issues.apache.org/jira/browse/KAFKA-20090)
+    // that only reproduces when epoch gets to Short.MaxValue - 1 and 
transaction is
+    // aborted on timeout.
+    val transactionalId = "test-overflow"
+    var producer = createTransactionalProducer(transactionalId, 
transactionTimeoutMs = 500)
+    val abortedRecord = new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
0, "key".getBytes, "aborted".getBytes)
+
+    // Create a transaction, produce one record, and abort
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(abortedRecord)
+    producer.abortTransaction()
+    producer.close()
+
+    // Find the transaction coordinator partition for this transactional ID
+    val adminClient = createAdminClient()
+    try {
+      val txnDescription = 
adminClient.describeTransactions(java.util.List.of(transactionalId))
+        .description(transactionalId).get()
+      val coordinatorId = txnDescription.coordinatorId()
+
+      // Access the transaction coordinator and update the epoch to 
Short.MaxValue - 2
+      val coordinatorBroker = brokers.find(_.config.brokerId == 
coordinatorId).get
+      val txnCoordinator = 
coordinatorBroker.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
+
+      // Get the transaction metadata and update the epoch close to 
Short.MaxValue
+      // to trigger the overflow scenario. We'll set it high enough that 
subsequent
+      // operations will cause it to reach Short.MaxValue - 1 before the 
timeout.
+      
txnCoordinator.transactionManager.getTransactionState(transactionalId).foreach 
{ txnMetadataOpt =>
+        txnMetadataOpt.foreach { epochAndMetadata =>
+          epochAndMetadata.transactionMetadata.inLock(() => {
+            
epochAndMetadata.transactionMetadata.setProducerEpoch((Short.MaxValue - 
2).toShort)
+            null // inLock expects a Supplier that returns a value
+          })
+        }
+      }
+    } finally {
+      adminClient.close()
+    }
+
+    // Re-initialize the producer which will bump epoch
+    producer = createTransactionalProducer(transactionalId, 
transactionTimeoutMs = 500)
+    producer.initTransactions()
+
+    // Start a transaction
+    producer.beginTransaction()
+    // Produce one record and wait for it to complete
+    producer.send(abortedRecord).get()
+    producer.flush()
+
+    // Check and assert that epoch of the transaction is Short.MaxValue - 1 
(before timeout)
+    val adminClient2 = createAdminClient()
+    try {
+      val coordinatorId2 = 
adminClient2.describeTransactions(java.util.List.of(transactionalId))
+        .description(transactionalId).get().coordinatorId()
+      val coordinatorBroker2 = brokers.find(_.config.brokerId == 
coordinatorId2).get
+      val txnCoordinator2 = 
coordinatorBroker2.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
+
+      
txnCoordinator2.transactionManager.getTransactionState(transactionalId).foreach 
{ txnMetadataOpt =>
+        txnMetadataOpt.foreach { epochAndMetadata =>
+          val currentEpoch = 
epochAndMetadata.transactionMetadata.producerEpoch()
+          assertEquals((Short.MaxValue - 1).toShort, currentEpoch,
+            s"Expected epoch to be ${Short.MaxValue - 1}, but got 
$currentEpoch")
+        }
+      }
+
+      // Wait until state is complete abort
+      waitUntilTrue(() => {
+        val listResult = adminClient2.listTransactions()
+        val txns = listResult.all().get().asScala
+        txns.exists(txn =>
+          txn.transactionalId() == transactionalId &&
+          txn.state() == TransactionState.COMPLETE_ABORT
+        )
+      }, "Transaction was not aborted on timeout")
+    } finally {
+      adminClient2.close()
+    }
+
+    // Abort, this should be treated as retry of the abort caused by timeout
+    producer.abortTransaction()

Review Comment:
   If the intent is abort-only recovery at `Short.MaxValue`, could we add tests 
to pin that contract?
   1. A client commit is rejected (currently with `UNKNOWN_SERVER_ERROR`, if 
that is intentional).
   2. A client abort recovers, and a fence/timeout-triggered abort also recovers.
   
   This would make the intended behavior explicit and prevent regressions.
   (This may be over-engineering; if so, please ignore this comment! 🙇‍♂️)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to