[ https://issues.apache.org/jira/browse/KAFKA-20090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Justine Olshan updated KAFKA-20090:
-----------------------------------
    Description: 
Since transaction version 2 (TV2) was introduced, the producer epoch is bumped 
on every transaction. 

The original EndTransaction logic accounts for retries, and because of the 
epoch bumps we wanted to be careful not to fence ourselves. This means that to 
recognize an EndTransaction request as a retry, we have to check whether the 
epoch has already been bumped.

The original logic returns the current producer ID and epoch from the 
transaction metadata when a retry has been identified. The normal end 
transaction case with max epoch - 1 was considered and accounted for; the state 
there is safe to return to the producer.

However, we didn't consider the case of a fencing epoch bump at max epoch - 1, 
where we also bump the epoch but don't create a new producer ID and epoch. In 
this scenario the producer is expected to be fenced and call init producer ID, 
so the state itself isn't a problem, but it becomes a problem if we return it 
to the producer.

There is a scenario where a transaction timeout races with an end transaction 
abort at max epoch - 1: we can treat the end transaction request as a "retry" 
and return max epoch as the current producer's epoch instead of fencing. 

1. The fencing abort on transactional timeout bumps the epoch to max

2. The EndTxn request with max epoch - 1 is considered a "retry" and we return 
max epoch

3. The producer can start a transaction since we don't check epochs on starting 
transactions

4. We cannot commit this transaction with TV2 and we cannot time out the 
transaction. It is stuck in Ongoing forever. 
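
The four steps above can be sketched as a toy model. This is not the 
coordinator's code: every class, method, and field name below is hypothetical, 
and the retry check is reduced to "state is complete-abort and the request 
epoch is exactly one behind". Step 4 is modeled with the simplifying assumption 
that any operation needing another epoch bump fails once the epoch is at 
Short.MAX_VALUE; the real checks may differ.

```java
// Toy model of the race; every name here is hypothetical, not Kafka's API.
final class MaxEpochRaceSketch {
    enum State { ONGOING, COMPLETE_ABORT }

    // Simplified stand-in for the coordinator's transaction metadata.
    static final class Txn {
        long producerId = 10L;
        short epoch = (short) (Short.MAX_VALUE - 1);
        State state = State.ONGOING;
    }

    // Step 1: the timeout-driven fencing abort bumps the epoch
    // but keeps the same producer ID.
    static void fencingAbortOnTimeout(Txn txn) {
        txn.epoch = (short) (txn.epoch + 1);
        txn.state = State.COMPLETE_ABORT;
    }

    // Step 2: a retry check that only asks "was the epoch bumped by one?".
    // Returns the epoch handed back to the producer, or -1 if fenced.
    static short endTxnAbort(Txn txn, short requestEpoch) {
        boolean looksLikeRetry =
            txn.state == State.COMPLETE_ABORT && requestEpoch == txn.epoch - 1;
        return looksLikeRetry ? txn.epoch : (short) -1;
    }

    // Step 3: starting a transaction does not validate the epoch.
    static void beginTransaction(Txn txn) {
        txn.state = State.ONGOING;
    }

    // Step 4 (assumed simplification): commit and timeout both need
    // another epoch bump, which is impossible at the maximum epoch.
    static boolean canBumpEpoch(Txn txn) {
        return txn.epoch < Short.MAX_VALUE;
    }

    public static void main(String[] args) {
        Txn txn = new Txn();
        short producerEpoch = txn.epoch;                     // max epoch - 1

        fencingAbortOnTimeout(txn);                          // step 1
        short returned = endTxnAbort(txn, producerEpoch);    // step 2
        beginTransaction(txn);                               // step 3

        System.out.println("epoch returned to producer: " + returned);
        System.out.println("state: " + txn.state
            + ", can still bump epoch: " + canBumpEpoch(txn)); // step 4
    }
}
```

In this model the EndTxn request sent with max epoch - 1 is answered with max 
epoch rather than a fencing error, and the transaction started afterwards has 
no epoch left to bump.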

I modified 
[https://github.com/apache/kafka/blob/aad33e4e41aaa94b06f10a5be0094b717b98900f/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala#L1329]
 to capture this behavior. I added the following code to the end:
{code:java}
// Transition to COMPLETE_ABORT since we can't do it via writing markers response callback
txnMetadata.completeTransitionTo(new TxnTransitMetadata(
  txnMetadata.producerId(), txnMetadata.prevProducerId(), txnMetadata.nextProducerId(),
  Short.MaxValue, Short.MaxValue - 1, txnTimeoutMs,
  txnMetadata.pendingState().get(), new util.HashSet[TopicPartition](),
  txnMetadata.txnLastUpdateTimestamp(), txnMetadata.txnLastUpdateTimestamp(), TV_2))

coordinator.handleEndTransaction(transactionalId, producerId, epochAtMaxBoundary,
  TransactionResult.ABORT, TV_2, endTxnCallback)

assertEquals(10, newProducerId)
assertEquals(Short.MaxValue, newEpoch)
assertEquals(Errors.NONE, error)
{code}

  was:
Since transaction version 2 (TV2) was introduced, the producer epoch is bumped 
on every transaction. 

The original EndTransaction logic accounts for retries, and because of the 
epoch bumps we wanted to be careful not to fence ourselves. This means that to 
recognize an EndTransaction request as a retry, we have to check whether the 
epoch has already been bumped.

The original logic returns the current producer ID and epoch from the 
transaction metadata when a retry has been identified. The normal end 
transaction case with max epoch - 1 was considered and accounted for; the state 
there is safe to return to the producer.

However, we didn't consider the case of a fencing epoch bump at max epoch - 1, 
where we also bump the epoch but don't create a new producer ID and epoch. In 
this scenario the producer is expected to be fenced and call init producer ID. 

There is a scenario where a transaction timeout races with an end transaction 
abort at max epoch - 1: we can treat the end transaction request as a "retry" 
and return max epoch as the current producer's epoch instead of fencing. 

1. The fencing abort on transactional timeout bumps the epoch to max

2. The EndTxn request with max epoch - 1 is considered a "retry" and we return 
max epoch

3. The producer can start a transaction since we don't check epochs on starting 
transactions

4. We cannot commit this transaction with TV2 and we cannot time out the 
transaction. It is stuck in Ongoing forever. 

I modified 
[https://github.com/apache/kafka/blob/aad33e4e41aaa94b06f10a5be0094b717b98900f/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala#L1329]
 to capture this behavior. I added the following code to the end:
{code:java}
// Transition to COMPLETE_ABORT since we can't do it via writing markers response callback
txnMetadata.completeTransitionTo(new TxnTransitMetadata(
  txnMetadata.producerId(), txnMetadata.prevProducerId(), txnMetadata.nextProducerId(),
  Short.MaxValue, Short.MaxValue - 1, txnTimeoutMs,
  txnMetadata.pendingState().get(), new util.HashSet[TopicPartition](),
  txnMetadata.txnLastUpdateTimestamp(), txnMetadata.txnLastUpdateTimestamp(), TV_2))

coordinator.handleEndTransaction(transactionalId, producerId, epochAtMaxBoundary,
  TransactionResult.ABORT, TV_2, endTxnCallback)

assertEquals(10, newProducerId)
assertEquals(Short.MaxValue, newEpoch)
assertEquals(Errors.NONE, error)
{code}


> TV2 can allow for ongoing transactions with max epoch that never complete
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-20090
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20090
>             Project: Kafka
>          Issue Type: Task
>            Reporter: Justine Olshan
>            Priority: Major


