artemlivshits commented on code in PR #17698:
URL: https://github.com/apache/kafka/pull/17698#discussion_r1853036403

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -498,6 +498,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       requestLocal)
   }
+  /*
+    The state transition for Empty, CompleteAbort, CompleteCommit is complicated, so here is a table to make it clear.
+    Note:
+      PF = PRODUCER_FENCED
+      ITS = INVALID_TXN_STATE
+      NONE = No error and no epoch bump
+      EB = No error and epoch bump
+
+      Retry => producerEpoch = txnState.ProducerEpoch - 1
+      Current => producerEpoch = txnState.ProducerEpoch
+    ------------------------------------------------------
+    With transaction V1.

Review Comment:
   We don't have a difference between retry and current in V1. I'm not sure we need a table for V1, because for V1 it's simple: if the epoch matches, then a matching action is a retry and any other action is an invalid state.

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -498,6 +498,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       requestLocal)
   }
+  /*
+    The state transition for Empty, CompleteAbort, CompleteCommit is complicated, so here is a table to make it clear.
+    Note:
+      PF = PRODUCER_FENCED
+      ITS = INVALID_TXN_STATE
+      NONE = No error and no epoch bump
+      EB = No error and epoch bump
+
+      Retry => producerEpoch = txnState.ProducerEpoch - 1
+      Current => producerEpoch = txnState.ProducerEpoch
+    ------------------------------------------------------
+    With transaction V1.
+    +----------------+-----------------+-----------------+
+    |                | Abort           | Commit          |
+    +----------------+-------+---------+-------+---------+
+    |                | Retry | Current | Retry | Current |
+    +----------------+-------+---------+-------+---------+
+    | Empty          | PF    | ITS     | PF    | ITS     |
+    +----------------+-------+---------+-------+---------+
+    | CompleteAbort  | PF    | NONE    | PF    | ITS     |
+    +----------------+-------+---------+-------+---------+
+    | CompleteCommit | PF    | ITS     | PF    | NONE    |
+    +----------------+-------+---------+-------+---------+
+
+    With transaction V2.
+    +----------------+-----------------+-----------------+
+    |                | Abort           | Commit          |
+    +----------------+-------+---------+-------+---------+
+    |                | Retry | Current | Retry | Current |
+    +----------------+-------+---------+-------+---------+
+    | Empty          | PF    | EB      | ITS   | ITS     |
+    +----------------+-------+---------+-------+---------+
+    | CompleteAbort  | NONE  | EB      | ITS   | ITS     |
+    +----------------+-------+---------+-------+---------+
+    | CompleteCommit | ITS   | EB      | NONE  | ITS     |
+    +----------------+-------+---------+-------+---------+
+  */
+

Review Comment:
   nit: extra line

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -487,13 +491,17 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
         producerEpoch = transitMetadata.producerEpoch
         lastProducerEpoch = transitMetadata.lastProducerEpoch
         nextProducerId = transitMetadata.nextProducerId
+        if (toState == PrepareAbort && (state == Empty || state == CompleteCommit || state == CompleteAbort)) {
+          // In V2, we allow state transits from Empty, CompleteCommit and CompleteAbort to PrepareAbort. Let's
+          // use the last update time as the txn start time.
+          txnStartTimestamp = transitMetadata.txnLastUpdateTimestamp

Review Comment:
   The transaction starts when we create the transition; if we get stuck replicating, we should see that in the metrics.
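A minimal Scala sketch of the start-time rule in the TransactionMetadata hunk above, under stated assumptions: the types (`TxnStartTimeSketch`, `TxnMeta`, `TransitMetadata`, `prepareTransition`) are hypothetical simplifications, not Kafka's `TransactionMetadata`, and the transition's `txnLastUpdateTimestamp` is assumed to be stamped when the transition is created. Under that assumption the start time predates the moment `completeTransitionTo` runs, so time spent replicating the transition counts toward the transaction's age, which is one reading of the comment about seeing replication stalls in the metrics.

```scala
object TxnStartTimeSketch {
  // Hypothetical, simplified transaction states (not Kafka's TransactionState).
  sealed trait TxnState
  case object Empty extends TxnState
  case object Ongoing extends TxnState
  case object PrepareAbort extends TxnState
  case object CompleteCommit extends TxnState
  case object CompleteAbort extends TxnState

  // Hypothetical transition record; the timestamp is stamped when the transition is created.
  final case class TransitMetadata(toState: TxnState, txnLastUpdateTimestamp: Long)

  // Hypothetical metadata holder exposing only the fields this rule touches.
  final class TxnMeta(var state: TxnState, var txnStartTimestamp: Long) {
    // Creating the transition: this is the moment the new abort "starts".
    def prepareTransition(toState: TxnState, nowMs: Long): TransitMetadata =
      TransitMetadata(toState, nowMs)

    // Applying the transition after it has been written, possibly much later.
    def completeTransitionTo(transit: TransitMetadata): Unit = {
      val fromEmptyOrCompleted =
        state == Empty || state == CompleteCommit || state == CompleteAbort
      if (transit.toState == PrepareAbort && fromEmptyOrCompleted) {
        // Mirrors the diff: take the start time from the transition, not from "now".
        txnStartTimestamp = transit.txnLastUpdateTimestamp
      }
      state = transit.toState
    }
  }
}
```

For example, an abort prepared at t = 100 ms right after a commit ends up with `txnStartTimestamp == 100` no matter when `completeTransitionTo` eventually runs, while an `Ongoing` transaction moving to `PrepareAbort` keeps its original start time.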

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -564,41 +633,44 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
         else
           PrepareAbort
-        // Maybe allocate new producer ID if we are bumping epoch and epoch is exhausted
-        val nextProducerIdOrErrors =
-          if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence) && txnMetadata.isProducerEpochExhausted) {
-            try {
-              Right(producerIdManager.generateProducerId())
-            } catch {
-              case e: Exception => Left(Errors.forException(e))
-            }
+        generateTxnTransitMetadataForTxnCompletion(nextState)

Review Comment:
   How do we distinguish between committing an ongoing transaction and committing an empty transaction? For the latter, I presume we need to set the transaction start time, etc.

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -498,6 +498,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       requestLocal)
   }
+  /*
+    The state transition for Empty, CompleteAbort, CompleteCommit is complicated, so here is a table to make it clear.
+    Note:
+      PF = PRODUCER_FENCED
+      ITS = INVALID_TXN_STATE
+      NONE = No error and no epoch bump
+      EB = No error and epoch bump
+
+      Retry => producerEpoch = txnState.ProducerEpoch - 1
+      Current => producerEpoch = txnState.ProducerEpoch
+    ------------------------------------------------------
+    With transaction V1.
+    +----------------+-----------------+-----------------+
+    |                | Abort           | Commit          |
+    +----------------+-------+---------+-------+---------+
+    |                | Retry | Current | Retry | Current |
+    +----------------+-------+---------+-------+---------+
+    | Empty          | PF    | ITS     | PF    | ITS     |
+    +----------------+-------+---------+-------+---------+
+    | CompleteAbort  | PF    | NONE    | PF    | ITS     |
+    +----------------+-------+---------+-------+---------+
+    | CompleteCommit | PF    | ITS     | PF    | NONE    |
+    +----------------+-------+---------+-------+---------+
+
+    With transaction V2.
+    +----------------+-----------------+-----------------+
+    |                | Abort           | Commit          |
+    +----------------+-------+---------+-------+---------+
+    |                | Retry | Current | Retry | Current |
+    +----------------+-------+---------+-------+---------+
+    | Empty          | PF    | EB      | ITS   | ITS     |

Review Comment:
   For commit, the "retry" case is "PF". For Empty, I think the logic is simple: there is no retry; we just allow aborts to proceed to an epoch bump.

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -498,6 +498,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       requestLocal)
   }
+  /*
+    The state transition for Empty, CompleteAbort, CompleteCommit is complicated, so here is a table to make it clear.

Review Comment:
   nit: "is complicated" makes the sentence longer without adding value for the reader :-)

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -564,41 +633,44 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
         else
           PrepareAbort
-        // Maybe allocate new producer ID if we are bumping epoch and epoch is exhausted
-        val nextProducerIdOrErrors =
-          if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence) && txnMetadata.isProducerEpochExhausted) {
-            try {
-              Right(producerIdManager.generateProducerId())
-            } catch {
-              case e: Exception => Left(Errors.forException(e))
-            }
+        generateTxnTransitMetadataForTxnCompletion(nextState)
+      case CompleteCommit =>
+        if (currentTxnMetadataIsAtLeastTransactionsV2) {
+          if (txnMarkerResult == TransactionResult.COMMIT) {
+            if (isRetry)
+              Left(Errors.NONE)
+            else
+              logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
           } else {
-            Right(RecordBatch.NO_PRODUCER_ID)
+            if (isRetry)
+              logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+            else
+              generateTxnTransitMetadataForTxnCompletion(PrepareAbort)
           }
-
-        if (nextState == PrepareAbort && txnMetadata.pendingState.contains(PrepareEpochFence)) {
-          // We should clear the pending state to make way for the transition to PrepareAbort and also bump
-          // the epoch in the transaction metadata we are about to append.
-          isEpochFence = true
-          txnMetadata.pendingState = None
-          txnMetadata.producerEpoch = producerEpoch
-          txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
+        } else {
+          // Transaction V1
+          if (txnMarkerResult == TransactionResult.COMMIT) {
+            Left(Errors.NONE)
+          } else
+            logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
         }
-
-        nextProducerIdOrErrors.flatMap {
-          nextProducerId =>
-            Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, nextProducerId.asInstanceOf[Long], time.milliseconds()))
-        }
-      case CompleteCommit =>
-        if (txnMarkerResult == TransactionResult.COMMIT)
-          Left(Errors.NONE)
-        else
-          logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
       case CompleteAbort =>
-        if (txnMarkerResult == TransactionResult.ABORT)
-          Left(Errors.NONE)
-        else
-          logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+        if (currentTxnMetadataIsAtLeastTransactionsV2) {
+          if (txnMarkerResult == TransactionResult.ABORT) {
+            if (isRetry)
+              Left(Errors.NONE)
+            else
+              generateTxnTransitMetadataForTxnCompletion(PrepareAbort)
+          } else {
+            logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+          }
+        } else {
+          // Transaction V1
+          if (txnMarkerResult == TransactionResult.ABORT)
+            Left(Errors.NONE)
+          else
+            logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+        }

Review Comment:
   I think the statement could be simpler if structured like this:
   ```
   if (txnMarkerResult == TransactionResult.ABORT) {
     if (!currentTxnMetadataIsAtLeastTransactionsV2 || isRetry) {
       Left(Errors.NONE)
     } else {
       generateTxnTransitMetadataForTxnCompletion(PrepareAbort)
     }
   } else {
     logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
   }
   ```
   It might be even simpler to fold the `!currentTxnMetadataIsAtLeastTransactionsV2` condition into the `isRetry` value above, like `val isRetry = retryOnEpochBump || retryOnOverflow || !currentTxnMetadataIsAtLeastTransactionsV2`. Then the condition becomes even more intuitive: if the action matches and `isRetry` is true, just return NONE; otherwise, start a new operation. I think we could do a similar simplification for the CompleteCommit case.
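A minimal Scala sketch of the folding suggested above, using hypothetical stand-ins (`EndTxnSketch`, `Result`, `Outcome`) rather than Kafka's `TransactionResult` and `Errors`: `isV2` stands for `currentTxnMetadataIsAtLeastTransactionsV2` and `retry` for `retryOnEpochBump || retryOnOverflow`. It transcribes the nested CompleteCommit and CompleteAbort branches from the diff and checks, over every input combination, that folding `!isV2` into `isRetry` leaves the outcome unchanged.

```scala
object EndTxnSketch {
  // Hypothetical stand-in for TransactionResult.COMMIT / ABORT.
  sealed trait Result
  case object Commit extends Result
  case object Abort extends Result

  // Hypothetical stand-ins for the three outcomes that appear in the diff.
  sealed trait Outcome
  case object NoError extends Outcome            // Left(Errors.NONE)
  case object InvalidState extends Outcome       // logInvalidStateTransitionAndReturnError(...)
  case object BumpToPrepareAbort extends Outcome // generateTxnTransitMetadataForTxnCompletion(PrepareAbort)

  // CompleteCommit branches as written in the PR.
  def completeCommitNested(isV2: Boolean, retry: Boolean, r: Result): Outcome =
    if (isV2) {
      if (r == Commit) {
        if (retry) NoError else InvalidState
      } else {
        if (retry) InvalidState else BumpToPrepareAbort
      }
    } else {
      if (r == Commit) NoError else InvalidState
    }

  // CompleteAbort branches as written in the PR.
  def completeAbortNested(isV2: Boolean, retry: Boolean, r: Result): Outcome =
    if (isV2) {
      if (r == Abort) {
        if (retry) NoError else BumpToPrepareAbort
      } else InvalidState
    } else {
      if (r == Abort) NoError else InvalidState
    }

  // Folded form: a V1 request is always treated as a retry.
  def completeCommitFolded(isV2: Boolean, retry: Boolean, r: Result): Outcome = {
    val isRetry = retry || !isV2
    if (r == Commit) {
      if (isRetry) NoError else InvalidState
    } else {
      if (isRetry) InvalidState else BumpToPrepareAbort
    }
  }

  def completeAbortFolded(isV2: Boolean, retry: Boolean, r: Result): Outcome = {
    val isRetry = retry || !isV2
    if (r == Abort) {
      if (isRetry) NoError else BumpToPrepareAbort
    } else InvalidState
  }

  // Exhaustively compares the nested and folded forms for both states.
  def foldedMatchesNested: Boolean = {
    val flags = Seq(false, true)
    val results: Seq[Result] = Seq(Commit, Abort)
    val checks = for {
      isV2 <- flags
      retry <- flags
      r <- results
    } yield completeCommitNested(isV2, retry, r) == completeCommitFolded(isV2, retry, r) &&
      completeAbortNested(isV2, retry, r) == completeAbortFolded(isV2, retry, r)
    checks.forall(identity)
  }
}
```

With only eight input combinations per state, an exhaustive comparison like this is a cheap way to confirm such a refactor is behavior-preserving before changing the real coordinator code.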