artemlivshits commented on code in PR #21735:
URL: https://github.com/apache/kafka/pull/21735#discussion_r2927059806
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -441,7 +459,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
} else if (txnMetadata.producerId != producerId) {
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
} else if (txnMetadata.producerEpoch != producerEpoch) {
- Left(Errors.PRODUCER_FENCED)
+ // Check if client is using the client-facing epoch (nextProducerEpoch) after calling
+ // InitProducerId(keepPreparedTxn=true). Adding partitions is not allowed in this state.
Review Comment:
This should never happen in a proper client implementation -- after calling
initTransactions(true), the client only allows commit / abort. But in case a
third-party client doesn't follow the spec, we enforce it on the broker.
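For reference, the client-side contract described here is the recovery flow exercised by
the new integration tests in this PR; a minimal sketch using the test helpers from
TransactionsTest (createTransactionalProducer, enable2PC):

    // After recovering with keepPreparedTxn = true, the only legal next calls are
    // commitTransaction() / abortTransaction(); sending more records (which would add
    // partitions under the client-facing epoch) is what the broker check above rejects.
    val producer = createTransactionalProducer(transactionalId, enable2PC = true)
    producer.initTransactions(true) // keepPreparedTxn = true
    producer.commitTransaction()    // or abortTransaction()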
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -779,134 +805,165 @@ class TransactionCoordinator(txnConfig:
TransactionConfig,
val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
txnMetadata.inLock(() => {
- producerIdCopy = txnMetadata.producerId
- producerEpochCopy = txnMetadata.producerEpoch
- // PrepareEpochFence has slightly different epoch bumping logic so
don't include it here.
- // Note that, it can only happen when the current state is Ongoing.
- isEpochFence = txnMetadata.pendingState.filter(s => s ==
TransactionState.PREPARE_EPOCH_FENCE).isPresent
- // True if the client retried a request that had overflowed the
epoch, and a new producer ID is stored in the txnMetadata
- val retryOnOverflow = !isEpochFence && txnMetadata.prevProducerId
== producerId &&
- producerEpoch == Short.MaxValue - 1 && txnMetadata.producerEpoch
== 0
- // True if the client retried an endTxn request, and the bumped
producer epoch is stored in the txnMetadata.
- val retryOnEpochBump = !isEpochFence && txnMetadata.producerEpoch
== producerEpoch + 1
-
- val isValidEpoch = {
- if (!isEpochFence) {
- // With transactions V2, state + same epoch is not sufficient
to determine if a retry transition is valid. If the epoch is the
- // same it actually indicates the next endTransaction call.
Instead, we want to check the epoch matches with the epoch in the retry
conditions.
- // Return producer fenced even in the cases where the epoch is
higher and could indicate an invalid state transition.
- // Use the following criteria to determine if a v2 retry is
valid:
- txnMetadata.state match {
- case TransactionState.ONGOING | TransactionState.EMPTY |
TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
- producerEpoch == txnMetadata.producerEpoch
- case TransactionState.PREPARE_COMMIT |
TransactionState.PREPARE_ABORT =>
- retryOnEpochBump
- case TransactionState.COMPLETE_COMMIT |
TransactionState.COMPLETE_ABORT =>
- retryOnEpochBump || retryOnOverflow || producerEpoch ==
txnMetadata.producerEpoch
+ // These checks are performed first so that we don't have to deal
with intermediate states
+ // (including PrepareCommit and PrepareAbort) in already complex
state machine.
+ if (txnMetadata.pendingTransitionInProgress &&
!txnMetadata.pendingState.filter(s => s ==
TransactionState.PREPARE_EPOCH_FENCE).isPresent) {
+ Left(Errors.CONCURRENT_TRANSACTIONS)
+ } else if (txnMetadata.state == TransactionState.PREPARE_COMMIT ||
txnMetadata.state == TransactionState.PREPARE_ABORT) {
+ Left(Errors.CONCURRENT_TRANSACTIONS)
Review Comment:
This is a minor semantics change -- to avoid the complicated epoch-comparison
logic in "transient" states, we just return CONCURRENT_TRANSACTIONS and skip the
complexity. The semantic difference is that now clients won't get a
PRODUCER_FENCED error until the transaction transitions to a "permanent" state.
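For context, CONCURRENT_TRANSACTIONS is retriable, so the visible difference is only in
timing; a hypothetical sketch (not code from this PR) of what a caller effectively does:

    import org.apache.kafka.common.protocol.Errors

    // Retry while the coordinator is in a transient state; once the transaction reaches
    // a permanent state, a stale producer gets PRODUCER_FENCED and a valid retry gets NONE.
    def endTxnWithRetry(sendEndTxn: () => Errors, backoffMs: Long = 20L): Errors = {
      var result = sendEndTxn()
      while (result == Errors.CONCURRENT_TRANSACTIONS) {
        Thread.sleep(backoffMs)
        result = sendEndTxn()
      }
      result
    }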
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -340,7 +340,10 @@ synchronized TransactionalRequestResult initializeTransactions(
.setEnable2Pc(enable2PC)
.setKeepPreparedTxn(keepPreparedTxn);
- InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+ // Use unstable API version when 2PC features are enabled (requires InitProducerId v6)
+ boolean enableUnstableLastVersion = enable2PC || keepPreparedTxn;
+ InitProducerIdHandler handler = new InitProducerIdHandler(
Review Comment:
Apparently we were missing the ability to let the client use unstable API
versions; this is needed for the unit tests to work. The functionality follows
the pattern we have in other places.
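For context, the pattern is to let the request builder negotiate up to the unstable
latest version only when the new features are requested; a sketch assuming the
ApiKeys.latestVersion(enableUnstableLastVersion) overload that the builders use under
the hood:

    import org.apache.kafka.common.protocol.ApiKeys

    // When 2PC / keepPreparedTxn is requested, allow negotiation up to the unstable
    // latest InitProducerId version (v6); otherwise stop at the latest stable version.
    val enableUnstableLastVersion = true // enable2PC || keepPreparedTxn in this PR
    val maxVersion: Short = ApiKeys.INIT_PRODUCER_ID.latestVersion(enableUnstableLastVersion)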
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -779,134 +805,165 @@ class TransactionCoordinator(txnConfig:
TransactionConfig,
val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
txnMetadata.inLock(() => {
- producerIdCopy = txnMetadata.producerId
- producerEpochCopy = txnMetadata.producerEpoch
- // PrepareEpochFence has slightly different epoch bumping logic so
don't include it here.
- // Note that, it can only happen when the current state is Ongoing.
- isEpochFence = txnMetadata.pendingState.filter(s => s ==
TransactionState.PREPARE_EPOCH_FENCE).isPresent
- // True if the client retried a request that had overflowed the
epoch, and a new producer ID is stored in the txnMetadata
- val retryOnOverflow = !isEpochFence && txnMetadata.prevProducerId
== producerId &&
- producerEpoch == Short.MaxValue - 1 && txnMetadata.producerEpoch
== 0
- // True if the client retried an endTxn request, and the bumped
producer epoch is stored in the txnMetadata.
- val retryOnEpochBump = !isEpochFence && txnMetadata.producerEpoch
== producerEpoch + 1
-
- val isValidEpoch = {
- if (!isEpochFence) {
- // With transactions V2, state + same epoch is not sufficient
to determine if a retry transition is valid. If the epoch is the
- // same it actually indicates the next endTransaction call.
Instead, we want to check the epoch matches with the epoch in the retry
conditions.
- // Return producer fenced even in the cases where the epoch is
higher and could indicate an invalid state transition.
- // Use the following criteria to determine if a v2 retry is
valid:
- txnMetadata.state match {
- case TransactionState.ONGOING | TransactionState.EMPTY |
TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
- producerEpoch == txnMetadata.producerEpoch
- case TransactionState.PREPARE_COMMIT |
TransactionState.PREPARE_ABORT =>
- retryOnEpochBump
- case TransactionState.COMPLETE_COMMIT |
TransactionState.COMPLETE_ABORT =>
- retryOnEpochBump || retryOnOverflow || producerEpoch ==
txnMetadata.producerEpoch
+ // These checks are performed first so that we don't have to deal
with intermediate states
+ // (including PrepareCommit and PrepareAbort) in already complex
state machine.
+ if (txnMetadata.pendingTransitionInProgress &&
!txnMetadata.pendingState.filter(s => s ==
TransactionState.PREPARE_EPOCH_FENCE).isPresent) {
+ Left(Errors.CONCURRENT_TRANSACTIONS)
+ } else if (txnMetadata.state == TransactionState.PREPARE_COMMIT ||
txnMetadata.state == TransactionState.PREPARE_ABORT) {
+ Left(Errors.CONCURRENT_TRANSACTIONS)
+ } else {
+ // Copy the data under lock, this is going to be used if we want
to return success
+ // on retrying commit / abort.
+ producerIdCopy = txnMetadata.producerId
+ producerEpochCopy = txnMetadata.producerEpoch
+
+ // PrepareEpochFence has slightly different epoch bumping logic
so don't include it here.
+ // Note that, it can only happen when the current state is
Ongoing.
+ isEpochFence = txnMetadata.pendingState.filter(s => s ==
TransactionState.PREPARE_EPOCH_FENCE).isPresent
+
+ // Calculate retry conditions, note that they are going to be
used only
+ // in CompleteCommit / CompleteAbort states, so we can use the
semantics
+ // of the producerId / epoch that's applicable in those states.
+ // Also note that the retry-on-overflow condition happens only
when
+ // overflow happens during last commit / abort, overflows during
+ // InitProducerId(keepPreparedTxn=true) go through
retry-on-epoch-bump
+ // path. Example:
+ // 1. State ONGOING, {pid, epoch} = {42, 10}, {nextPid,
nextEpoch} = {73, 85} -- the client-facing epoch has overflown
+ // 2. Commit transitions to {pid, epoch} = {73, 86} and the
client retries with {73, 85} -- retryOnEpochBump
+ // Another example where we overflow during commit after going
through overflow with keepPreparedTxn = true
+ // 1. State ONGOING, {pid, epoch} = {42, 10}, {nextPid,
nextEpoch} = {73, 32766}
+ // 2. Commit transitions to {pid, epoch} = {85, 0}, prevPid =
73 and the client retries with {73, 32766} -- retryOnOverflow
+
+ // True if the client retried a request that had overflowed the
epoch, and a new producer ID is stored in the txnMetadata
+ val retryOnOverflow = !isEpochFence &&
txnMetadata.prevProducerId == producerId &&
+ producerEpoch == Short.MaxValue - 1 &&
txnMetadata.producerEpoch == 0
+ // True if the client retried an endTxn request, and the bumped
producer epoch is stored in the txnMetadata.
+ val retryOnEpochBump = !isEpochFence &&
txnMetadata.producerEpoch == producerEpoch + 1
+
+ val isValidEpoch = {
+ if (!isEpochFence) {
+ // With transactions V2, state + same epoch is not
sufficient to determine if a retry transition is valid. If the epoch is the
+ // same it actually indicates the next endTransaction call.
Instead, we want to check the epoch matches with the epoch in the retry
conditions.
+ // Return producer fenced even in the cases where the epoch
is higher and could indicate an invalid state transition.
+ // Use the following criteria to determine if a v2 retry is
valid:
+ txnMetadata.state match {
+ case TransactionState.ONGOING | TransactionState.EMPTY |
TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
+ producerEpoch == txnMetadata.clientProducerEpoch
+ case TransactionState.PREPARE_COMMIT |
TransactionState.PREPARE_ABORT =>
+ retryOnEpochBump
+ case TransactionState.COMPLETE_COMMIT |
TransactionState.COMPLETE_ABORT =>
+ retryOnEpochBump || retryOnOverflow || producerEpoch ==
txnMetadata.producerEpoch
+ }
+ } else {
+ // If the epoch is going to be fenced, it bumps the epoch
differently with TV2.
+ (!isFromClient || producerEpoch ==
txnMetadata.clientProducerEpoch) && producerEpoch >=
txnMetadata.clientProducerEpoch
}
- } else {
- // If the epoch is going to be fenced, it bumps the epoch
differently with TV2.
- (!isFromClient || producerEpoch == txnMetadata.producerEpoch)
&& producerEpoch >= txnMetadata.producerEpoch
}
- }
- val isRetry = retryOnEpochBump || retryOnOverflow
-
- def generateTxnTransitMetadataForTxnCompletion(nextState:
TransactionState, noPartitionAdded: Boolean): ApiResult[(Int,
TxnTransitMetadata)] = {
- // EndTxn completion on TV2 bumps epoch, so rotate producer ID
whenever the current epoch is exhausted.
- // This must also apply to the epoch-fence path.
- val nextProducerIdOrErrors =
- if (txnMetadata.isProducerEpochExhausted) {
- try {
- Right(producerIdManager.generateProducerId())
- } catch {
- case e: Exception => Left(Errors.forException(e))
+ val isRetry = retryOnEpochBump || retryOnOverflow
+
+ def generateTxnTransitMetadataForTxnCompletion(nextState:
TransactionState, noPartitionAdded: Boolean): ApiResult[(Int,
TxnTransitMetadata)] = {
+ // EndTxn completion on TV2 bumps epoch, so rotate producer ID
whenever the current epoch is exhausted.
+ // This must also apply to the epoch-fence path.
+ val nextProducerIdOrErrors =
+ if (txnMetadata.isProducerEpochExhausted) {
+ try {
+ Right(producerIdManager.generateProducerId())
+ } catch {
+ case e: Exception => Left(Errors.forException(e))
+ }
+ } else {
+ Right(txnMetadata.nextProducerId)
}
+
+ // If the next producer epoch is set (which can happen if we
used InitProducerId(keepPreparedTxn))
+ // then we also need to bump this epoch -- we want to fence
requests that were issued with that
+ // epoch after the transaction is complete. Consider this
example:
+ // 1. (initial state): Ongoing, producerId = 42,
producerEpoch = 100, nextProducerId = -1, nextProducerEpoch = -1
+ // 2. InitProducerId(keepPreparedTxn): Ongoing, producerId =
42, producerEpoch = 100, nextProducerId = 42, nextProducerEpoch = 101
+ // 3. CommitTxn: timed out (but actually delayed)
+ // 4. CommitTxn + complete: CompleteCommit, producerId = 42,
producerEpoch = 101, nextProducerId = -1, nextProducerEpoch = -1
+ // 5. Start new transaction, do some actions
+ // 6. Delayed CommitTxn from step 3 arrives and commits
partially completed transaction.
+ // In order to prevent this problem, we need to bump the epoch
at step 4 (so it becomes 102) even though we
+ // already bumped it at step 2.
+ // Note that we can arrive at this scenario when a client did
a few keep-prepared calls and then
+ // decided to do one without keep-prepared (e.g. to
force-terminate), so this logic applies
+ // to isEpochFence as well.
+ val nextProducerEpoch = if (txnMetadata.hasNextProducerEpoch) {
+ if (txnMetadata.isProducerEpochExhausted) 0.toShort else
(txnMetadata.nextProducerEpoch + 1).toShort
} else {
- Right(RecordBatch.NO_PRODUCER_ID)
+ RecordBatch.NO_PRODUCER_EPOCH
}
- if (nextState == TransactionState.PREPARE_ABORT && isEpochFence)
{
- // We should clear the pending state to make way for the
transition to PrepareAbort
- txnMetadata.pendingState(util.Optional.empty())
- // For TV2+, don't manually set the epoch - let
prepareAbortOrCommit handle it naturally.
- }
+ if (nextState == TransactionState.PREPARE_ABORT &&
isEpochFence) {
+ // We should clear the pending state to make way for the
transition to PrepareAbort
+ txnMetadata.pendingState(util.Optional.empty())
+ // For TV2+, don't manually set the epoch - let
prepareAbortOrCommit handle it naturally.
+ }
- nextProducerIdOrErrors.flatMap {
- nextProducerId =>
- Right(coordinatorEpoch,
txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion,
nextProducerId.asInstanceOf[Long], time.milliseconds(), noPartitionAdded))
+ nextProducerIdOrErrors.flatMap {
+ nextProducerId =>
+ Right(coordinatorEpoch,
txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion,
nextProducerId, nextProducerEpoch, time.milliseconds(), noPartitionAdded))
+ }
}
- }
-
- if (txnMetadata.pendingTransitionInProgress &&
txnMetadata.pendingState.get != TransactionState.PREPARE_EPOCH_FENCE) {
- // This check is performed first so that the pending transition
can complete before the next checks.
- // With TV2, we may be transitioning over a producer epoch
overflow, and the producer may be using the
- // new producer ID that is still only in pending state.
- Left(Errors.CONCURRENT_TRANSACTIONS)
- } else if (txnMetadata.producerId != producerId &&
!retryOnOverflow)
- Left(Errors.INVALID_PRODUCER_ID_MAPPING)
- else if (!isValidEpoch)
- Left(Errors.PRODUCER_FENCED)
- else txnMetadata.state match {
- case TransactionState.ONGOING =>
- val nextState = if (txnMarkerResult ==
TransactionResult.COMMIT)
- TransactionState.PREPARE_COMMIT
- else
- TransactionState.PREPARE_ABORT
- generateTxnTransitMetadataForTxnCompletion(nextState, false)
- case TransactionState.COMPLETE_COMMIT =>
- if (txnMarkerResult == TransactionResult.COMMIT) {
- if (isRetry)
- Left(Errors.NONE)
+ if (txnMetadata.clientProducerId != producerId &&
!retryOnOverflow)
+ Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+ else if (!isValidEpoch)
+ Left(Errors.PRODUCER_FENCED)
+ else txnMetadata.state match {
+ case TransactionState.ONGOING =>
+ val nextState = if (txnMarkerResult ==
TransactionResult.COMMIT)
+ TransactionState.PREPARE_COMMIT
else
+ TransactionState.PREPARE_ABORT
+
+ generateTxnTransitMetadataForTxnCompletion(nextState,
noPartitionAdded = false)
+ case TransactionState.COMPLETE_COMMIT =>
+ if (txnMarkerResult == TransactionResult.COMMIT) {
+ if (isRetry)
+ Left(Errors.NONE)
+ else
+ logInvalidStateTransitionAndReturnError(transactionalId,
txnMetadata.state, txnMarkerResult)
+ } else {
+ // Abort.
+ if (isRetry)
+ logInvalidStateTransitionAndReturnError(transactionalId,
txnMetadata.state, txnMarkerResult)
+ else
+
generateTxnTransitMetadataForTxnCompletion(TransactionState.PREPARE_ABORT,
noPartitionAdded = true)
+ }
+ case TransactionState.COMPLETE_ABORT =>
+ if (txnMarkerResult == TransactionResult.ABORT) {
+ if (isRetry)
+ Left(Errors.NONE)
+ else
+
generateTxnTransitMetadataForTxnCompletion(TransactionState.PREPARE_ABORT,
noPartitionAdded = true)
+ } else {
+ // Commit.
logInvalidStateTransitionAndReturnError(transactionalId,
txnMetadata.state, txnMarkerResult)
- } else {
- // Abort.
- if (isRetry)
+ }
+ case TransactionState.EMPTY =>
+ if (txnMarkerResult == TransactionResult.ABORT) {
+
generateTxnTransitMetadataForTxnCompletion(TransactionState.PREPARE_ABORT,
noPartitionAdded = true)
+ } else {
logInvalidStateTransitionAndReturnError(transactionalId,
txnMetadata.state, txnMarkerResult)
- else
-
generateTxnTransitMetadataForTxnCompletion(TransactionState.PREPARE_ABORT, true)
- }
- case TransactionState.COMPLETE_ABORT =>
- if (txnMarkerResult == TransactionResult.ABORT) {
- if (isRetry)
- Left(Errors.NONE)
- else
-
generateTxnTransitMetadataForTxnCompletion(TransactionState.PREPARE_ABORT, true)
- } else {
- // Commit.
- logInvalidStateTransitionAndReturnError(transactionalId,
txnMetadata.state, txnMarkerResult)
- }
- case TransactionState.PREPARE_COMMIT =>
Review Comment:
These checks are now done upfront.
##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala:
##########
@@ -879,8 +879,8 @@ class TransactionStateManagerTest {
// is left at it is. If the transactional id is never reused, the TransactionMetadata
// will be expired and it should succeed.
val timestamp = time.milliseconds()
- val txnMetadata = new TransactionMetadata(transactionalId, 1, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
- RecordBatch.NO_PRODUCER_EPOCH, transactionTimeoutMs, TransactionState.EMPTY, util.Set.of, timestamp, timestamp, TV_0)
+ val txnMetadata = new TransactionMetadata(transactionalId, 1, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_ID, 0,
+ RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_PRODUCER_EPOCH, transactionTimeoutMs, TransactionState.EMPTY, util.Set.of, timestamp, timestamp, TV_0)
Review Comment:
Only mechanical changes here.
##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala:
##########
@@ -51,6 +51,7 @@ class TransactionMetadataTest {
RecordBatch.NO_PRODUCER_ID,
producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH,
+ RecordBatch.NO_PRODUCER_EPOCH,
Review Comment:
Only mechanical changes here.
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -779,134 +805,165 @@ class TransactionCoordinator(txnConfig:
TransactionConfig,
val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
txnMetadata.inLock(() => {
- producerIdCopy = txnMetadata.producerId
- producerEpochCopy = txnMetadata.producerEpoch
- // PrepareEpochFence has slightly different epoch bumping logic so
don't include it here.
- // Note that, it can only happen when the current state is Ongoing.
- isEpochFence = txnMetadata.pendingState.filter(s => s ==
TransactionState.PREPARE_EPOCH_FENCE).isPresent
- // True if the client retried a request that had overflowed the
epoch, and a new producer ID is stored in the txnMetadata
- val retryOnOverflow = !isEpochFence && txnMetadata.prevProducerId
== producerId &&
- producerEpoch == Short.MaxValue - 1 && txnMetadata.producerEpoch
== 0
- // True if the client retried an endTxn request, and the bumped
producer epoch is stored in the txnMetadata.
- val retryOnEpochBump = !isEpochFence && txnMetadata.producerEpoch
== producerEpoch + 1
-
- val isValidEpoch = {
- if (!isEpochFence) {
- // With transactions V2, state + same epoch is not sufficient
to determine if a retry transition is valid. If the epoch is the
- // same it actually indicates the next endTransaction call.
Instead, we want to check the epoch matches with the epoch in the retry
conditions.
- // Return producer fenced even in the cases where the epoch is
higher and could indicate an invalid state transition.
- // Use the following criteria to determine if a v2 retry is
valid:
- txnMetadata.state match {
- case TransactionState.ONGOING | TransactionState.EMPTY |
TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
- producerEpoch == txnMetadata.producerEpoch
- case TransactionState.PREPARE_COMMIT |
TransactionState.PREPARE_ABORT =>
- retryOnEpochBump
- case TransactionState.COMPLETE_COMMIT |
TransactionState.COMPLETE_ABORT =>
- retryOnEpochBump || retryOnOverflow || producerEpoch ==
txnMetadata.producerEpoch
+ // These checks are performed first so that we don't have to deal
with intermediate states
+ // (including PrepareCommit and PrepareAbort) in already complex
state machine.
+ if (txnMetadata.pendingTransitionInProgress &&
!txnMetadata.pendingState.filter(s => s ==
TransactionState.PREPARE_EPOCH_FENCE).isPresent) {
+ Left(Errors.CONCURRENT_TRANSACTIONS)
+ } else if (txnMetadata.state == TransactionState.PREPARE_COMMIT ||
txnMetadata.state == TransactionState.PREPARE_ABORT) {
+ Left(Errors.CONCURRENT_TRANSACTIONS)
+ } else {
+ // Copy the data under lock, this is going to be used if we want
to return success
+ // on retrying commit / abort.
+ producerIdCopy = txnMetadata.producerId
+ producerEpochCopy = txnMetadata.producerEpoch
+
+ // PrepareEpochFence has slightly different epoch bumping logic
so don't include it here.
+ // Note that, it can only happen when the current state is
Ongoing.
+ isEpochFence = txnMetadata.pendingState.filter(s => s ==
TransactionState.PREPARE_EPOCH_FENCE).isPresent
+
+ // Calculate retry conditions, note that they are going to be
used only
+ // in CompleteCommit / CompleteAbort states, so we can use the
semantics
+ // of the producerId / epoch that's applicable in those states.
+ // Also note that the retry-on-overflow condition happens only
when
+ // overflow happens during last commit / abort, overflows during
+ // InitProducerId(keepPreparedTxn=true) go through
retry-on-epoch-bump
+ // path. Example:
+ // 1. State ONGOING, {pid, epoch} = {42, 10}, {nextPid,
nextEpoch} = {73, 85} -- the client-facing epoch has overflown
+ // 2. Commit transitions to {pid, epoch} = {73, 86} and the
client retries with {73, 85} -- retryOnEpochBump
+ // Another example where we overflow during commit after going
through overflow with keepPreparedTxn = true
+ // 1. State ONGOING, {pid, epoch} = {42, 10}, {nextPid,
nextEpoch} = {73, 32766}
+ // 2. Commit transitions to {pid, epoch} = {85, 0}, prevPid =
73 and the client retries with {73, 32766} -- retryOnOverflow
+
+ // True if the client retried a request that had overflowed the
epoch, and a new producer ID is stored in the txnMetadata
+ val retryOnOverflow = !isEpochFence &&
txnMetadata.prevProducerId == producerId &&
+ producerEpoch == Short.MaxValue - 1 &&
txnMetadata.producerEpoch == 0
+ // True if the client retried an endTxn request, and the bumped
producer epoch is stored in the txnMetadata.
+ val retryOnEpochBump = !isEpochFence &&
txnMetadata.producerEpoch == producerEpoch + 1
+
+ val isValidEpoch = {
+ if (!isEpochFence) {
+ // With transactions V2, state + same epoch is not
sufficient to determine if a retry transition is valid. If the epoch is the
+ // same it actually indicates the next endTransaction call.
Instead, we want to check the epoch matches with the epoch in the retry
conditions.
+ // Return producer fenced even in the cases where the epoch
is higher and could indicate an invalid state transition.
+ // Use the following criteria to determine if a v2 retry is
valid:
+ txnMetadata.state match {
+ case TransactionState.ONGOING | TransactionState.EMPTY |
TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
+ producerEpoch == txnMetadata.clientProducerEpoch
+ case TransactionState.PREPARE_COMMIT |
TransactionState.PREPARE_ABORT =>
+ retryOnEpochBump
+ case TransactionState.COMPLETE_COMMIT |
TransactionState.COMPLETE_ABORT =>
+ retryOnEpochBump || retryOnOverflow || producerEpoch ==
txnMetadata.producerEpoch
+ }
+ } else {
+ // If the epoch is going to be fenced, it bumps the epoch
differently with TV2.
+ (!isFromClient || producerEpoch ==
txnMetadata.clientProducerEpoch) && producerEpoch >=
txnMetadata.clientProducerEpoch
}
- } else {
- // If the epoch is going to be fenced, it bumps the epoch
differently with TV2.
- (!isFromClient || producerEpoch == txnMetadata.producerEpoch)
&& producerEpoch >= txnMetadata.producerEpoch
}
- }
- val isRetry = retryOnEpochBump || retryOnOverflow
-
- def generateTxnTransitMetadataForTxnCompletion(nextState:
TransactionState, noPartitionAdded: Boolean): ApiResult[(Int,
TxnTransitMetadata)] = {
- // EndTxn completion on TV2 bumps epoch, so rotate producer ID
whenever the current epoch is exhausted.
- // This must also apply to the epoch-fence path.
- val nextProducerIdOrErrors =
- if (txnMetadata.isProducerEpochExhausted) {
- try {
- Right(producerIdManager.generateProducerId())
- } catch {
- case e: Exception => Left(Errors.forException(e))
+ val isRetry = retryOnEpochBump || retryOnOverflow
+
+ def generateTxnTransitMetadataForTxnCompletion(nextState:
TransactionState, noPartitionAdded: Boolean): ApiResult[(Int,
TxnTransitMetadata)] = {
+ // EndTxn completion on TV2 bumps epoch, so rotate producer ID
whenever the current epoch is exhausted.
+ // This must also apply to the epoch-fence path.
+ val nextProducerIdOrErrors =
+ if (txnMetadata.isProducerEpochExhausted) {
+ try {
+ Right(producerIdManager.generateProducerId())
+ } catch {
+ case e: Exception => Left(Errors.forException(e))
+ }
+ } else {
+ Right(txnMetadata.nextProducerId)
Review Comment:
In the keepPrepared flow, even if the epoch is not exhausted at the time of the
endTxn call, we might have already rotated the producer id, so we have to use
nextProducerId (which is what the client uses).
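A condensed sketch of the state this branch handles (illustrative values, not the PR's
exact code):

    // InitProducerId(keepPreparedTxn = true) may already have rotated the id, e.g.
    // producerId = 42 while the client is operating under nextProducerId = 73, so
    // completion must use nextProducerId rather than minting a fresh id.
    final case class Ids(producerId: Long, nextProducerId: Long, epochExhausted: Boolean)

    def producerIdForCompletion(ids: Ids, generateProducerId: () => Long): Long =
      if (ids.epochExhausted) generateProducerId() // epoch overflow: mint a fresh id
      else ids.nextProducerId                      // id may already be rotated; reuse it

    assert(producerIdForCompletion(Ids(42L, 73L, epochExhausted = false), () => 99L) == 73L)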
##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -305,79 +314,6 @@ class TransactionsTest extends IntegrationTestHarness {
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava,
consumer.groupMetadata()))
}
- private def sendOffset(commit: (KafkaProducer[Array[Byte], Array[Byte]],
Review Comment:
This moved to the bottom.
##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -713,31 +634,10 @@ class TransactionsTest extends IntegrationTestHarness {
producer.abortTransaction()
producer.close()
- // Find the transaction coordinator partition for this transactional ID
- val adminClient = createAdminClient()
- try {
- val txnDescription =
adminClient.describeTransactions(java.util.List.of(transactionalId))
- .description(transactionalId).get()
- val coordinatorId = txnDescription.coordinatorId()
-
- // Access the transaction coordinator and update the epoch to
Short.MaxValue - 2
- val coordinatorBroker = brokers.find(_.config.brokerId ==
coordinatorId).get
- val txnCoordinator =
coordinatorBroker.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
-
- // Get the transaction metadata and update the epoch close to
Short.MaxValue
- // to trigger the overflow scenario. We'll set it high enough that
subsequent
- // operations will cause it to reach Short.MaxValue - 1 before the
timeout.
-
txnCoordinator.transactionManager.getTransactionState(transactionalId).foreach
{ txnMetadataOpt =>
- txnMetadataOpt.foreach { epochAndMetadata =>
- epochAndMetadata.transactionMetadata.inLock(() => {
-
epochAndMetadata.transactionMetadata.setProducerEpoch((Short.MaxValue -
2).toShort)
- null // inLock expects a Supplier that returns a value
- })
- }
- }
- } finally {
- adminClient.close()
- }
+ // Update the epoch close to Short.MaxValue to trigger the overflow
scenario.
+ // Set it high enough that subsequent operations will cause it to reach
+ // Short.MaxValue - 1 before the timeout.
+ setProducerEpoch(transactionalId, (Short.MaxValue - 2).toShort)
Review Comment:
This functionality is refactored into a helper function.
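A plausible shape for that helper, reconstructed from the inlined code it replaces (the
helper's actual body is outside this hunk, so treat this as a hypothetical sketch):

    // Hypothetical reconstruction of setProducerEpoch based on the removed code above.
    private def setProducerEpoch(transactionalId: String, epoch: Short): Unit = {
      val coordinatorId = adminClient.describeTransactions(java.util.List.of(transactionalId))
        .description(transactionalId).get().coordinatorId()
      val txnCoordinator = brokers.find(_.config.brokerId == coordinatorId).get
        .asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
      txnCoordinator.transactionManager.getTransactionState(transactionalId).foreach { txnMetadataOpt =>
        txnMetadataOpt.foreach { epochAndMetadata =>
          epochAndMetadata.transactionMetadata.inLock(() => {
            epochAndMetadata.transactionMetadata.setProducerEpoch(epoch)
            null // inLock expects a Supplier that returns a value
          })
        }
      }
    }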
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -580,7 +604,9 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
txnMetadata.setLastProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
}
- Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, TransactionVersion.fromFeatureLevel(0), RecordBatch.NO_PRODUCER_ID, time.milliseconds(), false))
+ Right((coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState,
+ TransactionVersion.fromFeatureLevel(0), RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
Review Comment:
Just added NO_PRODUCER_EPOCH for the next epoch in the TV1 case.
##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -750,33 +650,12 @@ class TransactionsTest extends IntegrationTestHarness {
producer.flush()
// Check and assert that epoch of the transaction is Short.MaxValue - 1
(before timeout)
- val adminClient2 = createAdminClient()
- try {
- val coordinatorId2 =
adminClient2.describeTransactions(java.util.List.of(transactionalId))
- .description(transactionalId).get().coordinatorId()
- val coordinatorBroker2 = brokers.find(_.config.brokerId ==
coordinatorId2).get
- val txnCoordinator2 =
coordinatorBroker2.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
-
-
txnCoordinator2.transactionManager.getTransactionState(transactionalId).foreach
{ txnMetadataOpt =>
- txnMetadataOpt.foreach { epochAndMetadata =>
- val currentEpoch =
epochAndMetadata.transactionMetadata.producerEpoch()
- assertEquals((Short.MaxValue - 1).toShort, currentEpoch,
- s"Expected epoch to be ${Short.MaxValue - 1}, but got
$currentEpoch")
- }
- }
+ val currentEpoch = getProducerEpoch(transactionalId)
Review Comment:
This is refactored into a helper function.
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -779,134 +805,165 @@ class TransactionCoordinator(txnConfig:
TransactionConfig,
val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
txnMetadata.inLock(() => {
- producerIdCopy = txnMetadata.producerId
- producerEpochCopy = txnMetadata.producerEpoch
- // PrepareEpochFence has slightly different epoch bumping logic so
don't include it here.
- // Note that, it can only happen when the current state is Ongoing.
- isEpochFence = txnMetadata.pendingState.filter(s => s ==
TransactionState.PREPARE_EPOCH_FENCE).isPresent
- // True if the client retried a request that had overflowed the
epoch, and a new producer ID is stored in the txnMetadata
- val retryOnOverflow = !isEpochFence && txnMetadata.prevProducerId
== producerId &&
- producerEpoch == Short.MaxValue - 1 && txnMetadata.producerEpoch
== 0
- // True if the client retried an endTxn request, and the bumped
producer epoch is stored in the txnMetadata.
- val retryOnEpochBump = !isEpochFence && txnMetadata.producerEpoch
== producerEpoch + 1
-
- val isValidEpoch = {
- if (!isEpochFence) {
- // With transactions V2, state + same epoch is not sufficient
to determine if a retry transition is valid. If the epoch is the
- // same it actually indicates the next endTransaction call.
Instead, we want to check the epoch matches with the epoch in the retry
conditions.
- // Return producer fenced even in the cases where the epoch is
higher and could indicate an invalid state transition.
- // Use the following criteria to determine if a v2 retry is
valid:
- txnMetadata.state match {
- case TransactionState.ONGOING | TransactionState.EMPTY |
TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
- producerEpoch == txnMetadata.producerEpoch
- case TransactionState.PREPARE_COMMIT |
TransactionState.PREPARE_ABORT =>
- retryOnEpochBump
- case TransactionState.COMPLETE_COMMIT |
TransactionState.COMPLETE_ABORT =>
- retryOnEpochBump || retryOnOverflow || producerEpoch ==
txnMetadata.producerEpoch
+ // These checks are performed first so that we don't have to deal
with intermediate states
+ // (including PrepareCommit and PrepareAbort) in already complex
state machine.
+ if (txnMetadata.pendingTransitionInProgress &&
!txnMetadata.pendingState.filter(s => s ==
TransactionState.PREPARE_EPOCH_FENCE).isPresent) {
+ Left(Errors.CONCURRENT_TRANSACTIONS)
+ } else if (txnMetadata.state == TransactionState.PREPARE_COMMIT ||
txnMetadata.state == TransactionState.PREPARE_ABORT) {
+ Left(Errors.CONCURRENT_TRANSACTIONS)
+ } else {
+ // Copy the data under lock, this is going to be used if we want
to return success
+ // on retrying commit / abort.
+ producerIdCopy = txnMetadata.producerId
+ producerEpochCopy = txnMetadata.producerEpoch
+
+ // PrepareEpochFence has slightly different epoch bumping logic
so don't include it here.
+ // Note that, it can only happen when the current state is
Ongoing.
+ isEpochFence = txnMetadata.pendingState.filter(s => s ==
TransactionState.PREPARE_EPOCH_FENCE).isPresent
+
+ // Calculate retry conditions, note that they are going to be
used only
+ // in CompleteCommit / CompleteAbort states, so we can use the
semantics
+ // of the producerId / epoch that's applicable in those states.
+ // Also note that the retry-on-overflow condition happens only
when
+ // overflow happens during last commit / abort, overflows during
+ // InitProducerId(keepPreparedTxn=true) go through
retry-on-epoch-bump
+ // path. Example:
+ // 1. State ONGOING, {pid, epoch} = {42, 10}, {nextPid,
nextEpoch} = {73, 85} -- the client-facing epoch has overflown
+ // 2. Commit transitions to {pid, epoch} = {73, 86} and the
client retries with {73, 85} -- retryOnEpochBump
+ // Another example where we overflow during commit after going
through overflow with keepPreparedTxn = true
+ // 1. State ONGOING, {pid, epoch} = {42, 10}, {nextPid,
nextEpoch} = {73, 32766}
+ // 2. Commit transitions to {pid, epoch} = {85, 0}, prevPid =
73 and the client retries with {73, 32766} -- retryOnOverflow
+
+ // True if the client retried a request that had overflowed the
epoch, and a new producer ID is stored in the txnMetadata
+ val retryOnOverflow = !isEpochFence &&
txnMetadata.prevProducerId == producerId &&
+ producerEpoch == Short.MaxValue - 1 &&
txnMetadata.producerEpoch == 0
+ // True if the client retried an endTxn request, and the bumped
producer epoch is stored in the txnMetadata.
+ val retryOnEpochBump = !isEpochFence &&
txnMetadata.producerEpoch == producerEpoch + 1
+
+ val isValidEpoch = {
+ if (!isEpochFence) {
+ // With transactions V2, state + same epoch is not
sufficient to determine if a retry transition is valid. If the epoch is the
+ // same it actually indicates the next endTransaction call.
Instead, we want to check the epoch matches with the epoch in the retry
conditions.
+ // Return producer fenced even in the cases where the epoch
is higher and could indicate an invalid state transition.
+ // Use the following criteria to determine if a v2 retry is
valid:
+ txnMetadata.state match {
+ case TransactionState.ONGOING | TransactionState.EMPTY |
TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
+ producerEpoch == txnMetadata.clientProducerEpoch
+ case TransactionState.PREPARE_COMMIT |
TransactionState.PREPARE_ABORT =>
+ retryOnEpochBump
+ case TransactionState.COMPLETE_COMMIT |
TransactionState.COMPLETE_ABORT =>
+ retryOnEpochBump || retryOnOverflow || producerEpoch ==
txnMetadata.producerEpoch
+ }
+ } else {
+ // If the epoch is going to be fenced, it bumps the epoch
differently with TV2.
+ (!isFromClient || producerEpoch ==
txnMetadata.clientProducerEpoch) && producerEpoch >=
txnMetadata.clientProducerEpoch
}
- } else {
- // If the epoch is going to be fenced, it bumps the epoch
differently with TV2.
- (!isFromClient || producerEpoch == txnMetadata.producerEpoch)
&& producerEpoch >= txnMetadata.producerEpoch
}
- }
- val isRetry = retryOnEpochBump || retryOnOverflow
-
- def generateTxnTransitMetadataForTxnCompletion(nextState:
TransactionState, noPartitionAdded: Boolean): ApiResult[(Int,
TxnTransitMetadata)] = {
- // EndTxn completion on TV2 bumps epoch, so rotate producer ID
whenever the current epoch is exhausted.
- // This must also apply to the epoch-fence path.
- val nextProducerIdOrErrors =
- if (txnMetadata.isProducerEpochExhausted) {
- try {
- Right(producerIdManager.generateProducerId())
- } catch {
- case e: Exception => Left(Errors.forException(e))
+ val isRetry = retryOnEpochBump || retryOnOverflow
+
+ def generateTxnTransitMetadataForTxnCompletion(nextState:
TransactionState, noPartitionAdded: Boolean): ApiResult[(Int,
TxnTransitMetadata)] = {
+ // EndTxn completion on TV2 bumps epoch, so rotate producer ID
whenever the current epoch is exhausted.
+ // This must also apply to the epoch-fence path.
+ val nextProducerIdOrErrors =
+ if (txnMetadata.isProducerEpochExhausted) {
+ try {
+ Right(producerIdManager.generateProducerId())
+ } catch {
+ case e: Exception => Left(Errors.forException(e))
+ }
+ } else {
+ Right(txnMetadata.nextProducerId)
}
+
+ // If the next producer epoch is set (which can happen if we
used InitProducerId(keepPreparedTxn))
+ // then we also need to bump this epoch -- we want to fence
requests that were issued with that
+ // epoch after the transaction is complete. Consider this
example:
+ // 1. (initial state): Ongoing, producerId = 42,
producerEpoch = 100, nextProducerId = -1, nextProducerEpoch = -1
+ // 2. InitProducerId(keepPreparedTxn): Ongoing, producerId =
42, producerEpoch = 100, nextProducerId = 42, nextProducerEpoch = 101
+ // 3. CommitTxn: timed out (but actually delayed)
+ // 4. CommitTxn + complete: CompleteCommit, producerId = 42,
producerEpoch = 101, nextProducerId = -1, nextProducerEpoch = -1
+ // 5. Start new transaction, do some actions
+ // 6. Delayed CommitTxn from step 3 arrives and commits
partially completed transaction.
+ // In order to prevent this problem, we need to bump the epoch
at step 4 (so it becomes 102) even though we
+ // already bumped it at step 2.
+ // Note that we can arrive at this scenario when a client did
a few keep-prepared calls and then
+ // decided to do one without keep-prepared (e.g. to
force-terminate), so this logic applies
+ // to isEpochFence as well.
+ val nextProducerEpoch = if (txnMetadata.hasNextProducerEpoch) {
+ if (txnMetadata.isProducerEpochExhausted) 0.toShort else
(txnMetadata.nextProducerEpoch + 1).toShort
} else {
- Right(RecordBatch.NO_PRODUCER_ID)
+ RecordBatch.NO_PRODUCER_EPOCH
}
- if (nextState == TransactionState.PREPARE_ABORT && isEpochFence)
{
- // We should clear the pending state to make way for the
transition to PrepareAbort
- txnMetadata.pendingState(util.Optional.empty())
- // For TV2+, don't manually set the epoch - let
prepareAbortOrCommit handle it naturally.
- }
+ if (nextState == TransactionState.PREPARE_ABORT &&
isEpochFence) {
+ // We should clear the pending state to make way for the
transition to PrepareAbort
+ txnMetadata.pendingState(util.Optional.empty())
+ // For TV2+, don't manually set the epoch - let
prepareAbortOrCommit handle it naturally.
+ }
- nextProducerIdOrErrors.flatMap {
- nextProducerId =>
- Right(coordinatorEpoch,
txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion,
nextProducerId.asInstanceOf[Long], time.milliseconds(), noPartitionAdded))
+ nextProducerIdOrErrors.flatMap {
+ nextProducerId =>
+ Right(coordinatorEpoch,
txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion,
nextProducerId, nextProducerEpoch, time.milliseconds(), noPartitionAdded))
+ }
}
- }
-
- if (txnMetadata.pendingTransitionInProgress &&
txnMetadata.pendingState.get != TransactionState.PREPARE_EPOCH_FENCE) {
Review Comment:
This check is now done upfront.
##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -750,33 +650,12 @@ class TransactionsTest extends IntegrationTestHarness {
producer.flush()
// Check and assert that epoch of the transaction is Short.MaxValue - 1
(before timeout)
- val adminClient2 = createAdminClient()
- try {
- val coordinatorId2 =
adminClient2.describeTransactions(java.util.List.of(transactionalId))
- .description(transactionalId).get().coordinatorId()
- val coordinatorBroker2 = brokers.find(_.config.brokerId ==
coordinatorId2).get
- val txnCoordinator2 =
coordinatorBroker2.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
-
-
txnCoordinator2.transactionManager.getTransactionState(transactionalId).foreach
{ txnMetadataOpt =>
- txnMetadataOpt.foreach { epochAndMetadata =>
- val currentEpoch =
epochAndMetadata.transactionMetadata.producerEpoch()
- assertEquals((Short.MaxValue - 1).toShort, currentEpoch,
- s"Expected epoch to be ${Short.MaxValue - 1}, but got
$currentEpoch")
- }
- }
+ val currentEpoch = getProducerEpoch(transactionalId)
+ assertEquals((Short.MaxValue - 1).toShort, currentEpoch,
+ s"Expected epoch to be ${Short.MaxValue - 1}, but got $currentEpoch")
- // Wait until state is complete abort
- waitUntilTrue(() => {
- val listResult = adminClient2.listTransactions()
- val txns = listResult.all().get().asScala
- txns.exists(txn =>
- txn.transactionalId() == transactionalId &&
- txn.state() == TransactionState.COMPLETE_ABORT
- )
- }, "Transaction was not aborted on timeout")
- } finally {
- adminClient2.close()
- }
+ // Wait until state is complete abort
+ waitForTransactionState(transactionalId, TransactionState.COMPLETE_ABORT)
Review Comment:
This is refactored into a helper function.
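A plausible shape for waitForTransactionState, reconstructed from the inlined wait it
replaces (the helper's actual body is outside this hunk, so treat this as a hypothetical
sketch):

    // Hypothetical reconstruction based on the removed listTransactions() polling loop.
    private def waitForTransactionState(transactionalId: String, expectedState: TransactionState): Unit = {
      waitUntilTrue(() => {
        val txns = adminClient.listTransactions().all().get().asScala
        txns.exists(txn =>
          txn.transactionalId() == transactionalId &&
          txn.state() == expectedState
        )
      }, s"Transaction $transactionalId did not reach state $expectedState")
    }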
##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala:
##########
@@ -66,9 +66,9 @@ class TransactionMarkerChannelManagerTest {
private val txnTimeoutMs = 0
private val txnResult = TransactionResult.COMMIT
private val txnMetadata1 = new TransactionMetadata(transactionalId1, producerId1, producerId1, RecordBatch.NO_PRODUCER_ID,
- producerEpoch, lastProducerEpoch, txnTimeoutMs, TransactionState.PREPARE_COMMIT, util.Set.of(partition1, partition2), 0L, 0L, TransactionVersion.TV_2)
+ producerEpoch, lastProducerEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, TransactionState.PREPARE_COMMIT, util.Set.of(partition1, partition2), 0L, 0L, TransactionVersion.TV_2)
Review Comment:
Only mechanical changes here.
##########
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java:
##########
@@ -84,18 +84,25 @@ public static byte[] valueToBytes(TxnTransitMetadata txnMetadata,
.setPartitionIds(entry.getValue().stream().map(TopicPartition::partition).toList())).toList();
}
- return MessageUtil.toVersionPrefixedBytes(
- transactionVersionLevel.transactionLogValueVersion(),
- new TransactionLogValue()
- .setProducerId(txnMetadata.producerId())
- .setProducerEpoch(txnMetadata.producerEpoch())
- .setTransactionTimeoutMs(txnMetadata.txnTimeoutMs())
- .setTransactionStatus(txnMetadata.txnState().id())
- .setTransactionLastUpdateTimestampMs(txnMetadata.txnLastUpdateTimestamp())
- .setTransactionStartTimestampMs(txnMetadata.txnStartTimestamp())
- .setTransactionPartitions(transactionPartitions)
- .setClientTransactionVersion(txnMetadata.clientTransactionVersion().featureLevel())
- );
+ short logVersion = transactionVersionLevel.transactionLogValueVersion();
+ TransactionLogValue value = new TransactionLogValue()
+ .setProducerId(txnMetadata.producerId())
+ .setProducerEpoch(txnMetadata.producerEpoch())
+ .setTransactionTimeoutMs(txnMetadata.txnTimeoutMs())
+ .setTransactionStatus(txnMetadata.txnState().id())
+ .setTransactionLastUpdateTimestampMs(txnMetadata.txnLastUpdateTimestamp())
+ .setTransactionStartTimestampMs(txnMetadata.txnStartTimestamp())
+ .setTransactionPartitions(transactionPartitions)
+ .setClientTransactionVersion(txnMetadata.clientTransactionVersion().featureLevel());
+
+ // Tagged fields are only supported in log version 1+ (flexible versions)
+ if (logVersion >= 1) {
+ value.setPreviousProducerId(txnMetadata.prevProducerId());
+ value.setNextProducerId(txnMetadata.nextProducerId());
Review Comment:
This is actually a miss for TV2 and needs to be fixed independently.
##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -1056,6 +935,545 @@ class TransactionsTest extends IntegrationTestHarness {
producer.abortTransaction()
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testFencingWithDualIdentity(groupProtocol: String): Unit = {
+ // Test that zombie producer is properly fenced when dual identity is
established
+ // after crash recovery with 2PC keepPreparedTxn.
+ val transactionalId = "test-fencing-dual-identity"
+ val testTopic = s"test-fencing-topic-${System.nanoTime()}"
+ createTopic(testTopic, 1, brokerCount, topicConfig())
+
+ val consumer = transactionalConsumers.head
+ consumer.subscribe(Seq(testTopic).asJava)
+ consumer.poll(Duration.ofMillis(100))
+
+ // Start transaction and send records (simulating crash before commit)
+ val producer1 = createTransactionalProducer(transactionalId, enable2PC =
true)
+ producer1.initTransactions()
+ producer1.beginTransaction()
+ val numRecords = 5
+ for (i <- 0 until numRecords) {
+ producer1.send(new ProducerRecord(testTopic, 0, s"key-$i".getBytes,
s"value-$i".getBytes))
+ }
+ producer1.flush()
+ // Simulate crash - don't commit (leave transaction prepared)
+
+ // Create new producer and recover with
initTransactions(keepPreparedTxn=true)
+ val producer2 = createTransactionalProducer(transactionalId, enable2PC =
true)
+ producer2.initTransactions(true) // This establishes dual identity
+
+ // Old (zombie) producer tries to abort - should get
ProducerFencedException
+ assertThrows(classOf[ProducerFencedException], () => {
+ producer1.abortTransaction()
+ }, "Zombie producer should be fenced when trying to abort")
+
+ // New producer commits the prepared transaction
+ producer2.commitTransaction()
+
+ // Wait for transaction to reach COMPLETE_COMMIT state
+ waitForTransactionState(transactionalId, TransactionState.COMPLETE_COMMIT)
+
+ // Verify consumer sees the records from the first producer
+ consumer.seekToBeginning(consumer.assignment())
+ val consumedRecords = consumeRecordsFor(consumer)
+ assertEquals(numRecords, consumedRecords.size,
+ "Consumer should see all records from first producer after commit")
+
+ // Verify transaction state
+ val txnDescription =
adminClient.describeTransactions(java.util.List.of(transactionalId))
+ .description(transactionalId).get()
+ assertEquals(TransactionState.COMPLETE_COMMIT, txnDescription.state(),
+ "Transaction should be in COMPLETE_COMMIT state")
+
+ producer1.close()
+ producer2.close()
+ consumer.unsubscribe()
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testPrepareAndCompleteTransactionCommit(groupProtocol: String): Unit = {
+ // Test the 2PC API: prepareTransaction() and completeTransaction() for
commit path.
+ // This tests the client-side 2PC API where the producer explicitly calls
prepareTransaction()
+ // and then completeTransaction() to commit.
+ val transactionalId = "test-prepare-complete-commit"
+ val testTopic = s"test-prepare-complete-topic-${System.nanoTime()}"
+ createTopic(testTopic, 1, brokerCount, topicConfig())
+
+ val consumer = transactionalConsumers.head
+ consumer.subscribe(Seq(testTopic).asJava)
+ consumer.poll(Duration.ofMillis(100))
+
+ // Start transaction and produce records
+ val producer1 = createTransactionalProducer(transactionalId, enable2PC =
true)
+ producer1.initTransactions()
+ producer1.beginTransaction()
+ val numRecords = 5
+ for (i <- 0 until numRecords) {
+ producer1.send(new ProducerRecord(testTopic, 0, s"key-$i".getBytes,
s"value-$i".getBytes))
+ }
+ producer1.flush()
+
+ // Prepare the transaction (moves to PREPARED state)
+ val preparedState = producer1.prepareTransaction()
+ assertNotNull(preparedState, "prepareTransaction should return prepared
state")
+
+ // Simulate crash - don't call commit/abort, just leave in prepared state
+ // (In real scenario, producer would crash here)
+
+ // New producer recovers and completes the transaction
+ val producer2 = createTransactionalProducer(transactionalId, enable2PC =
true)
+ producer2.initTransactions(true) // keepPreparedTxn=true
+
+ // Complete the transaction with the prepared state (should commit)
+ producer2.completeTransaction(preparedState)
+
+ // Wait for transaction to reach COMPLETE_COMMIT state
+ waitForTransactionState(transactionalId, TransactionState.COMPLETE_COMMIT)
+
+ // Verify consumer sees the records
+ consumer.seekToBeginning(consumer.assignment())
+ val consumedRecords = consumeRecordsFor(consumer)
+ assertEquals(numRecords, consumedRecords.size,
+ "Consumer should see all records after commit")
+
+ // Verify transaction state
+ val txnDescription =
adminClient.describeTransactions(java.util.List.of(transactionalId))
+ .description(transactionalId).get()
+ assertEquals(TransactionState.COMPLETE_COMMIT, txnDescription.state(),
+ "Transaction should be in COMPLETE_COMMIT state")
+
+ producer1.close()
+ producer2.close()
+ consumer.unsubscribe()
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testPrepareAndCompleteTransactionAbort(groupProtocol: String): Unit = {
+ // Test the 2PC API: prepareTransaction() and completeTransaction() for
abort path.
+ // This tests that completeTransaction() with a non-matching state
triggers abort.
+ val transactionalId = "test-prepare-complete-abort"
+ val testTopic = s"test-prepare-complete-abort-topic-${System.nanoTime()}"
+ createTopic(testTopic, 1, brokerCount, topicConfig())
+
+ val consumer = transactionalConsumers.head
+ consumer.subscribe(Seq(testTopic).asJava)
+ consumer.poll(Duration.ofMillis(100))
+
+ val producer1 = createTransactionalProducer(transactionalId, enable2PC =
true)
+ producer1.initTransactions()
+
+ // First transaction - prepare but then abort
+ producer1.beginTransaction()
+ for (i <- 0 until 3) {
+ producer1.send(new ProducerRecord(testTopic, 0, s"key1-$i".getBytes,
s"value1-$i".getBytes))
+ }
+ producer1.flush()
+
+ // Get marker for first transaction
+ val marker1 = producer1.prepareTransaction()
+ assertNotNull(marker1, "prepareTransaction should return prepared state")
+
+ // Abort the first prepared transaction
+ producer1.abortTransaction()
+
+ // Second transaction - prepare and leave in prepared state
+ producer1.beginTransaction()
+ val numRecords = 5
+ for (i <- 0 until numRecords) {
+ producer1.send(new ProducerRecord(testTopic, 0, s"key2-$i".getBytes,
s"value2-$i".getBytes))
+ }
+ producer1.flush()
+
+ // Get marker for second transaction
+ val marker2 = producer1.prepareTransaction()
+ assertNotNull(marker2, "prepareTransaction should return prepared state")
+
+ // Simulate crash - leave second transaction in prepared state
+
+ // New producer recovers
+ val producer2 = createTransactionalProducer(transactionalId, enable2PC =
true)
+ producer2.initTransactions(true) // keepPreparedTxn=true
+
+ // Complete with marker1 (old, aborted transaction) - should trigger abort
+ // because it doesn't match the current prepared state (marker2)
+ producer2.completeTransaction(marker1)
+
+ // Wait for transaction to reach COMPLETE_ABORT state
+ waitForTransactionState(transactionalId, TransactionState.COMPLETE_ABORT)
+
+ // Verify consumer sees NO records (both transactions aborted)
+ consumer.seekToBeginning(consumer.assignment())
+ val consumedRecords = consumeRecordsFor(consumer)
+ assertEquals(0, consumedRecords.size,
+ "Consumer should see no records after abort")
+
+ // Verify transaction state
+ val txnDescription =
adminClient.describeTransactions(java.util.List.of(transactionalId))
+ .description(transactionalId).get()
+ assertEquals(TransactionState.COMPLETE_ABORT, txnDescription.state(),
+ "Transaction should be in COMPLETE_ABORT state")
+
+ producer1.close()
+ producer2.close()
+ consumer.unsubscribe()
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersAll"))
+ def testProducerCrashAndRecoverWith2PC(groupProtocol: String): Unit = {
+ // Test producer crash and recovery with 2PC keepPrepared transaction flow.
+ // Note: This uses a standard transactional producer. 2PC is enabled
server-side
+ // and triggered by calling initTransactions(keepPreparedTxn=true) after
crash.
+
+ def test2PCRecovery(numCrashes: Int, shouldCommit: Boolean): Unit = {
+ val transactionalId = s"test-2pc-recovery-${System.nanoTime()}"
+ val testTopic = s"test-2pc-topic-${System.nanoTime()}"
+ createTopic(testTopic, 1, brokerCount, topicConfig())
+
+ val consumer = transactionalConsumers.head
+ consumer.subscribe(Seq(testTopic).asJava)
+ consumer.poll(Duration.ofMillis(100)) // Trigger assignment
+
+ // Create producer and send records in a transaction
+ var producer = createTransactionalProducer(transactionalId, enable2PC =
true)
+ producer.initTransactions()
+ producer.beginTransaction()
+
+ val numRecords = 5
+ for (i <- 0 until numRecords) {
+ producer.send(new ProducerRecord(testTopic, 0, s"key-$i".getBytes,
+ s"value-$i".getBytes))
+ }
+ producer.flush()
+
+ // Verify records not visible to read_committed consumer
+ consumer.poll(Duration.ofMillis(100))
+ assertEquals(0, consumer.assignment().asScala.map(tp =>
+ consumer.position(tp)).sum, "Records should not be visible before
commit")
+
+ // Simulate crash by closing without committing
+ producer.close(Duration.ZERO)
+
+ val baseEpoch: Short = 0
+
+ // Crash and recover numCrashes times
+ for (crashNum <- 1 to numCrashes) {
+ val recoveredProducer = createTransactionalProducer(transactionalId,
enable2PC = true)
+ // Use keepPreparedTxn=true to preserve in-flight transaction
+ recoveredProducer.initTransactions(true)
+
+ // Verify dual identity after recovery
+ val txnDescription =
adminClient.describeTransactions(java.util.List.of(transactionalId))
+ .description(transactionalId).get()
+
+ // After crash recovery, server should set up dual identity
+ assertTrue(txnDescription.producerEpoch() >= baseEpoch,
+ s"Crash $crashNum: client epoch should be >= base epoch")
+
+ // For simplicity in this test, we just verify epoch progression.
+ // Detailed dual identity verification is in unit tests.
+
+ if (crashNum < numCrashes) {
+ // Simulate another crash
+ recoveredProducer.close(Duration.ZERO)
+ } else {
+ // Last recovery - complete the transaction
+ producer = recoveredProducer
+ }
+ }
+
+ // Complete prepared transaction directly. Cannot call
beginTransaction()
+ // in prepared state - must call commitTransaction() or
abortTransaction() directly.
+ if (shouldCommit) {
+ producer.commitTransaction()
+ } else {
+ producer.abortTransaction()
+ }
+
+ // Wait for the transaction to fully complete on the server before
proceeding.
+ // The client-side commitTransaction() returns after receiving the
EndTxn response,
+ // but the server might still be writing markers and transitioning to
COMPLETE_* state.
+ val expectedState = if (shouldCommit) TransactionState.COMPLETE_COMMIT
else TransactionState.COMPLETE_ABORT
+ waitForTransactionState(transactionalId, expectedState)
+
+ // Verify consumer sees correct records
+ consumer.seekToBeginning(consumer.assignment())
+ val consumedRecords = consumeRecordsFor(consumer)
+
+ if (shouldCommit) {
+ assertEquals(numRecords, consumedRecords.size,
+ "Consumer should see all records after commit")
+ } else {
+ assertEquals(0, consumedRecords.size,
+ "Consumer should see no records after abort")
+ }
+
+ // Verify fresh transaction works with bumped epoch
+ producer.beginTransaction()
+ producer.send(new ProducerRecord(testTopic, 0, "fresh-key".getBytes,
+ "fresh-value".getBytes))
+ producer.commitTransaction()
+
+ // Seek to beginning to read all records (both 2PC and fresh)
+ consumer.seekToBeginning(consumer.assignment())
+ val allRecords = consumeRecordsFor(consumer)
+ val expectedTotal = if (shouldCommit) numRecords + 1 else 1
+ assertEquals(expectedTotal, allRecords.size,
+ s"Should have $expectedTotal total records after fresh transaction")
+
+ producer.close()
+
+ // Unsubscribe consumer for next test iteration
+ consumer.unsubscribe()
+ }
+
+ // Test single crash with commit
+ test2PCRecovery(numCrashes = 1, shouldCommit = true)
+
+ // Test single crash with abort
+ test2PCRecovery(numCrashes = 1, shouldCommit = false)
+
+ // Test multiple crashes before final commit (validates epoch progression)
+ test2PCRecovery(numCrashes = 3, shouldCommit = true)
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testProducerIdRotationWithEpochExhaustion(groupProtocol: String): Unit = {
+ // Test producer ID rotation when client epoch is exhausted. This tests both scenarios:
+ // 1. Rotation during initTransactions(keepPreparedTxn=true)
+ // 2. Rotation during commitTransaction()
+
+ def testRotation(startEpoch: Short, doubleRotation: Boolean = false): Unit = {
+ val transactionalId = s"test-rotation-${System.nanoTime()}"
+ val testTopic = s"test-rotation-topic-${System.nanoTime()}"
+ createTopic(testTopic, 1, brokerCount, topicConfig())
+
+ val consumer = transactionalConsumers.head
+ consumer.subscribe(Seq(testTopic).asJava)
+ consumer.poll(Duration.ofMillis(100))
+
+ // Establish transactional ID
+ var producer = createTransactionalProducer(transactionalId, enable2PC = true)
+ producer.initTransactions()
+ producer.close()
+
+ // Set epoch to trigger rotation at desired point
+ setProducerEpoch(transactionalId, startEpoch)
+
+ // Create producer and start a prepared transaction
+ producer = createTransactionalProducer(transactionalId, enable2PC = true)
+ producer.initTransactions()
+ producer.beginTransaction()
+ val numRecords = 3
+ for (i <- 0 until numRecords) {
+ producer.send(new ProducerRecord(testTopic, 0, s"key-$i".getBytes,
s"value-$i".getBytes))
+ }
+ producer.flush()
+ // Don't commit - leave transaction prepared (simulates crash)
+
+ val originalProducerId = getProducerId(transactionalId)
+
+ if (doubleRotation) {
+ // First rotation: loop calling initTransactions(true) until rotation occurs
+ var rotationCount = 0
+ var currentClientId = originalProducerId
+ var iteration = 0
+ while (rotationCount == 0 && iteration < 20) {
+ iteration += 1
+ val recoveryProducer = createTransactionalProducer(transactionalId, enable2PC = true)
+ recoveryProducer.initTransactions(true) // keepPreparedTxn
+
+ val clientId = getClientProducerId(transactionalId)
+ if (clientId != currentClientId) {
+ rotationCount = 1
+ currentClientId = clientId
+ }
+ recoveryProducer.close(Duration.ZERO)
+ }
+
+ assertTrue(rotationCount >= 1, s"First rotation should have occurred after $iteration iterations")
+ assertEquals(0.toShort, getClientProducerEpoch(transactionalId),
+ "After first rotation, client epoch should be exactly 0")
+
+ // Set client epoch high again to trigger second rotation
+ setClientProducerEpoch(transactionalId, startEpoch)
+
+ // Do a compensating epoch bump so that we end up at the same epoch as without double rotation
+ val recoveryProducer = createTransactionalProducer(transactionalId, enable2PC = true)
+ recoveryProducer.initTransactions(true)
+ recoveryProducer.close(Duration.ZERO)
+ }
+
+ // We do 3 total epoch increments from the initial startEpoch:
+ // 1. First initTransactions(): startEpoch → startEpoch + 1
+ // 2. Second initTransactions(true): startEpoch + 1 → startEpoch + 2
+ // 3. commitTransaction(): startEpoch + 2 → startEpoch + 3
+ // Rotation may happen during step 2 or 3 depending on startEpoch.
+ val finalProducer = createTransactionalProducer(transactionalId, enable2PC = true)
+ finalProducer.initTransactions(true) // keepPreparedTxn - may rotate here
+
+ // Complete the transaction - may rotate here
+ finalProducer.commitTransaction()
+
+ // Wait for transaction to reach COMPLETE_COMMIT state
+ waitForTransactionState(transactionalId, TransactionState.COMPLETE_COMMIT)
+
+ // Verify that rotation happened (regardless of whether it occurred during
+ // initTransactions() or commitTransaction()). After transaction completes, verify:
+ // - Producer ID changed (rotation occurred)
+ // - Total epoch increments = 3 (accounting for overflow)
+ val finalProducerId = getProducerId(transactionalId)
+ assertNotEquals(originalProducerId, finalProducerId,
+ s"Producer ID should have rotated by transaction completion
(original=$originalProducerId, final=$finalProducerId)")
+
+ // Verify epoch overflow occurred and total epoch increments.
+ val finalEpoch = getProducerEpoch(transactionalId)
+
+ // Final epoch is less than startEpoch (overflow occurred)
+ assertTrue(finalEpoch < startEpoch,
+ s"Overflow should have occurred: finalEpoch=$finalEpoch <
startEpoch=$startEpoch")
+
+ // Total epoch increments = 3 (accounting for overflow)
+ // Formula: finalEpoch + (MaxValue - startEpoch) = total increments
+ // This accounts for the increments from startEpoch to the MaxValue boundary, then
+ // the wrap to 0 and the increments up to finalEpoch.
+ val totalIncrements = finalEpoch + Short.MaxValue - startEpoch
+ assertEquals(3, totalIncrements,
+ s"Should have 3 total epoch increments: finalEpoch=$finalEpoch + " +
+ s"(MaxValue=${Short.MaxValue} - startEpoch=$startEpoch) =
$totalIncrements")
+
+ // Verify consumer sees all records
+ consumer.seekToBeginning(consumer.assignment())
+ val consumedRecords = consumeRecordsFor(consumer)
+ assertEquals(numRecords, consumedRecords.size,
+ "Consumer should see all records despite rotation")
+
+ finalProducer.close()
+ producer.close()
+ consumer.unsubscribe()
+ }
+
+ // Scenario 1: Rotation happens during initTransactions(keepPreparedTxn=true) call
+ // Example flow:
+ // 1. After setProducerEpoch: epoch=32765
+ // 2. First initTransactions() + beginTransaction(): epoch=32766, transaction ONGOING
+ // 3. initTransactions(keepPreparedTxn=true): tries 32766+1=32767 → rotation triggered
+ // → Creates dual identity: producerId unchanged, nextProducerId=<new>, nextProducerEpoch=0
+ // 4. commitTransaction(): Completes transition with nextProducerEpoch bumped 0→1
+ // → Final state: producerId=<new>, epoch=1
+ testRotation((Short.MaxValue - 2).toShort) // 32765
+
+ // Scenario 1 with double rotation: First rotation during iteration, second during final init/commit
+ testRotation((Short.MaxValue - 2).toShort, doubleRotation = true)
+
+ // Scenario 2: Rotation happens during commitTransaction() call
+ // Example flow:
+ // 1. After setProducerEpoch: epoch=32764
+ // 2. First initTransactions() + beginTransaction(): epoch=32765, transaction ONGOING
+ // 3. initTransactions(keepPreparedTxn=true): bumps to 32766 (not exhausted yet)
+ // → Creates dual identity: producerId unchanged, nextProducerId=<producerId>, nextProducerEpoch=32766
+ // 4. commitTransaction(): tries 32766+1=32767 → rotation triggered
+ // → Final state: producerId=<new>, epoch=0
+ testRotation((Short.MaxValue - 3).toShort) // 32764
+
+ // Scenario 2 with double rotation: First rotation during iteration, second during final init/commit
+ testRotation((Short.MaxValue - 3).toShort, doubleRotation = true)
+ }
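
For reference, a tiny standalone check (an illustration, not test code) of the totalIncrements arithmetic asserted above, using the same scenario values spelled out in the comments:

    public class EpochWrapArithmetic {
        // finalEpoch + (Short.MAX_VALUE - startEpoch) counts the bumps up to the
        // MaxValue boundary plus the bumps after the wrap back to 0.
        static int totalIncrements(short startEpoch, short finalEpoch) {
            return finalEpoch + Short.MAX_VALUE - startEpoch;
        }

        public static void main(String[] args) {
            // Scenario 1: startEpoch = 32765, final epoch = 1 after rotation -> 3 increments
            System.out.println(totalIncrements((short) (Short.MAX_VALUE - 2), (short) 1)); // 3
            // Scenario 2: startEpoch = 32764, final epoch = 0 after rotation -> 3 increments
            System.out.println(totalIncrements((short) (Short.MAX_VALUE - 3), (short) 0)); // 3
        }
    }
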
+
+ // Helper methods
+
+ private def sendOffset(commit: (KafkaProducer[Array[Byte], Array[Byte]],
Review Comment:
This helper got moved.
##########
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java:
##########
@@ -155,7 +181,7 @@ public TxnTransitMetadata prepareIncrementProducerEpoch(
int newTxnTimeoutMs,
Optional<Short> expectedProducerEpoch,
long updateTimestamp) {
- if (isProducerEpochExhausted())
+ if (isEpochExhausted(producerEpoch))
Review Comment:
Now `isProducerEpochExhausted()` uses `clientProducerEpoch()`, but here we're validating `producerEpoch`, because that's what's going to be incremented further in the function.
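
A hedged sketch (not the actual TransactionMetadata code) of the distinction described here: the exhaustion helper takes the epoch to check as a parameter, so this call site can validate producerEpoch, the value it is about to increment, while isProducerEpochExhausted() checks the client-facing epoch:

    final class EpochExhaustionSketch {
        // Hypothetical fields standing in for TransactionMetadata state.
        short producerEpoch;        // epoch of the ongoing server-side transaction
        short clientProducerEpoch;  // epoch the client uses after InitProducerId(keepPreparedTxn=true)

        // One more bump past this value would overflow the short epoch space.
        static boolean isEpochExhausted(short epoch) {
            return epoch >= Short.MAX_VALUE - 1;
        }

        // Mirrors isProducerEpochExhausted(): validates the client-facing epoch.
        boolean isProducerEpochExhausted() {
            return isEpochExhausted(clientProducerEpoch);
        }

        // prepareIncrementProducerEpoch() instead checks producerEpoch directly,
        // because producerEpoch is what it will increment next.
        boolean canIncrementProducerEpoch() {
            return !isEpochExhausted(producerEpoch);
        }
    }
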