[
https://issues.apache.org/jira/browse/KAFKA-20090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18060049#comment-18060049
]
sanghyeok An commented on KAFKA-20090:
--------------------------------------
[~jolshan] [~alivshits] [~chia7712]
Hi! There is something I’d like to discuss with you all.
I noticed some non-standard behavior at the epoch boundary. It differs from the
normal case, but it is unlikely to cause any issues in practice.
Please take a look at the details below.
* Given parameters for endTransaction(...)
** producerId=10, producerEpoch=32767 (because of fencing)
* *#1 - It can be triggered by initProducerId(...) or
abortTimeoutTransaction(...)*
**
[https://github.com/apache/kafka/blob/172aa3cbcd2236be878dfcb47d0ebc35cc77a834/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L767]
** Local State
*** producerId=10, prevProducerId=-1, nextProducerId=-1, producerEpoch=32766,
lastProducerEpoch=-1, state=ONGOING, pendingState=PREPARE_EPOCH_FENCE
* #2
**
[https://github.com/apache/kafka/blob/172aa3cbcd2236be878dfcb47d0ebc35cc77a834/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L905]
** Local State
*** producerId=10, prevProducerId=-1, nextProducerId=-1, producerEpoch=32766,
lastProducerEpoch=-1, state=ONGOING, pendingState=PREPARE_ABORT
* #3
**
[https://github.com/apache/kafka/blob/172aa3cbcd2236be878dfcb47d0ebc35cc77a834/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L905]
** Local State
*** producerId=10, prevProducerId=10, nextProducerId=11, producerEpoch=32767,
lastProducerEpoch=32766, state=PREPARE_ABORT, pendingState=Optional.Empty
* #4
**
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L962|https://github.com/apache/kafka/blob/172aa3cbcd2236be878dfcb47d0ebc35cc77a834/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L905]
** Local State
*** producerId=10, prevProducerId=10, nextProducerId=11, producerEpoch=32767,
lastProducerEpoch=32766, state=PREPARE_ABORT, pendingState=COMPLETE_ABORT
* *#5*
**
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L962|https://github.com/apache/kafka/blob/172aa3cbcd2236be878dfcb47d0ebc35cc77a834/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L905]
** Local State
*** *producerId=10, prevProducerId=10, nextProducerId=11, producerEpoch=32767,
lastProducerEpoch=32766, state=PREPARE_ABORT, pendingState=COMPLETE_ABORT*
** *Call Response Callback -> new ProducerId = 11, new ProducerEpoch = 0*
* *#6*
**
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L962|https://github.com/apache/kafka/blob/172aa3cbcd2236be878dfcb47d0ebc35cc77a834/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L905]
** Local State
*** producerId=11, prevProducerId=10, nextProducerId=-1, producerEpoch=0,
lastProducerEpoch=32766, state=COMPLETE_ABORT, pendingState=Optional.Empty
* #7 - endTransaction(...). I assume this is triggered by
handleEndTransaction(...) due to a late abort request from the client.
**
[https://github.com/apache/kafka/blob/172aa3cbcd2236be878dfcb47d0ebc35cc77a834/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L767]
** Client (Abort Request)
*** producerId=10, producerEpoch=32766
** Local State
*** producerId=11, prevProducerId=10, nextProducerId=-1, producerEpoch=0,
lastProducerEpoch=32766, state=COMPLETE_ABORT, pendingState=Optional.Empty
* *#8*
** *isRetry* = True (because *retryOnOverFlow* is True)
**
[https://github.com/apache/kafka/blob/172aa3cbcd2236be878dfcb47d0ebc35cc77a834/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L870-L873]
** *Return Left(Errors.NONE)*
* *#9*
**
https://github.com/apache/kafka/blob/172aa3cbcd2236be878dfcb47d0ebc35cc77a834/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L908-L909
** *Call response callback ->* *ProducerId = 11, producerEpoch = 0*
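Steps #7 to #9 can be sketched roughly as follows. This is a simplified model
and not the actual coordinator code: the class name, the method signature, and
the exact condition are assumptions made for illustration, based only on the
values listed above (the request carries the previous producer ID at epoch
max - 1, and the local state has already moved to epoch 0).

```java
// Simplified, hypothetical model of the retry check; not the actual Kafka code.
final class RetryOnOverflowSketch {
    static final short MAX_EPOCH = Short.MAX_VALUE; // 32767

    /**
     * Assumed condition: the request carries the previous producer ID at
     * epoch max - 1, and the coordinator has already rotated to epoch 0.
     */
    static boolean retryOnOverflow(long requestProducerId, short requestEpoch,
                                   long prevProducerId, short currentEpoch) {
        return prevProducerId == requestProducerId
                && requestEpoch == MAX_EPOCH - 1
                && currentEpoch == 0;
    }

    public static void main(String[] args) {
        // Step #7: the client sends (10, 32766); the local state is
        // producerId=11, prevProducerId=10, producerEpoch=0.
        boolean isRetry = retryOnOverflow(10L, (short) 32766, 10L, (short) 0);
        System.out.println("isRetry = " + isRetry);
    }
}
```

Under this assumed condition, the late abort request from step #7 is treated
as a retry, which matches the Errors.NONE result in steps #8 and #9.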
The strange part is #5.
In normal cases, the values recorded in the local state are returned as-is.
At the epoch boundary, however, the response values differ from the local state
values:
* epoch boundary
** local state : producerId={*}10{*}, epoch={*}32767{*}
** response: producerId={*}11{*}, epoch={*}0{*}
* normal
** local state : producerId=10, epoch=101
** response: producerId=10, epoch=101
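The difference between the two cases can be illustrated with a small model.
This is only a sketch: LocalState, Response, and responseFor are hypothetical
names, and the rule "respond with nextProducerId and epoch 0 when a rotation is
pending" is my assumption about why the values in #5 differ, not a copy of the
coordinator logic.

```java
// Simplified, hypothetical model; these are not the actual Kafka classes.
final class EpochBoundarySketch {
    static final long NO_PRODUCER_ID = -1L;

    /** Minimal stand-in for the coordinator's local transaction state. */
    static final class LocalState {
        final long producerId;
        final long nextProducerId;
        final short producerEpoch;

        LocalState(long producerId, long nextProducerId, short producerEpoch) {
            this.producerId = producerId;
            this.nextProducerId = nextProducerId;
            this.producerEpoch = producerEpoch;
        }
    }

    /** Producer ID and epoch handed to the response callback. */
    static final class Response {
        final long producerId;
        final short producerEpoch;

        Response(long producerId, short producerEpoch) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
        }
    }

    /**
     * Assumed rule: if a rotation is pending (nextProducerId is set), respond
     * with the next producer ID at epoch 0; otherwise echo the local state.
     */
    static Response responseFor(LocalState state) {
        if (state.nextProducerId != NO_PRODUCER_ID) {
            return new Response(state.nextProducerId, (short) 0);
        }
        return new Response(state.producerId, state.producerEpoch);
    }

    public static void main(String[] args) {
        Response boundary = responseFor(new LocalState(10L, 11L, (short) 32767));
        Response normal = responseFor(new LocalState(10L, NO_PRODUCER_ID, (short) 101));
        System.out.println("boundary: " + boundary.producerId + "/" + boundary.producerEpoch);
        System.out.println("normal:   " + normal.producerId + "/" + normal.producerEpoch);
    }
}
```

In the boundary case the model responds with 11/0 while the local state still
holds 10/32767; in the normal case both are 10/101.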
The problematic value, however, is not delivered to the client.
* In case of *InitProducerId(...)*
** In endTransaction(...), it calls responseCallback(Errors.NONE,
newPreSendMetadata.producerId, newPreSendMetadata.producerEpoch)
**
[https://github.com/apache/kafka/blob/172aa3cbcd2236be878dfcb47d0ebc35cc77a834/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L186-L195]
*** Therefore, at this point, it returns Errors.CONCURRENT_TRANSACTIONS with
producerId=-1, producerEpoch=-1
* In case of *Timeout Abort Transaction*
**
[https://github.com/apache/kafka/blob/172aa3cbcd2236be878dfcb47d0ebc35cc77a834/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L1009-L1025]
** It only writes a log entry.
So, even though unexpected behavior occurs in an intermediate step ({*}#5{*}),
I believe it still works correctly because there is no path through which
the ProducerId and ProducerEpoch are actually delivered to the client.
The root cause of this issue is that when
{{TransactionMetadata.prepareFenceProducerEpoch()}} is called at epoch *32766*,
it returns epoch *32767* without rotating the ProducerId.
If we change {{prepareFenceProducerEpoch()}} in Transaction V2 to rotate the
ProducerId when the epoch is exhausted, the semantics of epoch fencing would
diverge between Transaction V1 and Transaction V2. That would likely broaden
the scope of the change.
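To make the two options concrete, here is a minimal sketch. It is not the real
{{TransactionMetadata}} code: fenceBump, fenceBumpWithRotation, and IdAndEpoch
are hypothetical names, the exception at max epoch is an assumption, and I am
also assuming that "exhausted" means epoch >= max - 1.

```java
// Simplified, hypothetical model of the fencing bump; not the actual Kafka code.
final class FenceEpochSketch {
    static final short MAX_EPOCH = Short.MAX_VALUE; // 32767

    static final class IdAndEpoch {
        final long producerId;
        final short epoch;

        IdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    /** Fencing bump as described above: increments the epoch, keeps the ID. */
    static IdAndEpoch fenceBump(long producerId, short epoch) {
        if (epoch == MAX_EPOCH) {
            // Assumption: there is no epoch left to bump to.
            throw new IllegalStateException("Cannot fence producer at max epoch");
        }
        return new IdAndEpoch(producerId, (short) (epoch + 1));
    }

    /**
     * Hypothetical alternative: rotate to a fresh producer ID at epoch 0 once
     * the epoch is exhausted (assumed here to mean epoch >= max - 1).
     */
    static IdAndEpoch fenceBumpWithRotation(long producerId, short epoch,
                                            long freshProducerId) {
        if (epoch >= MAX_EPOCH - 1) {
            return new IdAndEpoch(freshProducerId, (short) 0);
        }
        return fenceBump(producerId, epoch);
    }

    public static void main(String[] args) {
        IdAndEpoch current = fenceBump(10L, (short) 32766);
        IdAndEpoch rotated = fenceBumpWithRotation(10L, (short) 32766, 11L);
        System.out.println("current: " + current.producerId + "/" + current.epoch);
        System.out.println("rotated: " + rotated.producerId + "/" + rotated.epoch);
    }
}
```

With the current behavior the fenced state is 10/32767; the rotating variant
would yield 11/0 instead, which is where the semantics would start to diverge
between Transaction V1 and Transaction V2.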
Given that the behavior at the boundary is somewhat unexpected but appears to
have no client-visible impact, my view is that simply removing
{{!isEpochFence}} is the smallest change that still achieves the goal.
What do you all think?
Please share your opinion!
> TV2 can allow for ongoing transactions with max epoch that never complete
> -------------------------------------------------------------------------
>
> Key: KAFKA-20090
> URL: https://issues.apache.org/jira/browse/KAFKA-20090
> Project: Kafka
> Issue Type: Task
> Reporter: Justine Olshan
> Assignee: sanghyeok An
> Priority: Critical
>
> When transaction version 2 was introduced, epoch bumps happen on every
> transaction.
> The original EndTransaction logic considers retries and because of epoch
> bumps we wanted to be careful to not fence ourselves. This means that for
> EndTransaction retries, we have to check if the epoch has been bumped to
> consider a retry.
> The original logic returns the current producer ID and epoch in the
> transaction metadata when a retry has been identified. The normal end
> transaction case with max epoch - 1 was considered and accounted for – the
> state there is safe to return to the producer.
> However, we didn't consider that in the case of fencing epoch bumps with max
> epoch - 1, where we also bump the epoch, but don't create a new producer ID
> and epoch. In this scenario the producer was expected to be fenced and call
> init producer ID, so this isn't a problem, but it is a problem if we try to
> return it to the producer.
> There is a scenario where a timeout races with an end transaction abort at
> max epoch - 1: we can consider the end transaction request a "retry" and
> return max epoch as the current producer's epoch instead of fencing.
> 1. The fencing abort on transactional timeout bumps the epoch to max
> 2. The EndTxn request with max epoch - 1 is considered a "retry" and we
> return max epoch
> 3. The producer can start a transaction since we don't check epochs on
> starting transactions
> 4. We cannot commit this transaction with TV2 and we cannot timeout the
> transaction. It is stuck in Ongoing forever.
> I modified
> [https://github.com/apache/kafka/blob/aad33e4e41aaa94b06f10a5be0094b717b98900f/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala#L1329]
> to capture this behavior. I added the following code to the end:
> {code:java}
> // Transition to COMPLETE_ABORT since we can't do it via writing markers response callback
> txnMetadata.completeTransitionTo(new TxnTransitMetadata(
>   txnMetadata.producerId(), txnMetadata.prevProducerId(), txnMetadata.nextProducerId(),
>   Short.MaxValue, Short.MaxValue - 1, txnTimeoutMs, txnMetadata.pendingState().get(),
>   new util.HashSet[TopicPartition](), txnMetadata.txnLastUpdateTimestamp(),
>   txnMetadata.txnLastUpdateTimestamp(), TV_2))
> coordinator.handleEndTransaction(transactionalId, producerId, epochAtMaxBoundary,
>   TransactionResult.ABORT, TV_2, endTxnCallback)
> assertEquals(10, newProducerId)
> assertEquals(Short.MaxValue, newEpoch)
> assertEquals(Errors.NONE, error){code}