CalvinConfluent commented on code in PR #17698:
URL: https://github.com/apache/kafka/pull/17698#discussion_r1854240231


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -564,41 +633,44 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
                 else
                   PrepareAbort
 
-                // Maybe allocate new producer ID if we are bumping epoch and 
epoch is exhausted
-                val nextProducerIdOrErrors =
-                  if (clientTransactionVersion.supportsEpochBump() && 
!txnMetadata.pendingState.contains(PrepareEpochFence) && 
txnMetadata.isProducerEpochExhausted) {
-                    try {
-                      Right(producerIdManager.generateProducerId())
-                    } catch {
-                      case e: Exception => Left(Errors.forException(e))
-                    }
+                generateTxnTransitMetadataForTxnCompletion(nextState)
+              case CompleteCommit =>
+                if (currentTxnMetadataIsAtLeastTransactionsV2) {
+                  if (txnMarkerResult == TransactionResult.COMMIT) {
+                    if (isRetry)
+                      Left(Errors.NONE)
+                    else
+                      logInvalidStateTransitionAndReturnError(transactionalId, 
txnMetadata.state, txnMarkerResult)
                   } else {
-                    Right(RecordBatch.NO_PRODUCER_ID)
+                    if (isRetry)
+                      logInvalidStateTransitionAndReturnError(transactionalId, 
txnMetadata.state, txnMarkerResult)
+                    else
+                      generateTxnTransitMetadataForTxnCompletion(PrepareAbort)
                   }
-
-                if (nextState == PrepareAbort && 
txnMetadata.pendingState.contains(PrepareEpochFence)) {
-                  // We should clear the pending state to make way for the 
transition to PrepareAbort and also bump
-                  // the epoch in the transaction metadata we are about to 
append.
-                  isEpochFence = true
-                  txnMetadata.pendingState = None
-                  txnMetadata.producerEpoch = producerEpoch
-                  txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
+                } else {
+                  // Transaction V1
+                  if (txnMarkerResult == TransactionResult.COMMIT) {
+                      Left(Errors.NONE)
+                  } else
+                    logInvalidStateTransitionAndReturnError(transactionalId, 
txnMetadata.state, txnMarkerResult)
                 }
-
-                nextProducerIdOrErrors.flatMap {
-                  nextProducerId =>
-                    Right(coordinatorEpoch, 
txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, 
nextProducerId.asInstanceOf[Long], time.milliseconds()))
-                }
-              case CompleteCommit =>
-                if (txnMarkerResult == TransactionResult.COMMIT)
-                  Left(Errors.NONE)
-                else
-                  logInvalidStateTransitionAndReturnError(transactionalId, 
txnMetadata.state, txnMarkerResult)
               case CompleteAbort =>
-                if (txnMarkerResult == TransactionResult.ABORT)
-                  Left(Errors.NONE)
-                else
-                  logInvalidStateTransitionAndReturnError(transactionalId, 
txnMetadata.state, txnMarkerResult)
+                if (currentTxnMetadataIsAtLeastTransactionsV2) {
+                  if (txnMarkerResult == TransactionResult.ABORT) {
+                    if (isRetry)
+                      Left(Errors.NONE)
+                    else
+                      generateTxnTransitMetadataForTxnCompletion(PrepareAbort)
+                  } else {
+                    logInvalidStateTransitionAndReturnError(transactionalId, 
txnMetadata.state, txnMarkerResult)
+                  }
+                } else {
+                  // Transaction V1
+                  if (txnMarkerResult == TransactionResult.ABORT)
+                    Left(Errors.NONE)
+                  else
+                    logInvalidStateTransitionAndReturnError(transactionalId, 
txnMetadata.state, txnMarkerResult)
+                }

Review Comment:
   I agree that the `if` conditions could be more concise here. However, I find 
it more readable to list every condition explicitly; otherwise, it is hard to 
verify that each branch's result matches the corresponding scenario in the 
table.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to