jolshan commented on code in PR #19910: URL: https://github.com/apache/kafka/pull/19910#discussion_r2141000080
########## core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala: ########## @@ -1327,6 +1327,72 @@ class TransactionCoordinatorTest { any()) } + @Test + def shouldHandleTimeoutAtEpochOverflowBoundaryCorrectlyTV2(): Unit = { + // Test the scenario where we have an ongoing transaction at epoch 32766 (Short.MaxValue - 1) + // and the producer crashes/times out. This test verifies that the timeout handling + // correctly manages the epoch overflow scenario without causing failures. + + val epochAtMaxBoundary = (Short.MaxValue - 1).toShort // 32766 + val now = time.milliseconds() + + // Create transaction metadata at the epoch boundary that would cause overflow if double-incremented + val txnMetadata = new TransactionMetadata( + transactionalId = transactionalId, + producerId = producerId, + prevProducerId = RecordBatch.NO_PRODUCER_ID, + nextProducerId = RecordBatch.NO_PRODUCER_ID, + producerEpoch = epochAtMaxBoundary, + lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH, + txnTimeoutMs = txnTimeoutMs, + state = TransactionState.ONGOING, + topicPartitions = partitions, + txnStartTimestamp = now, + txnLastUpdateTimestamp = now, + clientTransactionVersion = TV_2 + ) + assertTrue(txnMetadata.isProducerEpochExhausted) + + // Mock the transaction manager to return our test transaction as timed out + when(transactionManager.timedOutTransactions()) + .thenReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId, producerId, epochAtMaxBoundary))) + when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId))) + .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata)))) + when(transactionManager.transactionVersionLevel()).thenReturn(TV_2) + + // Mock the append operation to simulate successful write + when(transactionManager.appendTransactionToLog( + ArgumentMatchers.eq(transactionalId), + ArgumentMatchers.eq(coordinatorEpoch), + any[TxnTransitMetadata], + capturedErrorsCallback.capture(), + 
any(), + any()) + ).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE)) + + // Track the actual behavior + var callbackInvoked = false + var resultError: Errors = null + var resultProducerId: Long = -1 + var resultEpoch: Short = -1 + + def checkOnEndTransactionComplete(txnIdAndPidEpoch: TransactionalIdAndProducerIdEpoch) + (error: Errors, newProducerId: Long, newProducerEpoch: Short): Unit = { + callbackInvoked = true + resultError = error + resultProducerId = newProducerId + resultEpoch = newProducerEpoch + println(s"Timeout callback invoked with error: $error, producerId: $newProducerId, epoch: $newProducerEpoch") Review Comment: Did we want to verify that the callback is invoked and that the error, producer ID, and epoch it receives are as expected? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org