chickenchickenlove commented on code in PR #21469:
URL: https://github.com/apache/kafka/pull/21469#discussion_r2802843581
##########
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java:
##########
@@ -139,11 +139,12 @@ public TxnTransitMetadata prepareNoTransit() {
 public TxnTransitMetadata prepareFenceProducerEpoch() {
     if (producerEpoch == Short.MAX_VALUE)
-        throw new IllegalStateException("Cannot fence producer with epoch equal to Short.MaxValue since this would overflow");
+        LOGGER.error("Fencing producer {} {} with epoch equal to Short.MaxValue, this must not happen unless there is a bug", transactionalId, producerId);
     // If we've already failed to fence an epoch (because the write to the log failed), we don't increase it again.
     // This is safe because we never return the epoch to client if we fail to fence the epoch
-    short bumpedEpoch = hasFailedEpochFence ? producerEpoch : (short) (producerEpoch + 1);
+    // Also don't increase if producerEpoch is already at max, to avoid overflow.
+    short bumpedEpoch = hasFailedEpochFence || producerEpoch == Short.MAX_VALUE ? producerEpoch : (short) (producerEpoch + 1);
Review Comment:
IMHO, it would be better to keep this as it is.
AFAIK, a change here could in principle affect both TV1 and TV2.
However, it seems that TV2 doesn't use the epoch generated here.
Even when a TV2 producer is fenced, the coordinator actually uses the epoch
obtained from txnMetadata.prepareAbortOrCommit().
https://github.com/apache/kafka/blob/trunk/core%2Fsrc%2Fmain%2Fscala%2Fkafka%2Fcoordinator%2Ftransaction%2FTransactionCoordinator.scala#L831-L838
So I believe any change here would affect only TV1.
What do you think? If I'm wrong, sorry for the confusion!
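For reference, here is a minimal stand-alone sketch of the two variants of the epoch bump in the hunk above. It is not the real TransactionMetadata class: the class name EpochBumpSketch and the static helpers bumpOld/bumpNew are hypothetical, and the logging call from the diff is left out. It only illustrates why the guard exists, namely that casting Short.MAX_VALUE + 1 back to short wraps around to a negative value.

```java
// Hypothetical sketch for illustration only; names are not from the Kafka code base.
final class EpochBumpSketch {

    // Mirrors the removed ("-") lines: refuse to fence at Short.MAX_VALUE,
    // otherwise bump unless a previous fence attempt already failed.
    static short bumpOld(short producerEpoch, boolean hasFailedEpochFence) {
        if (producerEpoch == Short.MAX_VALUE)
            throw new IllegalStateException(
                "Cannot fence producer with epoch equal to Short.MaxValue since this would overflow");
        return hasFailedEpochFence ? producerEpoch : (short) (producerEpoch + 1);
    }

    // Mirrors the added ("+") lines: no exception, and the epoch is left
    // unchanged when it is already at Short.MAX_VALUE.
    static short bumpNew(short producerEpoch, boolean hasFailedEpochFence) {
        return hasFailedEpochFence || producerEpoch == Short.MAX_VALUE
            ? producerEpoch
            : (short) (producerEpoch + 1);
    }

    public static void main(String[] args) {
        // The overflow both variants try to avoid: the int sum 32768
        // narrows to the short value -32768.
        short wrapped = (short) (Short.MAX_VALUE + 1);
        System.out.println("unguarded bump at max: " + wrapped);

        System.out.println("bumpNew(5, false):   " + bumpNew((short) 5, false));
        System.out.println("bumpNew(5, true):    " + bumpNew((short) 5, true));
        System.out.println("bumpNew(max, false): " + bumpNew(Short.MAX_VALUE, false));
    }
}
```

The sketch says nothing about which callers consume the bumped epoch; whether TV2 ever reads it is exactly the open question in this comment.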