artemlivshits commented on code in PR #16719:
URL: https://github.com/apache/kafka/pull/16719#discussion_r1728148053
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -517,47 +526,90 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
txnMetadata.inLock {
- if (txnMetadata.producerId != producerId)
+ producerIdCopy = txnMetadata.producerId
+ producerEpochCopy = txnMetadata.producerEpoch
+ val currentTxnMetadataIsAtLeastTransactionsV2 =
txnMetadata.clientTransactionVersion >= 2
Review Comment:
I don't have a strong opinion. If you think this code structure is best, I
think it works. (I left a comment on the other issue separately.)
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -517,47 +526,90 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
txnMetadata.inLock {
- if (txnMetadata.producerId != producerId)
+ producerIdCopy = txnMetadata.producerId
+ producerEpochCopy = txnMetadata.producerEpoch
+ val currentTxnMetadataIsAtLeastTransactionsV2 =
txnMetadata.clientTransactionVersion >= 2
+ // True if the client used TV_2 and retried a request that had
overflowed the epoch, and a new producer ID is stored in the txnMetadata
+ val retryOnOverflow =
!txnMetadata.pendingState.contains(PrepareEpochFence) &&
currentTxnMetadataIsAtLeastTransactionsV2 &&
+ txnMetadata.previousProducerId == producerId && producerEpoch ==
Short.MaxValue - 1 && txnMetadata.producerEpoch == 0
+ // True if the client used TV_2 and retried an endTxn request, and
the bumped producer epoch is stored in the txnMetadata.
+ val retryOnEpochBump = endTxnEpochBumped(txnMetadata,
producerEpoch)
+
+ // With transaction V2, state + same epoch is not sufficient to
determine if a retry transition is valid. If the epoch is the
+ // same it actually indicates the next endTransaction call.
Instead, we want to check the epoch matches with the epoch in the retry
conditions.
+ // Use the following criteria to determine if a retry is valid
+ // 1) If the request is not transactionV2, the same epoch check
is sufficient
+ // 2) If the request is transaction V2, confirm the epoch is
correct via the retry booleans defined above.
+ // Note: The retry on overflow case is only valid when txnMetadata
is CompleteCommit/Abort
+ val validCompleteTransition =
!currentTxnMetadataIsAtLeastTransactionsV2 || retryOnEpochBump ||
retryOnOverflow
+ val validPrepareTransition =
!currentTxnMetadataIsAtLeastTransactionsV2 || retryOnEpochBump
+
+ if (txnMetadata.producerId != producerId && !retryOnOverflow)
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
- // Strict equality is enforced on the client side requests, as
they shouldn't bump the producer epoch.
- else if ((isFromClient && producerEpoch !=
txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+ // Strict equality is enforced on the client side requests, as
they shouldn't bump the producer epoch without server knowledge.
+ // If a TV_2 endTxn request is retried by the client, it will not
have the same epoch -- don't throw producer fenced in these cases.
+ else if (((isFromClient && producerEpoch !=
txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch) &&
!retryOnEpochBump && !retryOnOverflow)
Left(Errors.PRODUCER_FENCED)
else if (txnMetadata.pendingTransitionInProgress &&
txnMetadata.pendingState.get != PrepareEpochFence)
Left(Errors.CONCURRENT_TRANSACTIONS)
else txnMetadata.state match {
case Ongoing =>
- val nextState = if (txnMarkerResult ==
TransactionResult.COMMIT)
- PrepareCommit
- else
- PrepareAbort
-
- if (nextState == PrepareAbort &&
txnMetadata.pendingState.contains(PrepareEpochFence)) {
- // We should clear the pending state to make way for the
transition to PrepareAbort and also bump
- // the epoch in the transaction metadata we are about to
append.
- isEpochFence = true
- txnMetadata.pendingState = None
- txnMetadata.producerEpoch = producerEpoch
- txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
- }
+ // We should only be in retrying states on Prepare/Complete
states.
+ // If we are in ongoing, some other producer must have
transitioned that state.
+ if (retryOnOverflow || retryOnEpochBump) {
+ Left(Errors.PRODUCER_FENCED)
+ } else {
+ val nextState = if (txnMarkerResult ==
TransactionResult.COMMIT)
+ PrepareCommit
+ else
+ PrepareAbort
+
+ // Maybe allocate new producer ID if we are bumping epoch
and epoch is exhausted
+ val nextProducerIdOrErrors =
+ if (requestIsAtLeastTransactionsV2 &&
!txnMetadata.pendingState.contains(PrepareEpochFence) &&
txnMetadata.isProducerEpochExhausted) {
+ producerIdManager.generateProducerId() match {
+ case Success(newProducerId) =>
+ Right(newProducerId)
+ case Failure(exception) =>
+ Left(Errors.forException(exception))
+ }
+ } else {
+ Right(RecordBatch.NO_PRODUCER_ID)
+ }
+
+ if (nextState == PrepareAbort &&
txnMetadata.pendingState.contains(PrepareEpochFence)) {
+ // We should clear the pending state to make way for the
transition to PrepareAbort and also bump
+ // the epoch in the transaction metadata we are about to
append.
+ isEpochFence = true
+ txnMetadata.pendingState = None
+ txnMetadata.producerEpoch = producerEpoch
+ txnMetadata.lastProducerEpoch =
RecordBatch.NO_PRODUCER_EPOCH
+ }
- Right(coordinatorEpoch,
txnMetadata.prepareAbortOrCommit(nextState, time.milliseconds()))
+ nextProducerIdOrErrors match {
+ case Left(error) =>
+ Left(error)
+ case Right(nextProducerId) =>
+ Right(coordinatorEpoch,
txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion,
nextProducerId, time.milliseconds()))
+ }
+ }
case CompleteCommit =>
- if (txnMarkerResult == TransactionResult.COMMIT)
+ if (txnMarkerResult == TransactionResult.COMMIT &&
validCompleteTransition)
Review Comment:
This is not consistent with what we had before, where an unexpected epoch led
to a producer fenced error:
```
if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) ||
producerEpoch < txnMetadata.producerEpoch)
Left(Errors.PRODUCER_FENCED)
```
If we decide to redefine this condition, we should redefine it consistently
for all states.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]