artemlivshits commented on code in PR #17698:
URL: https://github.com/apache/kafka/pull/17698#discussion_r1853036403


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -498,6 +498,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       requestLocal)
   }
 
+  /*
+    The state transition for Empty, CompleteAbort, CompleteCommit is complicated, so here is a table to make it clear.
+    Note:
+    PF = PRODUCER_FENCED
+    ITS = INVALID_TXN_STATE
+    NONE = No error and no epoch bump
+    EB = No error and epoch bump
+
+    Retry => producerEpoch = txnState.ProducerEpoch - 1
+    Current => producerEpoch = txnState.ProducerEpoch
+    ------------------------------------------------------
+    With transaction V1.

Review Comment:
   We don't have a difference between retry and current in V1. I'm not
sure we need a table for V1 because for V1 it's simple: if the epoch matches,
then if the action matches it's a retry, otherwise it's an invalid state.
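A minimal sketch of the V1 rule described above, assuming simplified stand-in types: `TxnV1Sketch`, `TxnState`, `Action`, `Outcome` and `v1Outcome` are hypothetical names, not the coordinator's API. Following the V1 table in the diff, an epoch mismatch (the "Retry" column) is treated as PRODUCER_FENCED; with a matching epoch the result is NONE only when the action matches the completed state.

```scala
object TxnV1Sketch {
  // Hypothetical stand-ins for the coordinator's states (only the three in the table).
  sealed trait TxnState
  case object Empty extends TxnState
  case object CompleteAbort extends TxnState
  case object CompleteCommit extends TxnState

  sealed trait Action
  case object Abort extends Action
  case object Commit extends Action

  sealed trait Outcome
  case object ProducerFenced extends Outcome  // PF
  case object InvalidTxnState extends Outcome // ITS
  case object NoError extends Outcome         // NONE: no error, no epoch bump

  // V1: beyond the epoch check there is no retry/current distinction.
  def v1Outcome(state: TxnState, action: Action, epochMatches: Boolean): Outcome =
    if (!epochMatches) ProducerFenced
    else (state, action) match {
      // The action matches the already-completed state, so it is a retry.
      case (CompleteAbort, Abort) | (CompleteCommit, Commit) => NoError
      // Every other combination, including anything on Empty, is invalid.
      case _ => InvalidTxnState
    }
}
```

Evaluated over all twelve cells of the V1 table in the diff, this rule yields the same entries, which is consistent with the point that the V1 table may not be needed.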



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -498,6 +498,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       requestLocal)
   }
 
+  /*
+    The state transition for Empty, CompleteAbort, CompleteCommit is complicated, so here is a table to make it clear.
+    Note:
+    PF = PRODUCER_FENCED
+    ITS = INVALID_TXN_STATE
+    NONE = No error and no epoch bump
+    EB = No error and epoch bump
+
+    Retry => producerEpoch = txnState.ProducerEpoch - 1
+    Current => producerEpoch = txnState.ProducerEpoch
+    ------------------------------------------------------
+    With transaction V1.
+    +----------------+-----------------+-----------------+
+    |                | Abort           | Commit          |
+    +----------------+-------+---------+-------+---------+
+    |                | Retry | Current | Retry | Current |
+    +----------------+-------+---------+-------+---------+
+    | Empty          | PF    | ITS     | PF    | ITS     |
+    +----------------+-------+---------+-------+---------+
+    | CompleteAbort  | PF    | NONE    | PF    | ITS     |
+    +----------------+-------+---------+-------+---------+
+    | CompleteCommit | PF    | ITS     | PF    | NONE    |
+    +----------------+-------+---------+-------+---------+
+
+    With transaction V2.
+    +----------------+-----------------+-----------------+
+    |                | Abort           | Commit          |
+    +----------------+-------+---------+-------+---------+
+    |                | Retry | Current | Retry | Current |
+    +----------------+-------+---------+-------+---------+
+    | Empty          | PF    | EB      | ITS   | ITS     |
+    +----------------+-------+---------+-------+---------+
+    | CompleteAbort  | NONE  | EB      | ITS   | ITS     |
+    +----------------+-------+---------+-------+---------+
+    | CompleteCommit | ITS   | EB      | NONE  | ITS     |
+    +----------------+-------+---------+-------+---------+
+   */
+

Review Comment:
   nit: extra line



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -487,13 +491,17 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
             producerEpoch = transitMetadata.producerEpoch
             lastProducerEpoch = transitMetadata.lastProducerEpoch
             nextProducerId = transitMetadata.nextProducerId
+            if (toState == PrepareAbort && (state == Empty || state == CompleteCommit || state == CompleteAbort)) {
+              // In V2, we allow state transits from Empty, CompleteCommit and CompleteAbort to PrepareAbort. Let's
+              // use the last update time as the txn start time.
+              txnStartTimestamp = transitMetadata.txnLastUpdateTimestamp

Review Comment:
   The transaction starts when we create the transition; if we get stuck
replicating, we should see it in the metrics.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -564,41 +633,44 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
                 else
                   PrepareAbort
 
-                // Maybe allocate new producer ID if we are bumping epoch and epoch is exhausted
-                val nextProducerIdOrErrors =
-                  if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence) && txnMetadata.isProducerEpochExhausted) {
-                    try {
-                      Right(producerIdManager.generateProducerId())
-                    } catch {
-                      case e: Exception => Left(Errors.forException(e))
-                    }
+                generateTxnTransitMetadataForTxnCompletion(nextState)

Review Comment:
   How do we distinguish between committing an ongoing transaction and
committing an empty transaction? For the latter I presume we need to set the
transaction start time, etc.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -498,6 +498,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       requestLocal)
   }
 
+  /*
+    The state transition for Empty, CompleteAbort, CompleteCommit is complicated, so here is a table to make it clear.
+    Note:
+    PF = PRODUCER_FENCED
+    ITS = INVALID_TXN_STATE
+    NONE = No error and no epoch bump
+    EB = No error and epoch bump
+
+    Retry => producerEpoch = txnState.ProducerEpoch - 1
+    Current => producerEpoch = txnState.ProducerEpoch
+    ------------------------------------------------------
+    With transaction V1.
+    +----------------+-----------------+-----------------+
+    |                | Abort           | Commit          |
+    +----------------+-------+---------+-------+---------+
+    |                | Retry | Current | Retry | Current |
+    +----------------+-------+---------+-------+---------+
+    | Empty          | PF    | ITS     | PF    | ITS     |
+    +----------------+-------+---------+-------+---------+
+    | CompleteAbort  | PF    | NONE    | PF    | ITS     |
+    +----------------+-------+---------+-------+---------+
+    | CompleteCommit | PF    | ITS     | PF    | NONE    |
+    +----------------+-------+---------+-------+---------+
+
+    With transaction V2.
+    +----------------+-----------------+-----------------+
+    |                | Abort           | Commit          |
+    +----------------+-------+---------+-------+---------+
+    |                | Retry | Current | Retry | Current |
+    +----------------+-------+---------+-------+---------+
+    | Empty          | PF    | EB      | ITS   | ITS     |

Review Comment:
   For commit, the "retry" case is "PF". For Empty, I think the logic is
simple: there is no retry, we just allow aborts to proceed to an epoch bump.
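A minimal sketch of the Empty row as the comment above reads it, assuming the commit/"Retry" cell should be PF rather than ITS. The names `TxnV2EmptySketch` and `v2EmptyOutcome` and the stand-in types are hypothetical, and only the two epochs the table defines are modeled (`epochIsCurrent = false` means `producerEpoch = txnState.ProducerEpoch - 1`).

```scala
object TxnV2EmptySketch {
  sealed trait Action
  case object Abort extends Action
  case object Commit extends Action

  sealed trait Outcome
  case object ProducerFenced extends Outcome  // PF
  case object InvalidTxnState extends Outcome // ITS
  case object EpochBump extends Outcome       // EB: no error, epoch bump

  // Empty has nothing to retry: a request with the previous epoch is fenced,
  // and with the current epoch only an abort may proceed (to an epoch bump).
  def v2EmptyOutcome(action: Action, epochIsCurrent: Boolean): Outcome =
    if (!epochIsCurrent) ProducerFenced
    else action match {
      case Abort  => EpochBump
      case Commit => InvalidTxnState
    }
}
```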



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -498,6 +498,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       requestLocal)
   }
 
+  /*
+    The state transition for Empty, CompleteAbort, CompleteCommit is complicated, so here is a table to make it clear.

Review Comment:
   nit: "is complicated" makes the sentence longer without adding value for the 
reader :-) 



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -564,41 +633,44 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
                 else
                   PrepareAbort
 
-                // Maybe allocate new producer ID if we are bumping epoch and epoch is exhausted
-                val nextProducerIdOrErrors =
-                  if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence) && txnMetadata.isProducerEpochExhausted) {
-                    try {
-                      Right(producerIdManager.generateProducerId())
-                    } catch {
-                      case e: Exception => Left(Errors.forException(e))
-                    }
+                generateTxnTransitMetadataForTxnCompletion(nextState)
+              case CompleteCommit =>
+                if (currentTxnMetadataIsAtLeastTransactionsV2) {
+                  if (txnMarkerResult == TransactionResult.COMMIT) {
+                    if (isRetry)
+                      Left(Errors.NONE)
+                    else
+                      logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                   } else {
-                    Right(RecordBatch.NO_PRODUCER_ID)
+                    if (isRetry)
+                      logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+                    else
+                      generateTxnTransitMetadataForTxnCompletion(PrepareAbort)
                   }
-
-                if (nextState == PrepareAbort && txnMetadata.pendingState.contains(PrepareEpochFence)) {
-                  // We should clear the pending state to make way for the transition to PrepareAbort and also bump
-                  // the epoch in the transaction metadata we are about to append.
-                  isEpochFence = true
-                  txnMetadata.pendingState = None
-                  txnMetadata.producerEpoch = producerEpoch
-                  txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
+                } else {
+                  // Transaction V1
+                  if (txnMarkerResult == TransactionResult.COMMIT) {
+                      Left(Errors.NONE)
+                  } else
+                    logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                 }
-
-                nextProducerIdOrErrors.flatMap {
-                  nextProducerId =>
-                    Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, nextProducerId.asInstanceOf[Long], time.milliseconds()))
-                }
-              case CompleteCommit =>
-                if (txnMarkerResult == TransactionResult.COMMIT)
-                  Left(Errors.NONE)
-                else
-                  logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
               case CompleteAbort =>
-                if (txnMarkerResult == TransactionResult.ABORT)
-                  Left(Errors.NONE)
-                else
-                  logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+                if (currentTxnMetadataIsAtLeastTransactionsV2) {
+                  if (txnMarkerResult == TransactionResult.ABORT) {
+                    if (isRetry)
+                      Left(Errors.NONE)
+                    else
+                      generateTxnTransitMetadataForTxnCompletion(PrepareAbort)
+                  } else {
+                    logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+                  }
+                } else {
+                  // Transaction V1
+                  if (txnMarkerResult == TransactionResult.ABORT)
+                    Left(Errors.NONE)
+                  else
+                    logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+                }

Review Comment:
   I think the statement could be simpler if structured like this:
   ```
     if (txnMarkerResult == TransactionResult.ABORT) {
       if (!currentTxnMetadataIsAtLeastTransactionsV2 || isRetry) {
         Left(Errors.NONE)
       } else {
         generateTxnTransitMetadataForTxnCompletion(PrepareAbort)
       }
     } else {
       logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
     }
   ```
   
   It may be even simpler to fold the `!currentTxnMetadataIsAtLeastTransactionsV2`
condition into the `isRetry` value above, like `val isRetry = retryOnEpochBump ||
retryOnOverflow || !currentTxnMetadataIsAtLeastTransactionsV2`; then the
condition becomes even more intuitive: if the action matches and isRetry, just
return NONE, otherwise start a new operation.
   
   I think we could do a similar simplification for the CompleteCommit case.
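A minimal sketch of the folded-`isRetry` structure, modeling only the CompleteAbort and CompleteCommit branches shown in the diff (epoch fencing, handled elsewhere, is not modeled). `CompletedStateSketch`, `computeIsRetry`, `onCompleteAbort`, `onCompleteCommit` and the `Decision` variants are hypothetical names; the variants stand in for `Left(Errors.NONE)`, `logInvalidStateTransitionAndReturnError(...)` and `generateTxnTransitMetadataForTxnCompletion(PrepareAbort)`. The CompleteCommit shape is one possible reading of "a similar simplification".

```scala
object CompletedStateSketch {
  sealed trait Action
  case object Abort extends Action
  case object Commit extends Action

  sealed trait Decision
  case object NoError extends Decision         // stands in for Left(Errors.NONE)
  case object InvalidTxnState extends Decision // stands in for logInvalidStateTransitionAndReturnError(...)
  case object StartAbort extends Decision      // stands in for generateTxnTransitMetadataForTxnCompletion(PrepareAbort)

  // Folding the V1 check into the retry flag: in V1 every request reaching
  // these branches is treated as a retry of the completed operation.
  def computeIsRetry(retryOnEpochBump: Boolean, retryOnOverflow: Boolean, isV2: Boolean): Boolean =
    retryOnEpochBump || retryOnOverflow || !isV2

  def onCompleteAbort(action: Action, isRetry: Boolean): Decision =
    if (action == Abort) {
      // Matching action: a retry succeeds, otherwise start a new abort.
      if (isRetry) NoError else StartAbort
    } else InvalidTxnState

  def onCompleteCommit(action: Action, isRetry: Boolean): Decision =
    if (isRetry) {
      // A retried commit succeeds; a retried abort cannot follow a commit.
      if (action == Commit) NoError else InvalidTxnState
    } else {
      // A new request may only start an abort; a new commit is invalid.
      if (action == Abort) StartAbort else InvalidTxnState
    }
}
```

With `isV2 = false` the flag is always true, so both functions reduce to the V1 branches in the diff (matching action returns NONE, anything else is invalid); with `isV2 = true` they reproduce the V2 branches.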



-- 
This is an automated message from the Apache Git Service.