artemlivshits commented on code in PR #21735:
URL: https://github.com/apache/kafka/pull/21735#discussion_r2927059806


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -441,7 +459,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
             } else if (txnMetadata.producerId != producerId) {
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             } else if (txnMetadata.producerEpoch != producerEpoch) {
-              Left(Errors.PRODUCER_FENCED)
+              // Check if client is using the client-facing epoch 
(nextProducerEpoch) after calling
+              // InitProducerId(keepPreparedTxn=true).  Adding partitions is 
not allowed in this state.

Review Comment:
   This should never happen with a proper client implementation -- after calling 
initTransactions(true), the client allows only commit / abort.  In case a 
third-party client doesn't follow the spec, we enforce it on the broker.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -779,134 +805,165 @@ class TransactionCoordinator(txnConfig: 
TransactionConfig,
           val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
 
           txnMetadata.inLock(() => {
-            producerIdCopy = txnMetadata.producerId
-            producerEpochCopy = txnMetadata.producerEpoch
-            // PrepareEpochFence has slightly different epoch bumping logic so 
don't include it here.
-            // Note that, it can only happen when the current state is Ongoing.
-            isEpochFence = txnMetadata.pendingState.filter(s => s == 
TransactionState.PREPARE_EPOCH_FENCE).isPresent
-            // True if the client retried a request that had overflowed the 
epoch, and a new producer ID is stored in the txnMetadata
-            val retryOnOverflow = !isEpochFence && txnMetadata.prevProducerId 
== producerId &&
-              producerEpoch == Short.MaxValue - 1 && txnMetadata.producerEpoch 
== 0
-            // True if the client retried an endTxn request, and the bumped 
producer epoch is stored in the txnMetadata.
-            val retryOnEpochBump = !isEpochFence && txnMetadata.producerEpoch 
== producerEpoch + 1
-
-            val isValidEpoch = {
-              if (!isEpochFence) {
-                // With transactions V2, state + same epoch is not sufficient 
to determine if a retry transition is valid. If the epoch is the
-                // same it actually indicates the next endTransaction call. 
Instead, we want to check the epoch matches with the epoch in the retry 
conditions.
-                // Return producer fenced even in the cases where the epoch is 
higher and could indicate an invalid state transition.
-                // Use the following criteria to determine if a v2 retry is 
valid:
-                txnMetadata.state match {
-                  case TransactionState.ONGOING | TransactionState.EMPTY | 
TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
-                    producerEpoch == txnMetadata.producerEpoch
-                  case TransactionState.PREPARE_COMMIT | 
TransactionState.PREPARE_ABORT =>
-                    retryOnEpochBump
-                  case TransactionState.COMPLETE_COMMIT | 
TransactionState.COMPLETE_ABORT =>
-                    retryOnEpochBump || retryOnOverflow || producerEpoch == 
txnMetadata.producerEpoch
+            // These checks are performed first so that we don't have to deal 
with intermediate states
+            // (including PrepareCommit and PrepareAbort) in already complex 
state machine.
+            if (txnMetadata.pendingTransitionInProgress && 
!txnMetadata.pendingState.filter(s => s == 
TransactionState.PREPARE_EPOCH_FENCE).isPresent) {
+              Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else if (txnMetadata.state == TransactionState.PREPARE_COMMIT || 
txnMetadata.state == TransactionState.PREPARE_ABORT) {
+              Left(Errors.CONCURRENT_TRANSACTIONS)

Review Comment:
   This is a minor semantic change -- to avoid complicated epoch-comparison 
logic in "transient" states, we just return CONCURRENT_TRANSACTIONS and skip 
the complexity.  The semantic difference is that clients now won't get a 
PRODUCER_FENCED error until the transaction transitions to a "permanent" 
state.
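
   The ordering of the upfront checks can be sketched roughly as below.  This 
is a simplified stand-alone model, not the coordinator code: the class, enum and 
method names are hypothetical, and it assumes that "pending transition in 
progress" simply means a pending state is set.

```java
import java.util.Optional;

// Hypothetical, simplified model of the upfront EndTxn guard described above.
// None of these types are the real Kafka classes.
final class EndTxnPrecheckSketch {
    enum State {
        EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT,
        COMPLETE_COMMIT, COMPLETE_ABORT, PREPARE_EPOCH_FENCE, DEAD
    }

    enum Result { CONCURRENT_TRANSACTIONS, CONTINUE }

    static Result precheck(State state, Optional<State> pendingState) {
        boolean pendingEpochFence =
            pendingState.filter(s -> s == State.PREPARE_EPOCH_FENCE).isPresent();
        // Assumption: a transition is "in progress" whenever a pending state is set.
        // Any pending transition other than an epoch fence must finish first.
        if (pendingState.isPresent() && !pendingEpochFence) {
            return Result.CONCURRENT_TRANSACTIONS;
        }
        // Transient states are rejected before any epoch comparison happens,
        // so a stale epoch is not reported as fenced at this point.
        if (state == State.PREPARE_COMMIT || state == State.PREPARE_ABORT) {
            return Result.CONCURRENT_TRANSACTIONS;
        }
        // Only "permanent" states reach the producer ID / epoch validation.
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        System.out.println(precheck(State.PREPARE_COMMIT, Optional.empty()));
        System.out.println(precheck(State.ONGOING, Optional.of(State.PREPARE_EPOCH_FENCE)));
        System.out.println(precheck(State.COMPLETE_COMMIT, Optional.empty()));
    }
}
```

   In this model a request that arrives during PREPARE_COMMIT gets 
CONCURRENT_TRANSACTIONS regardless of its epoch; the fencing decision is 
deferred until the state is CONTINUE-eligible.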



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -340,7 +340,10 @@ synchronized TransactionalRequestResult 
initializeTransactions(
                     .setEnable2Pc(enable2PC)
                     .setKeepPreparedTxn(keepPreparedTxn);
 
-            InitProducerIdHandler handler = new InitProducerIdHandler(new 
InitProducerIdRequest.Builder(requestData),
+            // Use unstable API version when 2PC features are enabled 
(requires InitProducerId v6)
+            boolean enableUnstableLastVersion = enable2PC || keepPreparedTxn;
+            InitProducerIdHandler handler = new InitProducerIdHandler(

Review Comment:
   Apparently we were missing the ability to let the client use unstable 
versions; this is needed for the unit tests to work.  The functionality follows 
the pattern we have in other places.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -779,134 +805,165 @@ class TransactionCoordinator(txnConfig: 
TransactionConfig,
           val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
 
           txnMetadata.inLock(() => {
-            producerIdCopy = txnMetadata.producerId
-            producerEpochCopy = txnMetadata.producerEpoch
-            // PrepareEpochFence has slightly different epoch bumping logic so 
don't include it here.
-            // Note that, it can only happen when the current state is Ongoing.
-            isEpochFence = txnMetadata.pendingState.filter(s => s == 
TransactionState.PREPARE_EPOCH_FENCE).isPresent
-            // True if the client retried a request that had overflowed the 
epoch, and a new producer ID is stored in the txnMetadata
-            val retryOnOverflow = !isEpochFence && txnMetadata.prevProducerId 
== producerId &&
-              producerEpoch == Short.MaxValue - 1 && txnMetadata.producerEpoch 
== 0
-            // True if the client retried an endTxn request, and the bumped 
producer epoch is stored in the txnMetadata.
-            val retryOnEpochBump = !isEpochFence && txnMetadata.producerEpoch 
== producerEpoch + 1
-
-            val isValidEpoch = {
-              if (!isEpochFence) {
-                // With transactions V2, state + same epoch is not sufficient 
to determine if a retry transition is valid. If the epoch is the
-                // same it actually indicates the next endTransaction call. 
Instead, we want to check the epoch matches with the epoch in the retry 
conditions.
-                // Return producer fenced even in the cases where the epoch is 
higher and could indicate an invalid state transition.
-                // Use the following criteria to determine if a v2 retry is 
valid:
-                txnMetadata.state match {
-                  case TransactionState.ONGOING | TransactionState.EMPTY | 
TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
-                    producerEpoch == txnMetadata.producerEpoch
-                  case TransactionState.PREPARE_COMMIT | 
TransactionState.PREPARE_ABORT =>
-                    retryOnEpochBump
-                  case TransactionState.COMPLETE_COMMIT | 
TransactionState.COMPLETE_ABORT =>
-                    retryOnEpochBump || retryOnOverflow || producerEpoch == 
txnMetadata.producerEpoch
+            // These checks are performed first so that we don't have to deal 
with intermediate states
+            // (including PrepareCommit and PrepareAbort) in already complex 
state machine.
+            if (txnMetadata.pendingTransitionInProgress && 
!txnMetadata.pendingState.filter(s => s == 
TransactionState.PREPARE_EPOCH_FENCE).isPresent) {
+              Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else if (txnMetadata.state == TransactionState.PREPARE_COMMIT || 
txnMetadata.state == TransactionState.PREPARE_ABORT) {
+              Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else {
+              // Copy the data under lock, this is going to be used if we want 
to return success
+              // on retrying commit / abort.
+              producerIdCopy = txnMetadata.producerId
+              producerEpochCopy = txnMetadata.producerEpoch
+
+              // PrepareEpochFence has slightly different epoch bumping logic 
so don't include it here.
+              // Note that, it can only happen when the current state is 
Ongoing.
+              isEpochFence = txnMetadata.pendingState.filter(s => s == 
TransactionState.PREPARE_EPOCH_FENCE).isPresent
+
+              // Calculate retry conditions, note that they are going to be 
used only
+              // in CompleteCommit / CompleteAbort states, so we can use the 
semantics
+              // of the producerId / epoch that's applicable in those states.
+              // Also note that the retry-on-overflow condition happens only 
when
+              // overflow happens during last commit / abort, overflows during
+              // InitProducerId(keepPreparedTxn=true) go through 
retry-on-epoch-bump
+              // path.  Example:
+              //  1. State ONGOING, {pid, epoch} = {42, 10}, {nextPid, 
nextEpoch} = {73, 85} -- the client-facing epoch has overflown
+              //  2. Commit transitions to {pid, epoch} = {73, 86} and the 
client retries with {73, 85} -- retryOnEpochBump
+              // Another example where we overflow during commit after going 
through overflow with keepPreparedTxn = true
+              //  1. State ONGOING, {pid, epoch} = {42, 10}, {nextPid, 
nextEpoch} = {73, 32766}
+              //  2. Commit transitions to {pid, epoch} = {85, 0}, prevPid = 
73 and the client retries with {73, 32766} -- retryOnOverflow
+
+              // True if the client retried a request that had overflowed the 
epoch, and a new producer ID is stored in the txnMetadata
+              val retryOnOverflow = !isEpochFence && 
txnMetadata.prevProducerId == producerId &&
+                producerEpoch == Short.MaxValue - 1 && 
txnMetadata.producerEpoch == 0
+              // True if the client retried an endTxn request, and the bumped 
producer epoch is stored in the txnMetadata.
+              val retryOnEpochBump = !isEpochFence && 
txnMetadata.producerEpoch == producerEpoch + 1
+
+              val isValidEpoch = {
+                if (!isEpochFence) {
+                  // With transactions V2, state + same epoch is not 
sufficient to determine if a retry transition is valid. If the epoch is the
+                  // same it actually indicates the next endTransaction call. 
Instead, we want to check the epoch matches with the epoch in the retry 
conditions.
+                  // Return producer fenced even in the cases where the epoch 
is higher and could indicate an invalid state transition.
+                  // Use the following criteria to determine if a v2 retry is 
valid:
+                  txnMetadata.state match {
+                    case TransactionState.ONGOING | TransactionState.EMPTY | 
TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
+                      producerEpoch == txnMetadata.clientProducerEpoch
+                    case TransactionState.PREPARE_COMMIT | 
TransactionState.PREPARE_ABORT =>
+                      retryOnEpochBump
+                    case TransactionState.COMPLETE_COMMIT | 
TransactionState.COMPLETE_ABORT =>
+                      retryOnEpochBump || retryOnOverflow || producerEpoch == 
txnMetadata.producerEpoch
+                  }
+                } else {
+                  // If the epoch is going to be fenced, it bumps the epoch 
differently with TV2.
+                  (!isFromClient || producerEpoch == 
txnMetadata.clientProducerEpoch) && producerEpoch >= 
txnMetadata.clientProducerEpoch
                 }
-              } else {
-                // If the epoch is going to be fenced, it bumps the epoch 
differently with TV2.
-                (!isFromClient || producerEpoch == txnMetadata.producerEpoch) 
&& producerEpoch >= txnMetadata.producerEpoch
               }
-            }
 
-            val isRetry = retryOnEpochBump || retryOnOverflow
-
-            def generateTxnTransitMetadataForTxnCompletion(nextState: 
TransactionState, noPartitionAdded: Boolean): ApiResult[(Int, 
TxnTransitMetadata)] = {
-              // EndTxn completion on TV2 bumps epoch, so rotate producer ID 
whenever the current epoch is exhausted.
-              // This must also apply to the epoch-fence path.
-              val nextProducerIdOrErrors =
-                if (txnMetadata.isProducerEpochExhausted) {
-                  try {
-                    Right(producerIdManager.generateProducerId())
-                  } catch {
-                    case e: Exception => Left(Errors.forException(e))
+              val isRetry = retryOnEpochBump || retryOnOverflow
+
+              def generateTxnTransitMetadataForTxnCompletion(nextState: 
TransactionState, noPartitionAdded: Boolean): ApiResult[(Int, 
TxnTransitMetadata)] = {
+                // EndTxn completion on TV2 bumps epoch, so rotate producer ID 
whenever the current epoch is exhausted.
+                // This must also apply to the epoch-fence path.
+                val nextProducerIdOrErrors =
+                  if (txnMetadata.isProducerEpochExhausted) {
+                    try {
+                      Right(producerIdManager.generateProducerId())
+                    } catch {
+                      case e: Exception => Left(Errors.forException(e))
+                    }
+                  } else {
+                    Right(txnMetadata.nextProducerId)
                   }
+
+                // If the next producer epoch is set (which can happen if we 
used InitProducerId(keepPreparedTxn))
+                // then we also need to bump this epoch -- we want to fence 
requests that were issued with that
+                // epoch after the transaction is complete.  Consider this 
example:
+                //  1. (initial state): Ongoing, producerId = 42, 
producerEpoch = 100, nextProducerId = -1, nextProducerEpoch = -1
+                //  2. InitProducerId(keepPreparedTxn): Ongoing, producerId = 
42, producerEpoch = 100, nextProducerId = 42, nextProducerEpoch = 101
+                //  3. CommitTxn: timed out (but actually delayed)
+                //  4. CommitTxn + complete: CompleteCommit, producerId = 42, 
producerEpoch = 101, nextProducerId = -1, nextProducerEpoch = -1
+                //  5. Start new transaction, do some actions
+                //  6. Delayed CommitTxn from step 3 arrives and commits 
partially completed transaction.
+                // In order to prevent this problem, we need to bump the epoch 
at step 4 (so it becomes 102) even though we
+                // already bumped it at step 2.
+                // Note that we can arrive at this scenario when a client did 
a few keep-prepared calls and then
+                // decided to do one without keep-prepared (e.g. to 
force-terminate), so this logic applies
+                // to isEpochFence as well.
+                val nextProducerEpoch = if (txnMetadata.hasNextProducerEpoch) {
+                  if (txnMetadata.isProducerEpochExhausted) 0.toShort else 
(txnMetadata.nextProducerEpoch + 1).toShort
                 } else {
-                  Right(RecordBatch.NO_PRODUCER_ID)
+                  RecordBatch.NO_PRODUCER_EPOCH
                 }
 
-              if (nextState == TransactionState.PREPARE_ABORT && isEpochFence) 
{
-                // We should clear the pending state to make way for the 
transition to PrepareAbort
-                txnMetadata.pendingState(util.Optional.empty())
-                // For TV2+, don't manually set the epoch - let 
prepareAbortOrCommit handle it naturally.
-              }
+                if (nextState == TransactionState.PREPARE_ABORT && 
isEpochFence) {
+                  // We should clear the pending state to make way for the 
transition to PrepareAbort
+                  txnMetadata.pendingState(util.Optional.empty())
+                  // For TV2+, don't manually set the epoch - let 
prepareAbortOrCommit handle it naturally.
+                }
 
-              nextProducerIdOrErrors.flatMap {
-                nextProducerId =>
-                  Right(coordinatorEpoch, 
txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, 
nextProducerId.asInstanceOf[Long], time.milliseconds(), noPartitionAdded))
+                nextProducerIdOrErrors.flatMap {
+                  nextProducerId =>
+                    Right(coordinatorEpoch, 
txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, 
nextProducerId, nextProducerEpoch, time.milliseconds(), noPartitionAdded))
+                }
               }
-            }
-
-            if (txnMetadata.pendingTransitionInProgress && 
txnMetadata.pendingState.get != TransactionState.PREPARE_EPOCH_FENCE) {
-              // This check is performed first so that the pending transition 
can complete before the next checks.
-              // With TV2, we may be transitioning over a producer epoch 
overflow, and the producer may be using the
-              // new producer ID that is still only in pending state.
-              Left(Errors.CONCURRENT_TRANSACTIONS)
-            } else if (txnMetadata.producerId != producerId && 
!retryOnOverflow)
-              Left(Errors.INVALID_PRODUCER_ID_MAPPING)
-            else if (!isValidEpoch)
-              Left(Errors.PRODUCER_FENCED)
-            else txnMetadata.state match {
-              case TransactionState.ONGOING =>
-                val nextState = if (txnMarkerResult == 
TransactionResult.COMMIT)
-                  TransactionState.PREPARE_COMMIT
-                else
-                  TransactionState.PREPARE_ABORT
 
-                generateTxnTransitMetadataForTxnCompletion(nextState, false)
-              case TransactionState.COMPLETE_COMMIT =>
-                if (txnMarkerResult == TransactionResult.COMMIT) {
-                  if (isRetry)
-                    Left(Errors.NONE)
+              if (txnMetadata.clientProducerId != producerId && 
!retryOnOverflow)
+                Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+              else if (!isValidEpoch)
+                Left(Errors.PRODUCER_FENCED)
+              else txnMetadata.state match {
+                case TransactionState.ONGOING =>
+                  val nextState = if (txnMarkerResult == 
TransactionResult.COMMIT)
+                    TransactionState.PREPARE_COMMIT
                   else
+                    TransactionState.PREPARE_ABORT
+
+                  generateTxnTransitMetadataForTxnCompletion(nextState, 
noPartitionAdded = false)
+                case TransactionState.COMPLETE_COMMIT =>
+                  if (txnMarkerResult == TransactionResult.COMMIT) {
+                    if (isRetry)
+                      Left(Errors.NONE)
+                    else
+                      logInvalidStateTransitionAndReturnError(transactionalId, 
txnMetadata.state, txnMarkerResult)
+                  } else {
+                    // Abort.
+                    if (isRetry)
+                      logInvalidStateTransitionAndReturnError(transactionalId, 
txnMetadata.state, txnMarkerResult)
+                    else
+                      
generateTxnTransitMetadataForTxnCompletion(TransactionState.PREPARE_ABORT, 
noPartitionAdded = true)
+                  }
+                case TransactionState.COMPLETE_ABORT =>
+                  if (txnMarkerResult == TransactionResult.ABORT) {
+                    if (isRetry)
+                      Left(Errors.NONE)
+                    else
+                      
generateTxnTransitMetadataForTxnCompletion(TransactionState.PREPARE_ABORT, 
noPartitionAdded = true)
+                  } else {
+                    // Commit.
                     logInvalidStateTransitionAndReturnError(transactionalId, 
txnMetadata.state, txnMarkerResult)
-                } else {
-                  // Abort.
-                  if (isRetry)
+                  }
+                case TransactionState.EMPTY =>
+                  if (txnMarkerResult == TransactionResult.ABORT) {
+                    
generateTxnTransitMetadataForTxnCompletion(TransactionState.PREPARE_ABORT, 
noPartitionAdded = true)
+                  } else {
                     logInvalidStateTransitionAndReturnError(transactionalId, 
txnMetadata.state, txnMarkerResult)
-                  else
-                    
generateTxnTransitMetadataForTxnCompletion(TransactionState.PREPARE_ABORT, true)
-                }
-              case TransactionState.COMPLETE_ABORT =>
-                if (txnMarkerResult == TransactionResult.ABORT) {
-                  if (isRetry)
-                    Left(Errors.NONE)
-                  else
-                    
generateTxnTransitMetadataForTxnCompletion(TransactionState.PREPARE_ABORT, true)
-                } else {
-                  // Commit.
-                  logInvalidStateTransitionAndReturnError(transactionalId, 
txnMetadata.state, txnMarkerResult)
-                }
-              case TransactionState.PREPARE_COMMIT =>

Review Comment:
   These checks are now done upfront.
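
   For reference, the two retry conditions from the hunk above can be restated 
as plain predicates and checked against the two examples in the code comments.  
This is only a sketch: the class and parameter names are hypothetical, and the 
stored values are passed in directly instead of being read from 
TransactionMetadata.

```java
// Hypothetical restatement of the retry predicates from the diff above.
final class EndTxnRetrySketch {
    // The client retried a request whose completion overflowed the epoch, so
    // the stored metadata already carries a new producer ID at epoch 0.
    static boolean retryOnOverflow(boolean isEpochFence, long prevProducerId,
                                   short storedEpoch, long requestProducerId,
                                   short requestEpoch) {
        return !isEpochFence
            && prevProducerId == requestProducerId
            && requestEpoch == Short.MAX_VALUE - 1
            && storedEpoch == 0;
    }

    // The client retried an EndTxn request and the stored epoch is already
    // the bumped one (request epoch + 1).
    static boolean retryOnEpochBump(boolean isEpochFence, short storedEpoch,
                                    short requestEpoch) {
        return !isEpochFence && storedEpoch == requestEpoch + 1;
    }

    public static void main(String[] args) {
        // First example: stored {73, 86}, client retries with {73, 85}.
        System.out.println(retryOnEpochBump(false, (short) 86, (short) 85));
        // Second example: stored {85, 0}, prevPid = 73, client retries with {73, 32766}.
        System.out.println(retryOnOverflow(false, 73L, (short) 0, 73L, (short) 32766));
    }
}
```

   Both calls in `main` evaluate to true, matching the retryOnEpochBump and 
retryOnOverflow labels in the comments.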



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala:
##########
@@ -879,8 +879,8 @@ class TransactionStateManagerTest {
     // is left at it is. If the transactional id is never reused, the 
TransactionMetadata
     // will be expired and it should succeed.
     val timestamp = time.milliseconds()
-    val txnMetadata = new TransactionMetadata(transactionalId, 1, 
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH,
-      RecordBatch.NO_PRODUCER_EPOCH, transactionTimeoutMs, 
TransactionState.EMPTY, util.Set.of, timestamp, timestamp, TV_0)
+    val txnMetadata = new TransactionMetadata(transactionalId, 1, 
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_ID, 0,
+      RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_PRODUCER_EPOCH, 
transactionTimeoutMs, TransactionState.EMPTY, util.Set.of, timestamp, 
timestamp, TV_0)

Review Comment:
   Only mechanical changes here.



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala:
##########
@@ -51,6 +51,7 @@ class TransactionMetadataTest {
       RecordBatch.NO_PRODUCER_ID,
       producerEpoch,
       RecordBatch.NO_PRODUCER_EPOCH,
+      RecordBatch.NO_PRODUCER_EPOCH,

Review Comment:
   Only mechanical changes here.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -779,134 +805,165 @@ class TransactionCoordinator(txnConfig: 
TransactionConfig,
           val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
 
           txnMetadata.inLock(() => {
-            producerIdCopy = txnMetadata.producerId
-            producerEpochCopy = txnMetadata.producerEpoch
-            // PrepareEpochFence has slightly different epoch bumping logic so 
don't include it here.
-            // Note that, it can only happen when the current state is Ongoing.
-            isEpochFence = txnMetadata.pendingState.filter(s => s == 
TransactionState.PREPARE_EPOCH_FENCE).isPresent
-            // True if the client retried a request that had overflowed the 
epoch, and a new producer ID is stored in the txnMetadata
-            val retryOnOverflow = !isEpochFence && txnMetadata.prevProducerId 
== producerId &&
-              producerEpoch == Short.MaxValue - 1 && txnMetadata.producerEpoch 
== 0
-            // True if the client retried an endTxn request, and the bumped 
producer epoch is stored in the txnMetadata.
-            val retryOnEpochBump = !isEpochFence && txnMetadata.producerEpoch 
== producerEpoch + 1
-
-            val isValidEpoch = {
-              if (!isEpochFence) {
-                // With transactions V2, state + same epoch is not sufficient 
to determine if a retry transition is valid. If the epoch is the
-                // same it actually indicates the next endTransaction call. 
Instead, we want to check the epoch matches with the epoch in the retry 
conditions.
-                // Return producer fenced even in the cases where the epoch is 
higher and could indicate an invalid state transition.
-                // Use the following criteria to determine if a v2 retry is 
valid:
-                txnMetadata.state match {
-                  case TransactionState.ONGOING | TransactionState.EMPTY | 
TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
-                    producerEpoch == txnMetadata.producerEpoch
-                  case TransactionState.PREPARE_COMMIT | 
TransactionState.PREPARE_ABORT =>
-                    retryOnEpochBump
-                  case TransactionState.COMPLETE_COMMIT | 
TransactionState.COMPLETE_ABORT =>
-                    retryOnEpochBump || retryOnOverflow || producerEpoch == 
txnMetadata.producerEpoch
+            // These checks are performed first so that we don't have to deal 
with intermediate states
+            // (including PrepareCommit and PrepareAbort) in already complex 
state machine.
+            if (txnMetadata.pendingTransitionInProgress && 
!txnMetadata.pendingState.filter(s => s == 
TransactionState.PREPARE_EPOCH_FENCE).isPresent) {
+              Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else if (txnMetadata.state == TransactionState.PREPARE_COMMIT || 
txnMetadata.state == TransactionState.PREPARE_ABORT) {
+              Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else {
+              // Copy the data under lock, this is going to be used if we want 
to return success
+              // on retrying commit / abort.
+              producerIdCopy = txnMetadata.producerId
+              producerEpochCopy = txnMetadata.producerEpoch
+
+              // PrepareEpochFence has slightly different epoch bumping logic 
so don't include it here.
+              // Note that, it can only happen when the current state is 
Ongoing.
+              isEpochFence = txnMetadata.pendingState.filter(s => s == 
TransactionState.PREPARE_EPOCH_FENCE).isPresent
+
+              // Calculate retry conditions, note that they are going to be 
used only
+              // in CompleteCommit / CompleteAbort states, so we can use the 
semantics
+              // of the producerId / epoch that's applicable in those states.
+              // Also note that the retry-on-overflow condition happens only 
when
+              // overflow happens during last commit / abort, overflows during
+              // InitProducerId(keepPreparedTxn=true) go through 
retry-on-epoch-bump
+              // path.  Example:
+              //  1. State ONGOING, {pid, epoch} = {42, 10}, {nextPid, 
nextEpoch} = {73, 85} -- the client-facing epoch has overflown
+              //  2. Commit transitions to {pid, epoch} = {73, 86} and the 
client retries with {73, 85} -- retryOnEpochBump
+              // Another example where we overflow during commit after going 
through overflow with keepPreparedTxn = true
+              //  1. State ONGOING, {pid, epoch} = {42, 10}, {nextPid, 
nextEpoch} = {73, 32766}
+              //  2. Commit transitions to {pid, epoch} = {85, 0}, prevPid = 
73 and the client retries with {73, 32766} -- retryOnOverflow
+
+              // True if the client retried a request that had overflowed the 
epoch, and a new producer ID is stored in the txnMetadata
+              val retryOnOverflow = !isEpochFence && 
txnMetadata.prevProducerId == producerId &&
+                producerEpoch == Short.MaxValue - 1 && 
txnMetadata.producerEpoch == 0
+              // True if the client retried an endTxn request, and the bumped 
producer epoch is stored in the txnMetadata.
+              val retryOnEpochBump = !isEpochFence && 
txnMetadata.producerEpoch == producerEpoch + 1
+
+              val isValidEpoch = {
+                if (!isEpochFence) {
+                  // With transactions V2, state + same epoch is not 
sufficient to determine if a retry transition is valid. If the epoch is the
+                  // same it actually indicates the next endTransaction call. 
Instead, we want to check the epoch matches with the epoch in the retry 
conditions.
+                  // Return producer fenced even in the cases where the epoch 
is higher and could indicate an invalid state transition.
+                  // Use the following criteria to determine if a v2 retry is 
valid:
+                  txnMetadata.state match {
+                    case TransactionState.ONGOING | TransactionState.EMPTY | 
TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
+                      producerEpoch == txnMetadata.clientProducerEpoch
+                    case TransactionState.PREPARE_COMMIT | 
TransactionState.PREPARE_ABORT =>
+                      retryOnEpochBump
+                    case TransactionState.COMPLETE_COMMIT | 
TransactionState.COMPLETE_ABORT =>
+                      retryOnEpochBump || retryOnOverflow || producerEpoch == 
txnMetadata.producerEpoch
+                  }
+                } else {
+                  // If the epoch is going to be fenced, it bumps the epoch 
differently with TV2.
+                  (!isFromClient || producerEpoch == 
txnMetadata.clientProducerEpoch) && producerEpoch >= 
txnMetadata.clientProducerEpoch
                 }
-              } else {
-                // If the epoch is going to be fenced, it bumps the epoch 
differently with TV2.
-                (!isFromClient || producerEpoch == txnMetadata.producerEpoch) 
&& producerEpoch >= txnMetadata.producerEpoch
               }
-            }
 
-            val isRetry = retryOnEpochBump || retryOnOverflow
-
-            def generateTxnTransitMetadataForTxnCompletion(nextState: 
TransactionState, noPartitionAdded: Boolean): ApiResult[(Int, 
TxnTransitMetadata)] = {
-              // EndTxn completion on TV2 bumps epoch, so rotate producer ID 
whenever the current epoch is exhausted.
-              // This must also apply to the epoch-fence path.
-              val nextProducerIdOrErrors =
-                if (txnMetadata.isProducerEpochExhausted) {
-                  try {
-                    Right(producerIdManager.generateProducerId())
-                  } catch {
-                    case e: Exception => Left(Errors.forException(e))
+              val isRetry = retryOnEpochBump || retryOnOverflow
+
+              def generateTxnTransitMetadataForTxnCompletion(nextState: 
TransactionState, noPartitionAdded: Boolean): ApiResult[(Int, 
TxnTransitMetadata)] = {
+                // EndTxn completion on TV2 bumps epoch, so rotate producer ID 
whenever the current epoch is exhausted.
+                // This must also apply to the epoch-fence path.
+                val nextProducerIdOrErrors =
+                  if (txnMetadata.isProducerEpochExhausted) {
+                    try {
+                      Right(producerIdManager.generateProducerId())
+                    } catch {
+                      case e: Exception => Left(Errors.forException(e))
+                    }
+                  } else {
+                    Right(txnMetadata.nextProducerId)

Review Comment:
   In the keepPrepared flow, even if the epoch is not exhausted at the time of 
the endTxn call, we might have already rotated the producer ID, so we have to 
use nextProducerId (which is what the client uses).
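
   A rough sketch of how the next producer ID and epoch are chosen on 
completion, following the branches visible in the diff.  The class and 
parameter names are hypothetical; in particular, whether the epoch is exhausted 
is passed in as a boolean because the exact rule behind 
isProducerEpochExhausted is not shown here.

```java
// Hypothetical model of the next producer ID / epoch selection on EndTxn.
final class NextProducerSketch {
    static final long NO_PRODUCER_ID = -1L;     // same value as RecordBatch.NO_PRODUCER_ID
    static final short NO_PRODUCER_EPOCH = -1;  // same value as RecordBatch.NO_PRODUCER_EPOCH

    // Exhausted epoch: rotate to a freshly generated producer ID.
    // Otherwise: keep whatever nextProducerId is stored, which in the
    // keepPrepared flow is the ID the client is already using.
    static long nextProducerId(boolean epochExhausted, long storedNextProducerId,
                               long freshProducerId) {
        return epochExhausted ? freshProducerId : storedNextProducerId;
    }

    // If a next epoch was set by InitProducerId(keepPreparedTxn), bump it again
    // so that delayed requests carrying that epoch are fenced after completion.
    static short nextProducerEpoch(boolean hasNextProducerEpoch, boolean epochExhausted,
                                   short storedNextProducerEpoch) {
        if (!hasNextProducerEpoch) {
            return NO_PRODUCER_EPOCH;
        }
        return epochExhausted ? (short) 0 : (short) (storedNextProducerEpoch + 1);
    }

    public static void main(String[] args) {
        // Step 2 of the example in the diff: nextProducerId = 42, nextProducerEpoch = 101.
        System.out.println(nextProducerId(false, 42L, 999L));
        System.out.println(nextProducerEpoch(true, false, (short) 101));
    }
}
```

   With the values from step 2 of the example, the sketch keeps producer ID 42 
and yields epoch 102, which is the bump the code comment asks for at step 4.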



##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -305,79 +314,6 @@ class TransactionsTest extends IntegrationTestHarness {
       
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, 
consumer.groupMetadata()))
   }
 
-  private def sendOffset(commit: (KafkaProducer[Array[Byte], Array[Byte]],

Review Comment:
   This moved to the bottom.



##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -713,31 +634,10 @@ class TransactionsTest extends IntegrationTestHarness {
     producer.abortTransaction()
     producer.close()
 
-    // Find the transaction coordinator partition for this transactional ID
-    val adminClient = createAdminClient()
-    try {
-      val txnDescription = 
adminClient.describeTransactions(java.util.List.of(transactionalId))
-        .description(transactionalId).get()
-      val coordinatorId = txnDescription.coordinatorId()
-
-      // Access the transaction coordinator and update the epoch to 
Short.MaxValue - 2
-      val coordinatorBroker = brokers.find(_.config.brokerId == 
coordinatorId).get
-      val txnCoordinator = 
coordinatorBroker.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
-
-      // Get the transaction metadata and update the epoch close to 
Short.MaxValue
-      // to trigger the overflow scenario. We'll set it high enough that 
subsequent
-      // operations will cause it to reach Short.MaxValue - 1 before the 
timeout.
-      
txnCoordinator.transactionManager.getTransactionState(transactionalId).foreach 
{ txnMetadataOpt =>
-        txnMetadataOpt.foreach { epochAndMetadata =>
-          epochAndMetadata.transactionMetadata.inLock(() => {
-            
epochAndMetadata.transactionMetadata.setProducerEpoch((Short.MaxValue - 
2).toShort)
-            null // inLock expects a Supplier that returns a value
-          })
-        }
-      }
-    } finally {
-      adminClient.close()
-    }
+    // Update the epoch close to Short.MaxValue to trigger the overflow 
scenario.
+    // Set it high enough that subsequent operations will cause it to reach
+    // Short.MaxValue - 1 before the timeout.
+    setProducerEpoch(transactionalId, (Short.MaxValue - 2).toShort)

Review Comment:
   This functionality is refactored into a helper function.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -580,7 +604,9 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
                   
txnMetadata.setLastProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
                 }
 
-                Right(coordinatorEpoch, 
txnMetadata.prepareAbortOrCommit(nextState, 
TransactionVersion.fromFeatureLevel(0), RecordBatch.NO_PRODUCER_ID, 
time.milliseconds(), false))
+                Right((coordinatorEpoch, 
txnMetadata.prepareAbortOrCommit(nextState,
+                  TransactionVersion.fromFeatureLevel(0), 
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,

Review Comment:
   This just adds NO_PRODUCER_EPOCH as the next producer epoch in the TV1 case.
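
   For contrast, a minimal sketch of how the next epoch is chosen, assuming a 
hypothetical helper (written in Java for self-containment) and assuming that 
hasNextProducerEpoch means the stored value differs from NO_PRODUCER_EPOCH: 
the TV1 path always passes NO_PRODUCER_EPOCH, while the TV2 completion path in 
this PR bumps a stored next epoch once more, or resets it to 0 when the epoch 
is exhausted.

```java
// Hypothetical helper; names and the boolean flags are illustrative assumptions.
final class NextEpochSketch {
    // Mirrors RecordBatch.NO_PRODUCER_EPOCH.
    static final short NO_PRODUCER_EPOCH = -1;

    static short nextEpochForCompletion(boolean isTv2,
                                        short storedNextEpoch,
                                        boolean epochExhausted) {
        // TV1 never tracks a next epoch, and TV2 only does so after
        // InitProducerId(keepPreparedTxn=true) has stored one.
        if (!isTv2 || storedNextEpoch == NO_PRODUCER_EPOCH) {
            return NO_PRODUCER_EPOCH;
        }
        // Bump again so that requests issued with the stored epoch are fenced
        // once the transaction completes; start from 0 after a rotation.
        return epochExhausted ? (short) 0 : (short) (storedNextEpoch + 1);
    }

    public static void main(String[] args) {
        System.out.println(nextEpochForCompletion(false, (short) 101, false));
        System.out.println(nextEpochForCompletion(true, (short) 101, false));
        System.out.println(nextEpochForCompletion(true, (short) 101, true));
    }
}
```

   The second call corresponds to step 4 of the example in the code comment, 
where a stored next epoch of 101 becomes 102.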



##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -750,33 +650,12 @@ class TransactionsTest extends IntegrationTestHarness {
     producer.flush()
 
     // Check and assert that epoch of the transaction is Short.MaxValue - 1 
(before timeout)
-    val adminClient2 = createAdminClient()
-    try {
-      val coordinatorId2 = 
adminClient2.describeTransactions(java.util.List.of(transactionalId))
-        .description(transactionalId).get().coordinatorId()
-      val coordinatorBroker2 = brokers.find(_.config.brokerId == 
coordinatorId2).get
-      val txnCoordinator2 = 
coordinatorBroker2.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
-
-      
txnCoordinator2.transactionManager.getTransactionState(transactionalId).foreach 
{ txnMetadataOpt =>
-        txnMetadataOpt.foreach { epochAndMetadata =>
-          val currentEpoch = 
epochAndMetadata.transactionMetadata.producerEpoch()
-          assertEquals((Short.MaxValue - 1).toShort, currentEpoch,
-            s"Expected epoch to be ${Short.MaxValue - 1}, but got 
$currentEpoch")
-        }
-      }
+    val currentEpoch = getProducerEpoch(transactionalId)

Review Comment:
   This is refactored to a helper function.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -779,134 +805,165 @@ class TransactionCoordinator(txnConfig: 
TransactionConfig,
           val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
 
           txnMetadata.inLock(() => {
-            producerIdCopy = txnMetadata.producerId
-            producerEpochCopy = txnMetadata.producerEpoch
-            // PrepareEpochFence has slightly different epoch bumping logic so 
don't include it here.
-            // Note that, it can only happen when the current state is Ongoing.
-            isEpochFence = txnMetadata.pendingState.filter(s => s == 
TransactionState.PREPARE_EPOCH_FENCE).isPresent
-            // True if the client retried a request that had overflowed the 
epoch, and a new producer ID is stored in the txnMetadata
-            val retryOnOverflow = !isEpochFence && txnMetadata.prevProducerId 
== producerId &&
-              producerEpoch == Short.MaxValue - 1 && txnMetadata.producerEpoch 
== 0
-            // True if the client retried an endTxn request, and the bumped 
producer epoch is stored in the txnMetadata.
-            val retryOnEpochBump = !isEpochFence && txnMetadata.producerEpoch 
== producerEpoch + 1
-
-            val isValidEpoch = {
-              if (!isEpochFence) {
-                // With transactions V2, state + same epoch is not sufficient 
to determine if a retry transition is valid. If the epoch is the
-                // same it actually indicates the next endTransaction call. 
Instead, we want to check the epoch matches with the epoch in the retry 
conditions.
-                // Return producer fenced even in the cases where the epoch is 
higher and could indicate an invalid state transition.
-                // Use the following criteria to determine if a v2 retry is 
valid:
-                txnMetadata.state match {
-                  case TransactionState.ONGOING | TransactionState.EMPTY | 
TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
-                    producerEpoch == txnMetadata.producerEpoch
-                  case TransactionState.PREPARE_COMMIT | 
TransactionState.PREPARE_ABORT =>
-                    retryOnEpochBump
-                  case TransactionState.COMPLETE_COMMIT | 
TransactionState.COMPLETE_ABORT =>
-                    retryOnEpochBump || retryOnOverflow || producerEpoch == 
txnMetadata.producerEpoch
+            // These checks are performed first so that we don't have to deal 
with intermediate states
+            // (including PrepareCommit and PrepareAbort) in already complex 
state machine.
+            if (txnMetadata.pendingTransitionInProgress && 
!txnMetadata.pendingState.filter(s => s == 
TransactionState.PREPARE_EPOCH_FENCE).isPresent) {
+              Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else if (txnMetadata.state == TransactionState.PREPARE_COMMIT || 
txnMetadata.state == TransactionState.PREPARE_ABORT) {
+              Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else {
+              // Copy the data under lock, this is going to be used if we want 
to return success
+              // on retrying commit / abort.
+              producerIdCopy = txnMetadata.producerId
+              producerEpochCopy = txnMetadata.producerEpoch
+
+              // PrepareEpochFence has slightly different epoch bumping logic 
so don't include it here.
+              // Note that, it can only happen when the current state is 
Ongoing.
+              isEpochFence = txnMetadata.pendingState.filter(s => s == 
TransactionState.PREPARE_EPOCH_FENCE).isPresent
+
+              // Calculate retry conditions, note that they are going to be 
used only
+              // in CompleteCommit / CompleteAbort states, so we can use the 
semantics
+              // of the producerId / epoch that's applicable in those states.
+              // Also note that the retry-on-overflow condition happens only 
when
+              // overflow happens during last commit / abort, overflows during
+              // InitProducerId(keepPreparedTxn=true) go through 
retry-on-epoch-bump
+              // path.  Example:
+              //  1. State ONGOING, {pid, epoch} = {42, 10}, {nextPid, 
nextEpoch} = {73, 85} -- the client-facing epoch has overflown
+              //  2. Commit transitions to {pid, epoch} = {73, 86} and the 
client retries with {73, 85} -- retryOnEpochBump
+              // Another example where we overflow during commit after going 
through overflow with keepPreparedTxn = true
+              //  1. State ONGOING, {pid, epoch} = {42, 10}, {nextPid, 
nextEpoch} = {73, 32766}
+              //  2. Commit transitions to {pid, epoch} = {85, 0}, prevPid = 
73 and the client retries with {73, 32766} -- retryOnOverflow
+
+              // True if the client retried a request that had overflowed the 
epoch, and a new producer ID is stored in the txnMetadata
+              val retryOnOverflow = !isEpochFence && 
txnMetadata.prevProducerId == producerId &&
+                producerEpoch == Short.MaxValue - 1 && 
txnMetadata.producerEpoch == 0
+              // True if the client retried an endTxn request, and the bumped 
producer epoch is stored in the txnMetadata.
+              val retryOnEpochBump = !isEpochFence && 
txnMetadata.producerEpoch == producerEpoch + 1
+
+              val isValidEpoch = {
+                if (!isEpochFence) {
+                  // With transactions V2, state + same epoch is not 
sufficient to determine if a retry transition is valid. If the epoch is the
+                  // same it actually indicates the next endTransaction call. 
Instead, we want to check the epoch matches with the epoch in the retry 
conditions.
+                  // Return producer fenced even in the cases where the epoch 
is higher and could indicate an invalid state transition.
+                  // Use the following criteria to determine if a v2 retry is 
valid:
+                  txnMetadata.state match {
+                    case TransactionState.ONGOING | TransactionState.EMPTY | 
TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
+                      producerEpoch == txnMetadata.clientProducerEpoch
+                    case TransactionState.PREPARE_COMMIT | 
TransactionState.PREPARE_ABORT =>
+                      retryOnEpochBump
+                    case TransactionState.COMPLETE_COMMIT | 
TransactionState.COMPLETE_ABORT =>
+                      retryOnEpochBump || retryOnOverflow || producerEpoch == 
txnMetadata.producerEpoch
+                  }
+                } else {
+                  // If the epoch is going to be fenced, it bumps the epoch 
differently with TV2.
+                  (!isFromClient || producerEpoch == 
txnMetadata.clientProducerEpoch) && producerEpoch >= 
txnMetadata.clientProducerEpoch
                 }
-              } else {
-                // If the epoch is going to be fenced, it bumps the epoch 
differently with TV2.
-                (!isFromClient || producerEpoch == txnMetadata.producerEpoch) 
&& producerEpoch >= txnMetadata.producerEpoch
               }
-            }
 
-            val isRetry = retryOnEpochBump || retryOnOverflow
-
-            def generateTxnTransitMetadataForTxnCompletion(nextState: 
TransactionState, noPartitionAdded: Boolean): ApiResult[(Int, 
TxnTransitMetadata)] = {
-              // EndTxn completion on TV2 bumps epoch, so rotate producer ID 
whenever the current epoch is exhausted.
-              // This must also apply to the epoch-fence path.
-              val nextProducerIdOrErrors =
-                if (txnMetadata.isProducerEpochExhausted) {
-                  try {
-                    Right(producerIdManager.generateProducerId())
-                  } catch {
-                    case e: Exception => Left(Errors.forException(e))
+              val isRetry = retryOnEpochBump || retryOnOverflow
+
+              def generateTxnTransitMetadataForTxnCompletion(nextState: 
TransactionState, noPartitionAdded: Boolean): ApiResult[(Int, 
TxnTransitMetadata)] = {
+                // EndTxn completion on TV2 bumps epoch, so rotate producer ID 
whenever the current epoch is exhausted.
+                // This must also apply to the epoch-fence path.
+                val nextProducerIdOrErrors =
+                  if (txnMetadata.isProducerEpochExhausted) {
+                    try {
+                      Right(producerIdManager.generateProducerId())
+                    } catch {
+                      case e: Exception => Left(Errors.forException(e))
+                    }
+                  } else {
+                    Right(txnMetadata.nextProducerId)
                   }
+
+                // If the next producer epoch is set (which can happen if we 
used InitProducerId(keepPreparedTxn))
+                // then we also need to bump this epoch -- we want to fence 
requests that were issued with that
+                // epoch after the transaction is complete.  Consider this 
example:
+                //  1. (initial state): Ongoing, producerId = 42, 
producerEpoch = 100, nextProducerId = -1, nextProducerEpoch = -1
+                //  2. InitProducerId(keepPreparedTxn): Ongoing, producerId = 
42, producerEpoch = 100, nextProducerId = 42, nextProducerEpoch = 101
+                //  3. CommitTxn: timed out (but actually delayed)
+                //  4. CommitTxn + complete: CompleteCommit, producerId = 42, 
producerEpoch = 101, nextProducerId = -1, nextProducerEpoch = -1
+                //  5. Start new transaction, do some actions
+                //  6. Delayed CommitTxn from step 3 arrives and commits 
partially completed transaction.
+                // In order to prevent this problem, we need to bump the epoch 
at step 4 (so it becomes 102) even though we
+                // already bumped it at step 2.
+                // Note that we can arrive at this scenario when a client did 
a few keep-prepared calls and then
+                // decided to do one without keep-prepared (e.g. to 
force-terminate), so this logic applies
+                // to isEpochFence as well.
+                val nextProducerEpoch = if (txnMetadata.hasNextProducerEpoch) {
+                  if (txnMetadata.isProducerEpochExhausted) 0.toShort else 
(txnMetadata.nextProducerEpoch + 1).toShort
                 } else {
-                  Right(RecordBatch.NO_PRODUCER_ID)
+                  RecordBatch.NO_PRODUCER_EPOCH
                 }
 
-              if (nextState == TransactionState.PREPARE_ABORT && isEpochFence) 
{
-                // We should clear the pending state to make way for the 
transition to PrepareAbort
-                txnMetadata.pendingState(util.Optional.empty())
-                // For TV2+, don't manually set the epoch - let 
prepareAbortOrCommit handle it naturally.
-              }
+                if (nextState == TransactionState.PREPARE_ABORT && 
isEpochFence) {
+                  // We should clear the pending state to make way for the 
transition to PrepareAbort
+                  txnMetadata.pendingState(util.Optional.empty())
+                  // For TV2+, don't manually set the epoch - let 
prepareAbortOrCommit handle it naturally.
+                }
 
-              nextProducerIdOrErrors.flatMap {
-                nextProducerId =>
-                  Right(coordinatorEpoch, 
txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, 
nextProducerId.asInstanceOf[Long], time.milliseconds(), noPartitionAdded))
+                nextProducerIdOrErrors.flatMap {
+                  nextProducerId =>
+                    Right(coordinatorEpoch, 
txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, 
nextProducerId, nextProducerEpoch, time.milliseconds(), noPartitionAdded))
+                }
               }
-            }
-
-            if (txnMetadata.pendingTransitionInProgress && 
txnMetadata.pendingState.get != TransactionState.PREPARE_EPOCH_FENCE) {

Review Comment:
   This check is now done upfront.
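
   A minimal sketch of the upfront guard and the two retry conditions from this 
hunk, assuming a local stand-in enum and hypothetical method names (Java is 
used only for self-containment; pendingTransitionInProgress is assumed to mean 
that a pending state is present):

```java
import java.util.Optional;

// Hypothetical stand-ins; not the coordinator's actual types.
final class EndTxnChecksSketch {
    enum State {
        EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT,
        COMPLETE_COMMIT, COMPLETE_ABORT, DEAD, PREPARE_EPOCH_FENCE
    }

    // Upfront guard: anything mid-transition is answered with
    // CONCURRENT_TRANSACTIONS, except a pending epoch fence.
    static boolean isConcurrent(State state, Optional<State> pendingState) {
        boolean pendingNonFence = pendingState.isPresent()
            && pendingState.get() != State.PREPARE_EPOCH_FENCE;
        boolean preparing = state == State.PREPARE_COMMIT
            || state == State.PREPARE_ABORT;
        return pendingNonFence || preparing;
    }

    // The client retried EndTxn and the bumped epoch is already stored.
    static boolean retryOnEpochBump(boolean isEpochFence,
                                    short storedEpoch,
                                    short requestEpoch) {
        return !isEpochFence && storedEpoch == requestEpoch + 1;
    }

    // The client retried EndTxn after the epoch overflowed and the
    // producer ID was rotated.
    static boolean retryOnOverflow(boolean isEpochFence,
                                   long prevProducerId,
                                   long requestProducerId,
                                   short storedEpoch,
                                   short requestEpoch) {
        return !isEpochFence
            && prevProducerId == requestProducerId
            && requestEpoch == Short.MAX_VALUE - 1
            && storedEpoch == 0;
    }

    public static void main(String[] args) {
        // First example from the code comment: stored {73, 86}, retry with {73, 85}.
        System.out.println(retryOnEpochBump(false, (short) 86, (short) 85));
        // Second example: stored {85, 0}, prevPid = 73, retry with {73, 32766}.
        System.out.println(retryOnOverflow(false, 73L, 73L, (short) 0, (short) 32766));
        System.out.println(isConcurrent(State.PREPARE_COMMIT, Optional.empty()));
    }
}
```

   Running the guard first means the retry conditions only need to be reasoned 
about for states that are not mid-transition.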



##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -750,33 +650,12 @@ class TransactionsTest extends IntegrationTestHarness {
     producer.flush()
 
     // Check and assert that epoch of the transaction is Short.MaxValue - 1 
(before timeout)
-    val adminClient2 = createAdminClient()
-    try {
-      val coordinatorId2 = 
adminClient2.describeTransactions(java.util.List.of(transactionalId))
-        .description(transactionalId).get().coordinatorId()
-      val coordinatorBroker2 = brokers.find(_.config.brokerId == 
coordinatorId2).get
-      val txnCoordinator2 = 
coordinatorBroker2.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
-
-      
txnCoordinator2.transactionManager.getTransactionState(transactionalId).foreach 
{ txnMetadataOpt =>
-        txnMetadataOpt.foreach { epochAndMetadata =>
-          val currentEpoch = 
epochAndMetadata.transactionMetadata.producerEpoch()
-          assertEquals((Short.MaxValue - 1).toShort, currentEpoch,
-            s"Expected epoch to be ${Short.MaxValue - 1}, but got 
$currentEpoch")
-        }
-      }
+    val currentEpoch = getProducerEpoch(transactionalId)
+    assertEquals((Short.MaxValue - 1).toShort, currentEpoch,
+      s"Expected epoch to be ${Short.MaxValue - 1}, but got $currentEpoch")
 
-      // Wait until state is complete abort
-      waitUntilTrue(() => {
-        val listResult = adminClient2.listTransactions()
-        val txns = listResult.all().get().asScala
-        txns.exists(txn =>
-          txn.transactionalId() == transactionalId &&
-          txn.state() == TransactionState.COMPLETE_ABORT
-        )
-      }, "Transaction was not aborted on timeout")
-    } finally {
-      adminClient2.close()
-    }
+    // Wait until state is complete abort
+    waitForTransactionState(transactionalId, TransactionState.COMPLETE_ABORT)

Review Comment:
   This is refactored to a helper function.



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala:
##########
@@ -66,9 +66,9 @@ class TransactionMarkerChannelManagerTest {
   private val txnTimeoutMs = 0
   private val txnResult = TransactionResult.COMMIT
   private val txnMetadata1 = new TransactionMetadata(transactionalId1, 
producerId1, producerId1, RecordBatch.NO_PRODUCER_ID,
-    producerEpoch, lastProducerEpoch, txnTimeoutMs, 
TransactionState.PREPARE_COMMIT, util.Set.of(partition1, partition2), 0L, 0L, 
TransactionVersion.TV_2)
+    producerEpoch, lastProducerEpoch, RecordBatch.NO_PRODUCER_EPOCH, 
txnTimeoutMs, TransactionState.PREPARE_COMMIT, util.Set.of(partition1, 
partition2), 0L, 0L, TransactionVersion.TV_2)

Review Comment:
   Only mechanical changes here.



##########
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java:
##########
@@ -84,18 +84,25 @@ public static byte[] valueToBytes(TxnTransitMetadata 
txnMetadata,
                             
.setPartitionIds(entry.getValue().stream().map(TopicPartition::partition).toList())).toList();
         }
 
-        return MessageUtil.toVersionPrefixedBytes(
-                transactionVersionLevel.transactionLogValueVersion(),
-                new TransactionLogValue()
-                        .setProducerId(txnMetadata.producerId())
-                        .setProducerEpoch(txnMetadata.producerEpoch())
-                        .setTransactionTimeoutMs(txnMetadata.txnTimeoutMs())
-                        .setTransactionStatus(txnMetadata.txnState().id())
-                        
.setTransactionLastUpdateTimestampMs(txnMetadata.txnLastUpdateTimestamp())
-                        
.setTransactionStartTimestampMs(txnMetadata.txnStartTimestamp())
-                        .setTransactionPartitions(transactionPartitions)
-                        
.setClientTransactionVersion(txnMetadata.clientTransactionVersion().featureLevel())
-        );
+        short logVersion = 
transactionVersionLevel.transactionLogValueVersion();
+        TransactionLogValue value = new TransactionLogValue()
+                .setProducerId(txnMetadata.producerId())
+                .setProducerEpoch(txnMetadata.producerEpoch())
+                .setTransactionTimeoutMs(txnMetadata.txnTimeoutMs())
+                .setTransactionStatus(txnMetadata.txnState().id())
+                
.setTransactionLastUpdateTimestampMs(txnMetadata.txnLastUpdateTimestamp())
+                
.setTransactionStartTimestampMs(txnMetadata.txnStartTimestamp())
+                .setTransactionPartitions(transactionPartitions)
+                
.setClientTransactionVersion(txnMetadata.clientTransactionVersion().featureLevel());
+
+        // Tagged fields are only supported in log version 1+ (flexible 
versions)
+        if (logVersion >= 1) {
+            value.setPreviousProducerId(txnMetadata.prevProducerId());
+            value.setNextProducerId(txnMetadata.nextProducerId());

Review Comment:
   This is actually an omission in the existing TV2 code (these fields were not 
being written) and needs to be fixed independently.
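
   A minimal sketch of the version gating, assuming a hypothetical value holder 
rather than the generated TransactionLogValue class, and assuming both fields 
default to -1 when left unset:

```java
// Hypothetical stand-in for the serialized log value; not the generated class.
final class LogValueSketch {
    static final long NO_PRODUCER_ID = -1L;

    // Assumed defaults for the two tagged fields.
    long previousProducerId = NO_PRODUCER_ID;
    long nextProducerId = NO_PRODUCER_ID;

    static LogValueSketch build(short logVersion,
                                long prevProducerId,
                                long nextProducerId) {
        LogValueSketch value = new LogValueSketch();
        // Tagged fields exist only in flexible versions (log version 1+),
        // so older versions keep the defaults.
        if (logVersion >= 1) {
            value.previousProducerId = prevProducerId;
            value.nextProducerId = nextProducerId;
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(build((short) 0, 42L, 73L).nextProducerId);
        System.out.println(build((short) 1, 42L, 73L).nextProducerId);
    }
}
```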



##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -1056,6 +935,545 @@ class TransactionsTest extends IntegrationTestHarness {
     producer.abortTransaction()
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testFencingWithDualIdentity(groupProtocol: String): Unit = {
+    // Test that zombie producer is properly fenced when dual identity is 
established
+    // after crash recovery with 2PC keepPreparedTxn.
+    val transactionalId = "test-fencing-dual-identity"
+    val testTopic = s"test-fencing-topic-${System.nanoTime()}"
+    createTopic(testTopic, 1, brokerCount, topicConfig())
+
+    val consumer = transactionalConsumers.head
+    consumer.subscribe(Seq(testTopic).asJava)
+    consumer.poll(Duration.ofMillis(100))
+
+    // Start transaction and send records (simulating crash before commit)
+    val producer1 = createTransactionalProducer(transactionalId, enable2PC = 
true)
+    producer1.initTransactions()
+    producer1.beginTransaction()
+    val numRecords = 5
+    for (i <- 0 until numRecords) {
+      producer1.send(new ProducerRecord(testTopic, 0, s"key-$i".getBytes, 
s"value-$i".getBytes))
+    }
+    producer1.flush()
+    // Simulate crash - don't commit (leave transaction prepared)
+
+    // Create new producer and recover with 
initTransactions(keepPreparedTxn=true)
+    val producer2 = createTransactionalProducer(transactionalId, enable2PC = 
true)
+    producer2.initTransactions(true)  // This establishes dual identity
+
+    // Old (zombie) producer tries to abort - should get 
ProducerFencedException
+    assertThrows(classOf[ProducerFencedException], () => {
+      producer1.abortTransaction()
+    }, "Zombie producer should be fenced when trying to abort")
+
+    // New producer commits the prepared transaction
+    producer2.commitTransaction()
+
+    // Wait for transaction to reach COMPLETE_COMMIT state
+    waitForTransactionState(transactionalId, TransactionState.COMPLETE_COMMIT)
+
+    // Verify consumer sees the records from the first producer
+    consumer.seekToBeginning(consumer.assignment())
+    val consumedRecords = consumeRecordsFor(consumer)
+    assertEquals(numRecords, consumedRecords.size,
+      "Consumer should see all records from first producer after commit")
+
+    // Verify transaction state
+    val txnDescription = 
adminClient.describeTransactions(java.util.List.of(transactionalId))
+      .description(transactionalId).get()
+    assertEquals(TransactionState.COMPLETE_COMMIT, txnDescription.state(),
+      "Transaction should be in COMPLETE_COMMIT state")
+
+    producer1.close()
+    producer2.close()
+    consumer.unsubscribe()
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testPrepareAndCompleteTransactionCommit(groupProtocol: String): Unit = {
+    // Test the 2PC API: prepareTransaction() and completeTransaction() for 
commit path.
+    // This tests the client-side 2PC API where the producer explicitly calls 
prepareTransaction()
+    // and then completeTransaction() to commit.
+    val transactionalId = "test-prepare-complete-commit"
+    val testTopic = s"test-prepare-complete-topic-${System.nanoTime()}"
+    createTopic(testTopic, 1, brokerCount, topicConfig())
+
+    val consumer = transactionalConsumers.head
+    consumer.subscribe(Seq(testTopic).asJava)
+    consumer.poll(Duration.ofMillis(100))
+
+    // Start transaction and produce records
+    val producer1 = createTransactionalProducer(transactionalId, enable2PC = 
true)
+    producer1.initTransactions()
+    producer1.beginTransaction()
+    val numRecords = 5
+    for (i <- 0 until numRecords) {
+      producer1.send(new ProducerRecord(testTopic, 0, s"key-$i".getBytes, 
s"value-$i".getBytes))
+    }
+    producer1.flush()
+
+    // Prepare the transaction (moves to PREPARED state)
+    val preparedState = producer1.prepareTransaction()
+    assertNotNull(preparedState, "prepareTransaction should return prepared 
state")
+
+    // Simulate crash - don't call commit/abort, just leave in prepared state
+    // (In real scenario, producer would crash here)
+
+    // New producer recovers and completes the transaction
+    val producer2 = createTransactionalProducer(transactionalId, enable2PC = 
true)
+    producer2.initTransactions(true)  // keepPreparedTxn=true
+
+    // Complete the transaction with the prepared state (should commit)
+    producer2.completeTransaction(preparedState)
+
+    // Wait for transaction to reach COMPLETE_COMMIT state
+    waitForTransactionState(transactionalId, TransactionState.COMPLETE_COMMIT)
+
+    // Verify consumer sees the records
+    consumer.seekToBeginning(consumer.assignment())
+    val consumedRecords = consumeRecordsFor(consumer)
+    assertEquals(numRecords, consumedRecords.size,
+      "Consumer should see all records after commit")
+
+    // Verify transaction state
+    val txnDescription = 
adminClient.describeTransactions(java.util.List.of(transactionalId))
+      .description(transactionalId).get()
+    assertEquals(TransactionState.COMPLETE_COMMIT, txnDescription.state(),
+      "Transaction should be in COMPLETE_COMMIT state")
+
+    producer1.close()
+    producer2.close()
+    consumer.unsubscribe()
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testPrepareAndCompleteTransactionAbort(groupProtocol: String): Unit = {
+    // Test the 2PC API: prepareTransaction() and completeTransaction() for 
abort path.
+    // This tests that completeTransaction() with a non-matching state 
triggers abort.
+    val transactionalId = "test-prepare-complete-abort"
+    val testTopic = s"test-prepare-complete-abort-topic-${System.nanoTime()}"
+    createTopic(testTopic, 1, brokerCount, topicConfig())
+
+    val consumer = transactionalConsumers.head
+    consumer.subscribe(Seq(testTopic).asJava)
+    consumer.poll(Duration.ofMillis(100))
+
+    val producer1 = createTransactionalProducer(transactionalId, enable2PC = 
true)
+    producer1.initTransactions()
+
+    // First transaction - prepare but then abort
+    producer1.beginTransaction()
+    for (i <- 0 until 3) {
+      producer1.send(new ProducerRecord(testTopic, 0, s"key1-$i".getBytes, 
s"value1-$i".getBytes))
+    }
+    producer1.flush()
+
+    // Get marker for first transaction
+    val marker1 = producer1.prepareTransaction()
+    assertNotNull(marker1, "prepareTransaction should return prepared state")
+
+    // Abort the first prepared transaction
+    producer1.abortTransaction()
+
+    // Second transaction - prepare and leave in prepared state
+    producer1.beginTransaction()
+    val numRecords = 5
+    for (i <- 0 until numRecords) {
+      producer1.send(new ProducerRecord(testTopic, 0, s"key2-$i".getBytes, 
s"value2-$i".getBytes))
+    }
+    producer1.flush()
+
+    // Get marker for second transaction
+    val marker2 = producer1.prepareTransaction()
+    assertNotNull(marker2, "prepareTransaction should return prepared state")
+
+    // Simulate crash - leave second transaction in prepared state
+
+    // New producer recovers
+    val producer2 = createTransactionalProducer(transactionalId, enable2PC = 
true)
+    producer2.initTransactions(true)  // keepPreparedTxn=true
+
+    // Complete with marker1 (old, aborted transaction) - should trigger abort
+    // because it doesn't match the current prepared state (marker2)
+    producer2.completeTransaction(marker1)
+
+    // Wait for transaction to reach COMPLETE_ABORT state
+    waitForTransactionState(transactionalId, TransactionState.COMPLETE_ABORT)
+
+    // Verify consumer sees NO records (both transactions aborted)
+    consumer.seekToBeginning(consumer.assignment())
+    val consumedRecords = consumeRecordsFor(consumer)
+    assertEquals(0, consumedRecords.size,
+      "Consumer should see no records after abort")
+
+    // Verify transaction state
+    val txnDescription = 
adminClient.describeTransactions(java.util.List.of(transactionalId))
+      .description(transactionalId).get()
+    assertEquals(TransactionState.COMPLETE_ABORT, txnDescription.state(),
+      "Transaction should be in COMPLETE_ABORT state")
+
+    producer1.close()
+    producer2.close()
+    consumer.unsubscribe()
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersAll"))
+  def testProducerCrashAndRecoverWith2PC(groupProtocol: String): Unit = {
+    // Test producer crash and recovery with 2PC keepPrepared transaction flow.
+    // Note: This uses a standard transactional producer.  2PC is enabled 
server-side
+    // and triggered by calling initTransactions(keepPreparedTxn=true) after 
crash.
+
+    def test2PCRecovery(numCrashes: Int, shouldCommit: Boolean): Unit = {
+      val transactionalId = s"test-2pc-recovery-${System.nanoTime()}"
+      val testTopic = s"test-2pc-topic-${System.nanoTime()}"
+      createTopic(testTopic, 1, brokerCount, topicConfig())
+
+      val consumer = transactionalConsumers.head
+      consumer.subscribe(Seq(testTopic).asJava)
+      consumer.poll(Duration.ofMillis(100))  // Trigger assignment
+
+      // Create producer and send records in a transaction
+      var producer = createTransactionalProducer(transactionalId, enable2PC = 
true)
+      producer.initTransactions()
+      producer.beginTransaction()
+
+      val numRecords = 5
+      for (i <- 0 until numRecords) {
+        producer.send(new ProducerRecord(testTopic, 0, s"key-$i".getBytes,
+          s"value-$i".getBytes))
+      }
+      producer.flush()
+
+      // Verify records not visible to read_committed consumer
+      consumer.poll(Duration.ofMillis(100))
+      assertEquals(0, consumer.assignment().asScala.map(tp =>
+        consumer.position(tp)).sum, "Records should not be visible before 
commit")
+
+      // Simulate crash by closing without committing
+      producer.close(Duration.ZERO)
+
+      val baseEpoch: Short = 0
+
+        // Crash and recover numCrashes times
+        for (crashNum <- 1 to numCrashes) {
+          val recoveredProducer = createTransactionalProducer(transactionalId, 
enable2PC = true)
+          // Use keepPreparedTxn=true to preserve in-flight transaction
+          recoveredProducer.initTransactions(true)
+
+          // Verify dual identity after recovery
+          val txnDescription = 
adminClient.describeTransactions(java.util.List.of(transactionalId))
+            .description(transactionalId).get()
+
+          // After crash recovery, server should set up dual identity
+          assertTrue(txnDescription.producerEpoch() >= baseEpoch,
+            s"Crash $crashNum: client epoch should be >= base epoch")
+
+          // For simplicity in this test, we just verify epoch progression.
+          // Detailed dual identity verification is in unit tests.
+
+          if (crashNum < numCrashes) {
+            // Simulate another crash
+            recoveredProducer.close(Duration.ZERO)
+          } else {
+            // Last recovery - complete the transaction
+            producer = recoveredProducer
+          }
+        }
+
+        // Complete prepared transaction directly.  Cannot call beginTransaction()
+        // in prepared state - must call commitTransaction() or abortTransaction() directly.
+        if (shouldCommit) {
+          producer.commitTransaction()
+        } else {
+          producer.abortTransaction()
+        }
+
+        // Wait for the transaction to fully complete on the server before proceeding.
+        // The client-side commitTransaction() returns after receiving the EndTxn response,
+        // but the server might still be writing markers and transitioning to COMPLETE_* state.
+        val expectedState = if (shouldCommit) TransactionState.COMPLETE_COMMIT else TransactionState.COMPLETE_ABORT
+        waitForTransactionState(transactionalId, expectedState)
+
+        // Verify consumer sees correct records
+        consumer.seekToBeginning(consumer.assignment())
+        val consumedRecords = consumeRecordsFor(consumer)
+
+        if (shouldCommit) {
+          assertEquals(numRecords, consumedRecords.size,
+            "Consumer should see all records after commit")
+        } else {
+          assertEquals(0, consumedRecords.size,
+            "Consumer should see no records after abort")
+        }
+
+        // Verify fresh transaction works with bumped epoch
+        producer.beginTransaction()
+        producer.send(new ProducerRecord(testTopic, 0, "fresh-key".getBytes,
+          "fresh-value".getBytes))
+        producer.commitTransaction()
+
+        // Seek to beginning to read all records (both 2PC and fresh)
+        consumer.seekToBeginning(consumer.assignment())
+        val allRecords = consumeRecordsFor(consumer)
+        val expectedTotal = if (shouldCommit) numRecords + 1 else 1
+        assertEquals(expectedTotal, allRecords.size,
+          s"Should have $expectedTotal total records after fresh transaction")
+
+        producer.close()
+
+        // Unsubscribe consumer for next test iteration
+        consumer.unsubscribe()
+    }
+
+    // Test single crash with commit
+    test2PCRecovery(numCrashes = 1, shouldCommit = true)
+
+    // Test single crash with abort
+    test2PCRecovery(numCrashes = 1, shouldCommit = false)
+
+    // Test multiple crashes before final commit (validates epoch progression)
+    test2PCRecovery(numCrashes = 3, shouldCommit = true)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testProducerIdRotationWithEpochExhaustion(groupProtocol: String): Unit = {
+    // Test producer ID rotation when client epoch is exhausted.  This tests both scenarios:
+    // 1. Rotation during initTransactions(keepPreparedTxn=true)
+    // 2. Rotation during commitTransaction()
+
+    def testRotation(startEpoch: Short, doubleRotation: Boolean = false): Unit = {
+      val transactionalId = s"test-rotation-${System.nanoTime()}"
+      val testTopic = s"test-rotation-topic-${System.nanoTime()}"
+      createTopic(testTopic, 1, brokerCount, topicConfig())
+
+      val consumer = transactionalConsumers.head
+      consumer.subscribe(Seq(testTopic).asJava)
+      consumer.poll(Duration.ofMillis(100))
+
+      // Establish transactional ID
+      var producer = createTransactionalProducer(transactionalId, enable2PC = true)
+      producer.initTransactions()
+      producer.close()
+
+      // Set epoch to trigger rotation at desired point
+      setProducerEpoch(transactionalId, startEpoch)
+
+      // Create producer and start a prepared transaction
+      producer = createTransactionalProducer(transactionalId, enable2PC = true)
+      producer.initTransactions()
+      producer.beginTransaction()
+      val numRecords = 3
+      for (i <- 0 until numRecords) {
+        producer.send(new ProducerRecord(testTopic, 0, s"key-$i".getBytes, s"value-$i".getBytes))
+      }
+      producer.flush()
+      // Don't commit - leave transaction prepared (simulates crash)
+
+      val originalProducerId = getProducerId(transactionalId)
+
+      if (doubleRotation) {
+        // First rotation: loop calling initTransactions(true) until rotation occurs
+        var rotationCount = 0
+        var currentClientId = originalProducerId
+        var iteration = 0
+        while (rotationCount == 0 && iteration < 20) {
+          iteration += 1
+          val recoveryProducer = createTransactionalProducer(transactionalId, enable2PC = true)
+          recoveryProducer.initTransactions(true)  // keepPreparedTxn
+
+          val clientId = getClientProducerId(transactionalId)
+          if (clientId != currentClientId) {
+            rotationCount = 1
+            currentClientId = clientId
+          }
+          recoveryProducer.close(Duration.ZERO)
+        }
+
+        assertTrue(rotationCount >= 1, s"First rotation should have occurred after $iteration iterations")
+        assertEquals(0.toShort, getClientProducerEpoch(transactionalId),
+          "After first rotation, client epoch should be exactly 0")
+
+        // Set client epoch high again to trigger second rotation
+        setClientProducerEpoch(transactionalId, startEpoch)
+
+        // Do a compensating epoch bump so that we arrive at the same epoch as without double rotation
+        val recoveryProducer = createTransactionalProducer(transactionalId, enable2PC = true)
+        recoveryProducer.initTransactions(true)
+        recoveryProducer.close(Duration.ZERO)
+      }
+
+      // We do 3 total epoch increments from the initial startEpoch:
+      //   1. First initTransactions(): startEpoch → startEpoch + 1
+      //   2. Second initTransactions(true): startEpoch + 1 → startEpoch + 2
+      //   3. commitTransaction(): startEpoch + 2 → startEpoch + 3
+      // Rotation may happen during step 2 or 3 depending on startEpoch.
+      val finalProducer = createTransactionalProducer(transactionalId, enable2PC = true)
+      finalProducer.initTransactions(true)  // keepPreparedTxn - may rotate here
+
+      // Complete the transaction - may rotate here
+      finalProducer.commitTransaction()
+
+      // Wait for transaction to reach COMPLETE_COMMIT state
+      waitForTransactionState(transactionalId, TransactionState.COMPLETE_COMMIT)
+
+      // Verify that rotation happened (regardless of whether it occurred during
+      // initTransactions() or commitTransaction()). After transaction completes, verify:
+      //  - Producer ID changed (rotation occurred)
+      //  - Total epoch increments = 3 (accounting for overflow)
+      val finalProducerId = getProducerId(transactionalId)
+      assertNotEquals(originalProducerId, finalProducerId,
+        s"Producer ID should have rotated by transaction completion (original=$originalProducerId, final=$finalProducerId)")
+
+      // Verify epoch overflow occurred and total epoch increments.
+      val finalEpoch = getProducerEpoch(transactionalId)
+
+      // Final epoch is less than startEpoch (overflow occurred)
+      assertTrue(finalEpoch < startEpoch,
+        s"Overflow should have occurred: finalEpoch=$finalEpoch < startEpoch=$startEpoch")
+
+      // Total epoch increments = 3 (accounting for overflow)
+      // Formula: finalEpoch + (MaxValue - startEpoch) = total increments
+      // This accounts for increments from startEpoch to MaxValue boundary, then
+      // wrap to 0 and increments to finalEpoch.
+      val totalIncrements = finalEpoch + Short.MaxValue - startEpoch
+      assertEquals(3, totalIncrements,
+        s"Should have 3 total epoch increments: finalEpoch=$finalEpoch + " +
+        s"(MaxValue=${Short.MaxValue} - startEpoch=$startEpoch) = $totalIncrements")
+
+      // Verify consumer sees all records
+      consumer.seekToBeginning(consumer.assignment())
+      val consumedRecords = consumeRecordsFor(consumer)
+      assertEquals(numRecords, consumedRecords.size,
+        "Consumer should see all records despite rotation")
+
+      finalProducer.close()
+      producer.close()
+      consumer.unsubscribe()
+    }
+
+    // Scenario 1: Rotation happens during initTransactions(keepPreparedTxn=true) call
+    // Example flow:
+    //  1. After setProducerEpoch: epoch=32765
+    //  2. First initTransactions() + beginTransaction(): epoch=32766, transaction ONGOING
+    //  3. initTransactions(keepPreparedTxn=true): tries 32766+1=32767 → rotation triggered
+    //     → Creates dual identity: producerId unchanged, nextProducerId=<new>, nextProducerEpoch=0
+    //  4. commitTransaction(): Completes transition with nextProducerEpoch bumped 0→1
+    //     → Final state: producerId=<new>, epoch=1
+    testRotation((Short.MaxValue - 2).toShort)  // 32765
+
+    // Scenario 1 with double rotation: First rotation during iteration, second during final init/commit
+    testRotation((Short.MaxValue - 2).toShort, doubleRotation = true)
+
+    // Scenario 2: Rotation happens during commitTransaction() call
+    // Example flow:
+    //  1. After setProducerEpoch: epoch=32764
+    //  2. First initTransactions() + beginTransaction(): epoch=32765, transaction ONGOING
+    //  3. initTransactions(keepPreparedTxn=true): bumps to 32766 (not exhausted yet)
+    //     → Creates dual identity: producerId unchanged, nextProducerId=<producerId>, nextProducerEpoch=32766
+    //  4. commitTransaction(): tries 32766+1=32767 → rotation triggered
+    //     → Final state: producerId=<new>, epoch=0
+    testRotation((Short.MaxValue - 3).toShort)  // 32764
+
+    // Scenario 2 with double rotation: First rotation during iteration, second during final init/commit
+    testRotation((Short.MaxValue - 3).toShort, doubleRotation = true)
+  }
+
+  // Helper methods
+
+  private def sendOffset(commit: (KafkaProducer[Array[Byte], Array[Byte]],

Review Comment:
   This helper got moved.



##########
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java:
##########
@@ -155,7 +181,7 @@ public TxnTransitMetadata prepareIncrementProducerEpoch(
         int newTxnTimeoutMs,
         Optional<Short> expectedProducerEpoch,
         long updateTimestamp) {
-        if (isProducerEpochExhausted())
+        if (isEpochExhausted(producerEpoch))

Review Comment:
   Now `isProducerEpochExhausted()` uses `clientProducerEpoch()`, but here we're 
validating `producerEpoch`, because that's the value that gets incremented 
later in the function.
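
   To illustrate the point, here is a minimal, self-contained sketch. It is 
not the PR's code: the class, the `nextEpoch` helper, and the exhaustion 
threshold (`Short.MAX_VALUE - 1`) are assumptions made for illustration, 
loosely modelled on the dual-identity state described in the test comments 
(internal epoch 32766, client-facing epoch 0 after rotation).

```java
// Hypothetical sketch: names and threshold are assumptions, not Kafka's actual code.
final class EpochExhaustionSketch {

    // Assumed rule: an epoch counts as exhausted once one more bump
    // would reach Short.MAX_VALUE.
    static boolean isEpochExhausted(short epoch) {
        return epoch >= Short.MAX_VALUE - 1;
    }

    // Bumps the given epoch, validating the very value that is incremented.
    static short nextEpoch(short epochToIncrement) {
        if (isEpochExhausted(epochToIncrement)) {
            throw new IllegalStateException(
                "Cannot bump exhausted epoch " + epochToIncrement);
        }
        return (short) (epochToIncrement + 1);
    }

    public static void main(String[] args) {
        short producerEpoch = (short) (Short.MAX_VALUE - 1); // internal epoch, about to be bumped
        short clientFacingEpoch = 0;                         // client-facing epoch after rotation

        // Checking the client-facing epoch would miss the exhaustion ...
        System.out.println("client-facing exhausted: " + isEpochExhausted(clientFacingEpoch));
        // ... while checking the epoch that is actually incremented catches it.
        System.out.println("producerEpoch exhausted: " + isEpochExhausted(producerEpoch));
    }
}
```

   In this sketch, a guard based on the client-facing epoch would let the bump 
through even though the internal epoch has no room left, which is why the 
check is applied to the value being incremented.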



