[
https://issues.apache.org/jira/browse/KAFKA-20090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18056518#comment-18056518
]
Artem Livshits commented on KAFKA-20090:
----------------------------------------
Hey folks, for TV2 the fencing logic should go through the regular abort flow,
as the epoch is bumped on every abort (with TV1 only fencing aborts bumped the
epoch, so for fencing aborts the logic was special). The regular abort flow
already includes the logic to rotate the producer id (and properly pass it via
the nextProducerId state). So if we consider the full InitProducerId flow, it
could go like this:
(case 1 – no ongoing transaction, this is the same for TV1 and TV2)
# If epoch is max - 1, then rotate producer id, set epoch to 0.
# transition to EMPTY
(case 2 - there is ongoing transaction, the flow for TV2 is to abort ongoing
transaction, then retry)
# initiate abort and respond to client with concurrent transactions error
# abort will bump the epoch for the markers and, if needed, initiate producer id
rotation (saved to nextProducerId)
# abort completes and finishes producer id rotation (if initiated), thus we
never have epoch==max in completed state
# at this point the InitProducerId gets retried and goes to case 1
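The two cases above can be sketched as a small state machine. This is only an illustrative model, not the coordinator code: the class `InitProducerIdSketch`, its `Txn` fields, the `idAllocator` parameter and the `PREPARE_ABORT` handling are hypothetical names and assumptions, as is the plain epoch bump in case 1 when no rotation is needed.

```java
import java.util.function.LongSupplier;

// Hypothetical model of the InitProducerId flow described above (TV2).
final class InitProducerIdSketch {
    static final long NO_PRODUCER_ID = -1L;

    enum State { EMPTY, ONGOING, PREPARE_ABORT, COMPLETE_ABORT }
    enum Result { OK, CONCURRENT_TRANSACTIONS }

    static final class Txn {
        long producerId;
        long nextProducerId = NO_PRODUCER_ID;
        short epoch;
        State state;

        Txn(long producerId, short epoch, State state) {
            this.producerId = producerId;
            this.epoch = epoch;
            this.state = state;
        }
    }

    // Assumption: "exhausted" means the epoch has reached max - 1.
    static boolean isEpochExhausted(short epoch) {
        return epoch >= Short.MAX_VALUE - 1;
    }

    static Result initProducerId(Txn txn, LongSupplier idAllocator) {
        switch (txn.state) {
            case ONGOING:
                // Case 2: start the abort and ask the client to retry.
                beginAbort(txn, idAllocator);
                return Result.CONCURRENT_TRANSACTIONS;
            case PREPARE_ABORT:
                // Assumption: an abort still in flight also makes the client retry.
                return Result.CONCURRENT_TRANSACTIONS;
            default:
                // Case 1: no ongoing transaction.
                if (isEpochExhausted(txn.epoch)) {
                    txn.producerId = idAllocator.getAsLong();
                    txn.epoch = 0;
                } else {
                    // Assumption: otherwise the epoch is simply bumped by one.
                    txn.epoch = (short) (txn.epoch + 1);
                }
                txn.state = State.EMPTY;
                return Result.OK;
        }
    }

    // The abort bumps the epoch for the markers and, if needed, records the
    // rotated producer id in nextProducerId.
    static void beginAbort(Txn txn, LongSupplier idAllocator) {
        if (isEpochExhausted(txn.epoch)) {
            txn.nextProducerId = idAllocator.getAsLong();
        }
        // Capped at max so the sketch never overflows the short.
        txn.epoch = (short) Math.min(txn.epoch + 1, Short.MAX_VALUE);
        txn.state = State.PREPARE_ABORT;
    }

    // Completing the abort finishes the rotation, so the completed state
    // never carries epoch == max.
    static void completeAbort(Txn txn) {
        if (txn.nextProducerId != NO_PRODUCER_ID) {
            txn.producerId = txn.nextProducerId;
            txn.nextProducerId = NO_PRODUCER_ID;
            txn.epoch = 0;
        }
        txn.state = State.COMPLETE_ABORT;
    }
}
```

Starting from an ONGOING transaction with producer id 10 and epoch max - 1, and an allocator that hands out 11 next, the first `initProducerId` call returns `CONCURRENT_TRANSACTIONS` and leaves the epoch at max with `nextProducerId` set to 11; `completeAbort` then switches to producer id 11 with epoch 0, and the retried `initProducerId` takes case 1 and ends in EMPTY with epoch 1.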
Note that there could be a case where the client retries endTxn with abort and
gets a success (because the transaction got aborted by timeout or by a
concurrent InitProducerId). This is ok because it matches the client state. The
client could continue and initiate another transaction; this is also ok,
because one of two things happens:
# another client successfully finishes case 1 of InitProducerId, so the
retrying client gets fenced when initiating a transaction
# the retrying client successfully starts a transaction, then the other
client's InitProducerId will see case 2 and will follow the abort protocol,
eventually fencing the retrying client
Looking through the code, the logic is actually mostly in place except for one
condition:
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L818]
`if (!isEpochFence && txnMetadata.isProducerEpochExhausted) {`
Here the `!isEpochFence` check prevents the proper producer id rotation for
fencing aborts, and we end up in the stuck state described in the issue. I
think just removing the `!isEpochFence` condition should fix the issue.
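To make the effect of that condition concrete, here is a minimal sketch comparing the current check with the proposed one. The class and method names are hypothetical, and the exhaustion threshold (epoch >= max - 1) is an assumption for illustration rather than a quote of the coordinator code.

```java
// Hypothetical comparison of the rotation condition with and without !isEpochFence.
final class EpochFenceConditionSketch {
    // Assumption: the epoch counts as exhausted once it reaches max - 1.
    static boolean isProducerEpochExhausted(short epoch) {
        return epoch >= Short.MAX_VALUE - 1;
    }

    // Current condition: a fencing abort never rotates the producer id.
    static boolean rotatesWithCurrentCondition(boolean isEpochFence, short epoch) {
        return !isEpochFence && isProducerEpochExhausted(epoch);
    }

    // Proposed condition: rotation depends only on epoch exhaustion.
    static boolean rotatesWithProposedCondition(boolean isEpochFence, short epoch) {
        return isProducerEpochExhausted(epoch);
    }
}
```

With an epoch of max - 1 and `isEpochFence` set to true, the current condition skips rotation while the proposed one rotates; for non-fencing aborts, and for epochs far from the limit, the two conditions agree.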
> TV2 can allow for ongoing transactions with max epoch that never complete
> -------------------------------------------------------------------------
>
> Key: KAFKA-20090
> URL: https://issues.apache.org/jira/browse/KAFKA-20090
> Project: Kafka
> Issue Type: Task
> Reporter: Justine Olshan
> Assignee: sanghyeok An
> Priority: Major
>
> When transaction version 2 was introduced, epoch bumps happen on every
> transaction.
> The original EndTransaction logic considers retries and because of epoch
> bumps we wanted to be careful to not fence ourselves. This means that for
> EndTransaction retries, we have to check if the epoch has been bumped to
> consider a retry.
> The original logic returns the current producer ID and epoch in the
> transaction metadata when a retry has been identified. The normal end
> transaction case with max epoch - 1 was considered and accounted for – the
> state there is safe to return to the producer.
> However, we didn't consider the case of fencing epoch bumps with max
> epoch - 1, where we also bump the epoch but don't create a new producer ID
> and epoch. In this scenario the producer was expected to be fenced and call
> init producer ID, so this isn't a problem by itself, but it is a problem if
> we try to return that state to the producer.
> There is a scenario where we race a timeout and an end transaction abort
> with max epoch - 1: we can consider the end transaction request a "retry"
> and return max epoch as the current producer's epoch instead of fencing.
> 1. The fencing abort on transactional timeout bumps the epoch to max
> 2. The EndTxn request with max epoch - 1 is considered a "retry" and we
> return max epoch
> 3. The producer can start a transaction since we don't check epochs on
> starting transactions
> 4. We cannot commit this transaction with TV2 and we cannot timeout the
> transaction. It is stuck in Ongoing forever.
> I modified
> [https://github.com/apache/kafka/blob/aad33e4e41aaa94b06f10a5be0094b717b98900f/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala#L1329]
> to capture this behavior. I added the following code to the end:
> {code:java}
> // Transition to COMPLETE_ABORT since we can't do it via writing markers response callback
> txnMetadata.completeTransitionTo(new TxnTransitMetadata(
>   txnMetadata.producerId(), txnMetadata.prevProducerId(), txnMetadata.nextProducerId(),
>   Short.MaxValue, Short.MaxValue -1, txnTimeoutMs, txnMetadata.pendingState().get(),
>   new util.HashSet[TopicPartition](), txnMetadata.txnLastUpdateTimestamp(),
>   txnMetadata.txnLastUpdateTimestamp(), TV_2))
> coordinator.handleEndTransaction(transactionalId, producerId, epochAtMaxBoundary,
>   TransactionResult.ABORT, TV_2, endTxnCallback)
> assertEquals(10, newProducerId)
> assertEquals(Short.MaxValue, newEpoch)
> assertEquals(Errors.NONE, error)
> {code}