dajac commented on code in PR #16719:
URL: https://github.com/apache/kafka/pull/16719#discussion_r1773211470


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -502,12 +507,16 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
                              producerEpoch: Short,
                              txnMarkerResult: TransactionResult,
                              isFromClient: Boolean,
+                             clientTransactionVersion: Short,

Review Comment:
   At first, I was a bit puzzle by this version here because it was not clear 
to me whether it was the version of the RPC or the TV. In order to make it 
explicit, I wonder if we should pass the `TransactionVersion` here. If 
possible, I would also advice to `clientTransactionVersion >= 2` condition 
within a method in `TransactionVersion`. We did this in the `GroupVersion` and 
I find it pretty elegant. What do you think?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -335,18 +329,32 @@ private[transaction] class TransactionMetadata(val 
transactionalId: String,
       (topicPartitions ++ addedTopicPartitions).toSet, newTxnStartTimestamp, 
updateTimestamp)
   }
 
-  def prepareAbortOrCommit(newState: TransactionState, updateTimestamp: Long): 
TxnTransitMetadata = {
-    prepareTransitionTo(newState, producerId, producerEpoch, 
lastProducerEpoch, txnTimeoutMs, topicPartitions.toSet,
-      txnStartTimestamp, updateTimestamp)
+  def prepareAbortOrCommit(newState: TransactionState, 
clientTransactionVersion: Short, nextProducerId: Long, updateTimestamp: Long): 
TxnTransitMetadata = {
+
+    val (updatedProducerEpoch, updatedLastProducerEpoch) = if 
(clientTransactionVersion >= 2) {
+      // We already ensured that we do not overflow here. MAX_SHORT is the 
highest possible value.
+      ((producerEpoch + 1).toShort, producerEpoch)
+    } else {
+      (producerEpoch, lastProducerEpoch)
+    }
+
+    prepareTransitionTo(newState, producerId, nextProducerId, 
updatedProducerEpoch, updatedLastProducerEpoch, txnTimeoutMs, 
topicPartitions.toSet,
+      txnStartTimestamp, updateTimestamp, clientTransactionVersion)
   }
 
   def prepareComplete(updateTimestamp: Long): TxnTransitMetadata = {
     val newState = if (state == PrepareCommit) CompleteCommit else 
CompleteAbort
 
     // Since the state change was successfully written to the log, unset the 
flag for a failed epoch fence
     hasFailedEpochFence = false
-    prepareTransitionTo(newState, producerId, producerEpoch, 
lastProducerEpoch, txnTimeoutMs, Set.empty[TopicPartition],
-      txnStartTimestamp, updateTimestamp)
+    val (updatedProducerId, updatedProducerEpoch) = if 
(clientTransactionVersion >= 2 && nextProducerId != RecordBatch.NO_PRODUCER_ID) 
{
+      // If we overflowed on epoch bump, we have to set it as the producer ID 
now the marker has been written.
+        (nextProducerId, 0.toShort)
+      } else {
+        (producerId, producerEpoch)
+      }

Review Comment:
   nit: The indentation seems be to off here.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -335,18 +329,32 @@ private[transaction] class TransactionMetadata(val 
transactionalId: String,
       (topicPartitions ++ addedTopicPartitions).toSet, newTxnStartTimestamp, 
updateTimestamp)
   }
 
-  def prepareAbortOrCommit(newState: TransactionState, updateTimestamp: Long): 
TxnTransitMetadata = {
-    prepareTransitionTo(newState, producerId, producerEpoch, 
lastProducerEpoch, txnTimeoutMs, topicPartitions.toSet,
-      txnStartTimestamp, updateTimestamp)
+  def prepareAbortOrCommit(newState: TransactionState, 
clientTransactionVersion: Short, nextProducerId: Long, updateTimestamp: Long): 
TxnTransitMetadata = {
+

Review Comment:
   nit: I suppose that we could remove this empty line.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -668,11 +726,19 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
     }
   }
 
+  // When a client and server support V2, every endTransaction call bumps the 
producer epoch. When checking epoch, we want to
+  // check epoch + 1. Epoch bumps from PrepareEpochFence state are handled 
separately, so this method should not be used to check that case.
+  // Returns true if the transaction state epoch is the specified producer 
epoch + 1 and epoch bump on every transaction is expected.
+  def endTxnEpochBumped(txnMetadata: TransactionMetadata, producerEpoch: 
Short): Boolean = {

Review Comment:
   nit: Could we keep it private?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -163,70 +163,64 @@ private[transaction] case object PrepareEpochFence 
extends TransactionState {
 }
 
 private[transaction] object TransactionMetadata {
-  def apply(transactionalId: String, producerId: Long, producerEpoch: Short, 
txnTimeoutMs: Int, timestamp: Long) =
-    new TransactionMetadata(transactionalId, producerId, 
RecordBatch.NO_PRODUCER_ID, producerEpoch,
-      RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, Empty, 
collection.mutable.Set.empty[TopicPartition], timestamp, timestamp)
-
-  def apply(transactionalId: String, producerId: Long, producerEpoch: Short, 
txnTimeoutMs: Int,
-            state: TransactionState, timestamp: Long) =
-    new TransactionMetadata(transactionalId, producerId, 
RecordBatch.NO_PRODUCER_ID, producerEpoch,
-      RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, state, 
collection.mutable.Set.empty[TopicPartition], timestamp, timestamp)
-
-  def apply(transactionalId: String, producerId: Long, lastProducerId: Long, 
producerEpoch: Short,
-            lastProducerEpoch: Short, txnTimeoutMs: Int, state: 
TransactionState, timestamp: Long) =
-    new TransactionMetadata(transactionalId, producerId, lastProducerId, 
producerEpoch, lastProducerEpoch,
-      txnTimeoutMs, state, collection.mutable.Set.empty[TopicPartition], 
timestamp, timestamp)
-
   def isEpochExhausted(producerEpoch: Short): Boolean = producerEpoch >= 
Short.MaxValue - 1
 }
 
 // this is a immutable object representing the target transition of the 
transaction metadata
 private[transaction] case class TxnTransitMetadata(producerId: Long,
-                                                   lastProducerId: Long,
+                                                   prevProducerId: Long,
+                                                   nextProducerId: Long,
                                                    producerEpoch: Short,
                                                    lastProducerEpoch: Short,
                                                    txnTimeoutMs: Int,
                                                    txnState: TransactionState,
                                                    topicPartitions: 
immutable.Set[TopicPartition],
                                                    txnStartTimestamp: Long,
-                                                   txnLastUpdateTimestamp: 
Long) {
+                                                   txnLastUpdateTimestamp: 
Long,
+                                                   clientTransactionVersion: 
Short) {
   override def toString: String = {
     "TxnTransitMetadata(" +
       s"producerId=$producerId, " +
-      s"lastProducerId=$lastProducerId, " +
+      s"previousProducerId=$prevProducerId, " +
+      s"nextProducerId=$nextProducerId, " +
       s"producerEpoch=$producerEpoch, " +
       s"lastProducerEpoch=$lastProducerEpoch, " +
       s"txnTimeoutMs=$txnTimeoutMs, " +
       s"txnState=$txnState, " +
       s"topicPartitions=$topicPartitions, " +
       s"txnStartTimestamp=$txnStartTimestamp, " +
-      s"txnLastUpdateTimestamp=$txnLastUpdateTimestamp)"
+      s"txnLastUpdateTimestamp=$txnLastUpdateTimestamp, " +
+      s"clientTransactionVersion=$clientTransactionVersion)"
   }
 }
 
 /**
   *
   * @param producerId            producer id
-  * @param lastProducerId        last producer id assigned to the producer
+  * @param previousProducerId    producer id for the last committed 
transaction with this transactional ID
+  * @param nextProducerId        Latest producer ID sent to the producer for 
the given transactional ID
   * @param producerEpoch         current epoch of the producer
   * @param lastProducerEpoch     last epoch of the producer
   * @param txnTimeoutMs          timeout to be used to abort long running 
transactions
   * @param state                 current state of the transaction
   * @param topicPartitions       current set of partitions that are part of 
this transaction
   * @param txnStartTimestamp     time the transaction was started, i.e., when 
first partition is added
   * @param txnLastUpdateTimestamp   updated when any operation updates the 
TransactionMetadata. To be used for expiration
+  * @param clientTransactionVersion    TransactionVersion used by the client 
when the staet was transitioned

Review Comment:
   nit: Should we re-align all the descriptions?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -79,7 +79,7 @@ object TransactionLog {
     // Serialize with version 0 (highest non-flexible version) until 
transaction.version 1 is enabled
     // which enables flexible fields in records.
     val version: Short =
-      if (usesFlexibleRecords) 1 else 0
+      if (transactionVersionLevel >= 1) 1 else 0

Review Comment:
   If we pass `TransactionVersion` to this method too, we could have a 
`recordVersion` which returns the version to use here.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -335,18 +329,32 @@ private[transaction] class TransactionMetadata(val 
transactionalId: String,
       (topicPartitions ++ addedTopicPartitions).toSet, newTxnStartTimestamp, 
updateTimestamp)
   }
 
-  def prepareAbortOrCommit(newState: TransactionState, updateTimestamp: Long): 
TxnTransitMetadata = {
-    prepareTransitionTo(newState, producerId, producerEpoch, 
lastProducerEpoch, txnTimeoutMs, topicPartitions.toSet,
-      txnStartTimestamp, updateTimestamp)
+  def prepareAbortOrCommit(newState: TransactionState, 
clientTransactionVersion: Short, nextProducerId: Long, updateTimestamp: Long): 
TxnTransitMetadata = {
+
+    val (updatedProducerEpoch, updatedLastProducerEpoch) = if 
(clientTransactionVersion >= 2) {
+      // We already ensured that we do not overflow here. MAX_SHORT is the 
highest possible value.
+      ((producerEpoch + 1).toShort, producerEpoch)
+    } else {
+      (producerEpoch, lastProducerEpoch)
+    }
+
+    prepareTransitionTo(newState, producerId, nextProducerId, 
updatedProducerEpoch, updatedLastProducerEpoch, txnTimeoutMs, 
topicPartitions.toSet,
+      txnStartTimestamp, updateTimestamp, clientTransactionVersion)
   }
 
   def prepareComplete(updateTimestamp: Long): TxnTransitMetadata = {
     val newState = if (state == PrepareCommit) CompleteCommit else 
CompleteAbort
 
     // Since the state change was successfully written to the log, unset the 
flag for a failed epoch fence
     hasFailedEpochFence = false
-    prepareTransitionTo(newState, producerId, producerEpoch, 
lastProducerEpoch, txnTimeoutMs, Set.empty[TopicPartition],
-      txnStartTimestamp, updateTimestamp)
+    val (updatedProducerId, updatedProducerEpoch) = if 
(clientTransactionVersion >= 2 && nextProducerId != RecordBatch.NO_PRODUCER_ID) 
{

Review Comment:
   Echoing on one of my previous comment, it would be great to have 
`clientTransactionVersion >= 2` centralized in one place.



##########
transaction-coordinator/src/main/resources/common/message/TransactionLogValue.json:
##########
@@ -37,6 +41,8 @@
     { "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": 
"0+",
       "about": "Time the transaction was last updated"},
     { "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0+",
-      "about": "Time the transaction was started"}
+      "about": "Time the transaction was started"},
+    { "name": "clientTransactionVersion", "type": "int16", "default": 0, 
"taggedVersions": "1+", "tag": 2,

Review Comment:
   nit: `ClientTransactionVersion`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to