artemlivshits commented on code in PR #16719:
URL: https://github.com/apache/kafka/pull/16719#discussion_r1720359067


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -181,6 +183,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
               newMetadata.producerEpoch,
               TransactionResult.ABORT,
               isFromClient = false,
+              clientTransactionVersion = 0,

Review Comment:
   Is using version=0 the right thing here?  I'd expect that we want epoch bump 
on aborted transactions.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -668,11 +709,23 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
     }
   }
 
+  def isInvalidTxnTransition(txnMetadata: TransactionMetadata, 
txnMarkerResult: TransactionResult) = {
+    !((List(PrepareCommit, CompleteCommit).contains(txnMetadata.state) && 
txnMarkerResult == TransactionResult.COMMIT) ||
+      (List(PrepareAbort, CompleteAbort).contains(txnMetadata.state) && 
txnMarkerResult == TransactionResult.ABORT))
+  }
+
+  def isRetryEndTxn(txnMetadata: TransactionMetadata, producerId: Long, 
producerEpoch: Short): Boolean = {
+    // The previous producer ID matches and the epoch is either + 1 the 
request epoch or 0 if the epoch overflowed.

Review Comment:
   Can we add some comments on how we check the conditions?  Also, I think if 
we structure the code to check retry conditions for (PrepareCommit, 
PrepareAbort) and then for (CompleteAbort, CompleteCommit), I think it may be 
more readable, especially if we annotate with comments for each case that 
explain the transitions.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -594,7 +635,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
                     txnMetadata.inLock {
                       if (txnMetadata.producerId != producerId)
                         Left(Errors.INVALID_PRODUCER_ID_MAPPING)
-                      else if (txnMetadata.producerEpoch != producerEpoch)
+                      else if (txnMetadata.producerEpoch != producerEpoch && 
(requestIsAtLeastTransactionsV2 && txnMetadata.producerEpoch != producerEpoch + 
1))

Review Comment:
   Should it be `txnMetadata.producerEpoch != producerEpoch && 
(!requestIsAtLeastTransactionsV2 || txnMetadata.producerEpoch != producerEpoch 
+ 1)`?  As written we effectively ignore epoch check when 
`requestIsAtLeastTransactionsV2==false`. 



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -79,7 +79,7 @@ object TransactionLog {
     // Serialize with version 0 (highest non-flexible version) until 
transaction.version 1 is enabled
     // which enables flexible fields in records.
     val version: Short =
-      if (usesFlexibleRecords) 1 else 0
+      if (transactionVersionLevel > 1) 1 else 0

Review Comment:
   According to comments the condition should be `transactionVersionLevel >= 1`.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -540,24 +572,29 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
                   txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
                 }
 
-                Right(coordinatorEpoch, 
txnMetadata.prepareAbortOrCommit(nextState, time.milliseconds()))
+                nextProducerIdOrErrors match {
+                  case Left(error) =>
+                    Left(error)
+                  case Right(nextProducerId) =>
+                    Right(coordinatorEpoch, 
txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, 
nextProducerId, time.milliseconds()))
+                }
               case CompleteCommit =>

Review Comment:
   Yeah, I wonder if it would be more readable to keep this code as it was, but 
when we check producerId and epoch above just do a version-specific producer id 
and epoch check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to