jolshan commented on code in PR #19910:
URL: https://github.com/apache/kafka/pull/19910#discussion_r2141000080


##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala:
##########
@@ -1327,6 +1327,72 @@ class TransactionCoordinatorTest {
       any())
   }
 
+  @Test
+  def shouldHandleTimeoutAtEpochOverflowBoundaryCorrectlyTV2(): Unit = {
+    // Test the scenario where we have an ongoing transaction at epoch 32766 (Short.MaxValue - 1)
+    // and the producer crashes/times out. This test verifies that the timeout handling
+    // correctly manages the epoch overflow scenario without causing failures.
+
+    val epochAtMaxBoundary = (Short.MaxValue - 1).toShort // 32766
+    val now = time.milliseconds()
+
+    // Create transaction metadata at the epoch boundary that would cause overflow if double-incremented
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      prevProducerId = RecordBatch.NO_PRODUCER_ID,
+      nextProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = epochAtMaxBoundary,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = txnTimeoutMs,
+      state = TransactionState.ONGOING,
+      topicPartitions = partitions,
+      txnStartTimestamp = now,
+      txnLastUpdateTimestamp = now,
+      clientTransactionVersion = TV_2
+    )
+    assertTrue(txnMetadata.isProducerEpochExhausted)
+
+    // Mock the transaction manager to return our test transaction as timed out
+    when(transactionManager.timedOutTransactions())
+      .thenReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId, 
producerId, epochAtMaxBoundary)))
+    
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata))))
+    when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+    // Mock the append operation to simulate successful write
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any[TxnTransitMetadata],
+      capturedErrorsCallback.capture(),
+      any(),
+      any())
+    ).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE))
+
+    // Track the actual behavior
+    var callbackInvoked = false
+    var resultError: Errors = null
+    var resultProducerId: Long = -1
+    var resultEpoch: Short = -1
+
+    def checkOnEndTransactionComplete(txnIdAndPidEpoch: 
TransactionalIdAndProducerIdEpoch)
+      (error: Errors, newProducerId: Long, newProducerEpoch: Short): Unit = {
+        callbackInvoked = true
+        resultError = error
+        resultProducerId = newProducerId
+        resultEpoch = newProducerEpoch
+        println(s"Timeout callback invoked with error: $error, producerId: 
$newProducerId, epoch: $newProducerEpoch")

Review Comment:
   Did we want to verify the callback is invoked and that the values are as expected?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to