hachikuji commented on a change in pull request #8239:
URL: https://github.com/apache/kafka/pull/8239#discussion_r453899181
##########
File path:
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -501,6 +502,21 @@ class TransactionCoordinator(brokerId: Int,
info(s"Aborting sending of transaction markers and returning
$error error to client for $transactionalId's EndTransaction request of
$txnMarkerResult, " +
s"since appending $newMetadata to transaction log with
coordinator epoch $coordinatorEpoch failed")
+ if (isEpochFence) {
+ txnManager.getTransactionState(transactionalId).foreach {
+ case None =>
+ warn(s"The coordinator still owns the transaction
partition for $transactionalId, but there is " +
+ s"no metadata in the cache; this is not expected")
+
+ case Some(epochAndMetadata) =>
+ if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch)
{
+ // This was attempted epoch fence that failed, so mark
this state on the metadata
+ epochAndMetadata.transactionMetadata.hasFailedEpochFence
= true
+ warn("")
Review comment:
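Looks like this was forgotten: `warn("")` logs an empty message. Please either write a meaningful warning (e.g. that the epoch fence append failed for this transactional id) or remove the call.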
##########
File path:
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
##########
@@ -210,7 +214,9 @@ private[transaction] class TransactionMetadata(val
transactionalId: String,
if (producerEpoch == Short.MaxValue)
throw new IllegalStateException(s"Cannot fence producer with epoch equal
to Short.MaxValue since this would overflow")
- prepareTransitionTo(PrepareEpochFence, producerId, (producerEpoch +
1).toShort, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs,
+ val bumpedEpoch = if (hasFailedEpochFence) producerEpoch else
(producerEpoch + 1).toShort
Review comment:
I believe it's accurate to say that if `hasFailedEpochFence` is set,
then the bumped epoch could not have been returned to the client. Is that
right? If so, it might be worth adding a comment emphasizing that.
##########
File path:
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
##########
@@ -564,7 +564,7 @@ class TransactionCoordinatorTest {
.anyTimes()
val originalMetadata = new TransactionMetadata(transactionalId,
producerId, producerId, (producerEpoch + 1).toShort,
- producerEpoch, txnTimeoutMs, Ongoing, partitions, time.milliseconds(),
time.milliseconds())
+ (producerEpoch - 1).toShort, txnTimeoutMs, Ongoing, partitions,
time.milliseconds(), time.milliseconds())
Review comment:
Hmm, was this wrong? It seems odd to set the last producer epoch to a
value that is two less than the producer epoch.
##########
File path:
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
##########
@@ -614,6 +614,83 @@ class TransactionCoordinatorTest {
EasyMock.verify(transactionManager)
}
+ @Test
+ def
shouldNotRepeatedlyBumpEpochDueToInitPidDuringOngoingTxnIfAppendToLogFails():
Unit = {
+ val txnMetadata = new TransactionMetadata(transactionalId, producerId,
producerId, producerEpoch,
+ RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, Ongoing, partitions,
time.milliseconds(), time.milliseconds())
+
+
EasyMock.expect(transactionManager.validateTransactionTimeoutMs(EasyMock.anyInt()))
+ .andReturn(true)
+ .anyTimes()
+
+
EasyMock.expect(transactionManager.putTransactionStateIfNotExists(EasyMock.anyObject[TransactionMetadata]()))
+ .andReturn(Right(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata)))
+ .anyTimes()
+
+
EasyMock.expect(transactionManager.getTransactionState(EasyMock.eq(transactionalId)))
+ .andAnswer(() =>
Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+ .anyTimes()
+
+ /* val txnMetadataAfterAppendFailure = new
TransactionMetadata(transactionalId, producerId, producerId, (producerEpoch +
1).toShort,
Review comment:
nit: did we not need this? If not, please remove the commented-out block.
##########
File path:
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -394,8 +395,8 @@ class TransactionCoordinator(brokerId: Int,
if (nextState == PrepareAbort &&
txnMetadata.pendingState.contains(PrepareEpochFence)) {
// We should clear the pending state to make way for the
transition to PrepareAbort and also bump
// the epoch in the transaction metadata we are about to
append.
+ isEpochFence = true
txnMetadata.pendingState = None
- txnMetadata.lastProducerEpoch = txnMetadata.producerEpoch
Review comment:
Can you explain why we no longer need to set this?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org