jolshan commented on code in PR #16719:
URL: https://github.com/apache/kafka/pull/16719#discussion_r1729197382


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -517,47 +526,90 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
           val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
 
           txnMetadata.inLock {
-            if (txnMetadata.producerId != producerId)
+            producerIdCopy = txnMetadata.producerId
+            producerEpochCopy = txnMetadata.producerEpoch
+            val currentTxnMetadataIsAtLeastTransactionsV2 = 
txnMetadata.clientTransactionVersion >= 2
+            // True if the client used TV_2 and retried a request that had 
overflowed the epoch, and a new producer ID is stored in the txnMetadata
+            val retryOnOverflow = 
!txnMetadata.pendingState.contains(PrepareEpochFence) && 
currentTxnMetadataIsAtLeastTransactionsV2 &&
+              txnMetadata.previousProducerId == producerId && producerEpoch == 
Short.MaxValue - 1 && txnMetadata.producerEpoch == 0
+            // True if the client used TV_2 and retried an endTxn request, and 
the bumped producer epoch is stored in the txnMetadata.
+            val retryOnEpochBump = endTxnEpochBumped(txnMetadata, 
producerEpoch)
+
+            // With transaction V2, state + same epoch is not sufficient to 
determine if a retry transition is valid. If the epoch is the
+            // same it actually indicates the next endTransaction call. 
Instead, we want to check the epoch matches with the epoch in the retry 
conditions.
+            // Use the following criteria to determine if a retry is valid
+            //  1) If the request is not transactionV2, the same epoch check 
is sufficient
+            //  2) If the request is transaction V2, confirm the epoch is 
correct via the retry booleans defined above.
+            // Note: The retry on overflow case is only valid when txnMetadata 
is CompleteCommit/Abort
+            val validCompleteTransition = 
!currentTxnMetadataIsAtLeastTransactionsV2 || retryOnEpochBump || 
retryOnOverflow
+            val validPrepareTransition = 
!currentTxnMetadataIsAtLeastTransactionsV2 || retryOnEpochBump
+
+            if (txnMetadata.producerId != producerId && !retryOnOverflow)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
-            // Strict equality is enforced on the client side requests, as 
they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != 
txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+            // Strict equality is enforced on the client side requests, as 
they shouldn't bump the producer epoch without server knowledge.
+            // If a TV_2 endTxn request is retried by the client, it will not 
have the same epoch -- don't throw producer fenced in these cases.
+            else if (((isFromClient && producerEpoch != 
txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch) && 
!retryOnEpochBump && !retryOnOverflow)
               Left(Errors.PRODUCER_FENCED)
             else if (txnMetadata.pendingTransitionInProgress && 
txnMetadata.pendingState.get != PrepareEpochFence)
               Left(Errors.CONCURRENT_TRANSACTIONS)
             else txnMetadata.state match {
               case Ongoing =>
-                val nextState = if (txnMarkerResult == 
TransactionResult.COMMIT)
-                  PrepareCommit
-                else
-                  PrepareAbort
-
-                if (nextState == PrepareAbort && 
txnMetadata.pendingState.contains(PrepareEpochFence)) {
-                  // We should clear the pending state to make way for the 
transition to PrepareAbort and also bump
-                  // the epoch in the transaction metadata we are about to 
append.
-                  isEpochFence = true
-                  txnMetadata.pendingState = None
-                  txnMetadata.producerEpoch = producerEpoch
-                  txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
-                }
+                // We should only be in retrying states on Prepare/Complete 
states.
+                // If we are in ongoing, some other producer must have 
transitioned that state.
+                if (retryOnOverflow || retryOnEpochBump) {
+                  Left(Errors.PRODUCER_FENCED)
+                } else {
+                  val nextState = if (txnMarkerResult == 
TransactionResult.COMMIT)
+                    PrepareCommit
+                  else
+                    PrepareAbort
+
+                  // Maybe allocate new producer ID if we are bumping epoch 
and epoch is exhausted
+                  val nextProducerIdOrErrors =
+                    if (requestIsAtLeastTransactionsV2 && 
!txnMetadata.pendingState.contains(PrepareEpochFence) && 
txnMetadata.isProducerEpochExhausted) {
+                      producerIdManager.generateProducerId() match {
+                        case Success(newProducerId) =>
+                          Right(newProducerId)
+                        case Failure(exception) =>
+                          Left(Errors.forException(exception))
+                      }
+                    } else {
+                      Right(RecordBatch.NO_PRODUCER_ID)
+                    }
+
+                  if (nextState == PrepareAbort && 
txnMetadata.pendingState.contains(PrepareEpochFence)) {
+                    // We should clear the pending state to make way for the 
transition to PrepareAbort and also bump
+                    // the epoch in the transaction metadata we are about to 
append.
+                    isEpochFence = true
+                    txnMetadata.pendingState = None
+                    txnMetadata.producerEpoch = producerEpoch
+                    txnMetadata.lastProducerEpoch = 
RecordBatch.NO_PRODUCER_EPOCH
+                  }
 
-                Right(coordinatorEpoch, 
txnMetadata.prepareAbortOrCommit(nextState, time.milliseconds()))
+                  nextProducerIdOrErrors match {
+                    case Left(error) =>
+                      Left(error)
+                    case Right(nextProducerId) =>
+                      Right(coordinatorEpoch, 
txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, 
nextProducerId, time.milliseconds()))
+                  }
+                }
               case CompleteCommit =>
-                if (txnMarkerResult == TransactionResult.COMMIT)
+                if (txnMarkerResult == TransactionResult.COMMIT && 
validCompleteTransition)

Review Comment:
   I don't think that's quite true: when the request carries the same epoch, that epoch is the expected one -- the transaction was just in the wrong state. Before this change, a matching epoch combined with an invalid state transition only happened when the marker was the wrong type.
   
   I don't feel strongly about it, though, since the result will be the same from the client's perspective.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to