dajac commented on code in PR #16719:
URL: https://github.com/apache/kafka/pull/16719#discussion_r1773211470
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -502,12 +507,16 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
producerEpoch: Short,
txnMarkerResult: TransactionResult,
isFromClient: Boolean,
+ clientTransactionVersion: Short,
Review Comment:
At first, I was a bit puzzled by this version here because it was not clear
to me whether it was the version of the RPC or the TV. In order to make it
explicit, I wonder if we should pass the `TransactionVersion` here. If
possible, I would also advise moving the `clientTransactionVersion >= 2` condition
into a method in `TransactionVersion`. We did this in `GroupVersion` and
I find it pretty elegant. What do you think?
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -335,18 +329,32 @@ private[transaction] class TransactionMetadata(val
transactionalId: String,
(topicPartitions ++ addedTopicPartitions).toSet, newTxnStartTimestamp,
updateTimestamp)
}
- def prepareAbortOrCommit(newState: TransactionState, updateTimestamp: Long):
TxnTransitMetadata = {
- prepareTransitionTo(newState, producerId, producerEpoch,
lastProducerEpoch, txnTimeoutMs, topicPartitions.toSet,
- txnStartTimestamp, updateTimestamp)
+ def prepareAbortOrCommit(newState: TransactionState,
clientTransactionVersion: Short, nextProducerId: Long, updateTimestamp: Long):
TxnTransitMetadata = {
+
+ val (updatedProducerEpoch, updatedLastProducerEpoch) = if
(clientTransactionVersion >= 2) {
+ // We already ensured that we do not overflow here. MAX_SHORT is the
highest possible value.
+ ((producerEpoch + 1).toShort, producerEpoch)
+ } else {
+ (producerEpoch, lastProducerEpoch)
+ }
+
+ prepareTransitionTo(newState, producerId, nextProducerId,
updatedProducerEpoch, updatedLastProducerEpoch, txnTimeoutMs,
topicPartitions.toSet,
+ txnStartTimestamp, updateTimestamp, clientTransactionVersion)
}
def prepareComplete(updateTimestamp: Long): TxnTransitMetadata = {
val newState = if (state == PrepareCommit) CompleteCommit else
CompleteAbort
// Since the state change was successfully written to the log, unset the
flag for a failed epoch fence
hasFailedEpochFence = false
- prepareTransitionTo(newState, producerId, producerEpoch,
lastProducerEpoch, txnTimeoutMs, Set.empty[TopicPartition],
- txnStartTimestamp, updateTimestamp)
+ val (updatedProducerId, updatedProducerEpoch) = if
(clientTransactionVersion >= 2 && nextProducerId != RecordBatch.NO_PRODUCER_ID)
{
+ // If we overflowed on epoch bump, we have to set it as the producer ID
now the marker has been written.
+ (nextProducerId, 0.toShort)
+ } else {
+ (producerId, producerEpoch)
+ }
Review Comment:
nit: The indentation seems to be off here.
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -335,18 +329,32 @@ private[transaction] class TransactionMetadata(val
transactionalId: String,
(topicPartitions ++ addedTopicPartitions).toSet, newTxnStartTimestamp,
updateTimestamp)
}
- def prepareAbortOrCommit(newState: TransactionState, updateTimestamp: Long):
TxnTransitMetadata = {
- prepareTransitionTo(newState, producerId, producerEpoch,
lastProducerEpoch, txnTimeoutMs, topicPartitions.toSet,
- txnStartTimestamp, updateTimestamp)
+ def prepareAbortOrCommit(newState: TransactionState,
clientTransactionVersion: Short, nextProducerId: Long, updateTimestamp: Long):
TxnTransitMetadata = {
+
Review Comment:
nit: I suppose that we could remove this empty line.
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -668,11 +726,19 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
}
}
+ // When a client and server support V2, every endTransaction call bumps the
producer epoch. When checking epoch, we want to
+ // check epoch + 1. Epoch bumps from PrepareEpochFence state are handled
separately, so this method should not be used to check that case.
+ // Returns true if the transaction state epoch is the specified producer
epoch + 1 and epoch bump on every transaction is expected.
+ def endTxnEpochBumped(txnMetadata: TransactionMetadata, producerEpoch:
Short): Boolean = {
Review Comment:
nit: Could we keep it private?
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -163,70 +163,64 @@ private[transaction] case object PrepareEpochFence
extends TransactionState {
}
private[transaction] object TransactionMetadata {
- def apply(transactionalId: String, producerId: Long, producerEpoch: Short,
txnTimeoutMs: Int, timestamp: Long) =
- new TransactionMetadata(transactionalId, producerId,
RecordBatch.NO_PRODUCER_ID, producerEpoch,
- RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, Empty,
collection.mutable.Set.empty[TopicPartition], timestamp, timestamp)
-
- def apply(transactionalId: String, producerId: Long, producerEpoch: Short,
txnTimeoutMs: Int,
- state: TransactionState, timestamp: Long) =
- new TransactionMetadata(transactionalId, producerId,
RecordBatch.NO_PRODUCER_ID, producerEpoch,
- RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, state,
collection.mutable.Set.empty[TopicPartition], timestamp, timestamp)
-
- def apply(transactionalId: String, producerId: Long, lastProducerId: Long,
producerEpoch: Short,
- lastProducerEpoch: Short, txnTimeoutMs: Int, state:
TransactionState, timestamp: Long) =
- new TransactionMetadata(transactionalId, producerId, lastProducerId,
producerEpoch, lastProducerEpoch,
- txnTimeoutMs, state, collection.mutable.Set.empty[TopicPartition],
timestamp, timestamp)
-
def isEpochExhausted(producerEpoch: Short): Boolean = producerEpoch >=
Short.MaxValue - 1
}
// this is a immutable object representing the target transition of the
transaction metadata
private[transaction] case class TxnTransitMetadata(producerId: Long,
- lastProducerId: Long,
+ prevProducerId: Long,
+ nextProducerId: Long,
producerEpoch: Short,
lastProducerEpoch: Short,
txnTimeoutMs: Int,
txnState: TransactionState,
topicPartitions:
immutable.Set[TopicPartition],
txnStartTimestamp: Long,
- txnLastUpdateTimestamp:
Long) {
+ txnLastUpdateTimestamp:
Long,
+ clientTransactionVersion:
Short) {
override def toString: String = {
"TxnTransitMetadata(" +
s"producerId=$producerId, " +
- s"lastProducerId=$lastProducerId, " +
+ s"previousProducerId=$prevProducerId, " +
+ s"nextProducerId=$nextProducerId, " +
s"producerEpoch=$producerEpoch, " +
s"lastProducerEpoch=$lastProducerEpoch, " +
s"txnTimeoutMs=$txnTimeoutMs, " +
s"txnState=$txnState, " +
s"topicPartitions=$topicPartitions, " +
s"txnStartTimestamp=$txnStartTimestamp, " +
- s"txnLastUpdateTimestamp=$txnLastUpdateTimestamp)"
+ s"txnLastUpdateTimestamp=$txnLastUpdateTimestamp, " +
+ s"clientTransactionVersion=$clientTransactionVersion)"
}
}
/**
*
* @param producerId producer id
- * @param lastProducerId last producer id assigned to the producer
+ * @param previousProducerId producer id for the last committed
transaction with this transactional ID
+ * @param nextProducerId Latest producer ID sent to the producer for
the given transactional ID
* @param producerEpoch current epoch of the producer
* @param lastProducerEpoch last epoch of the producer
* @param txnTimeoutMs timeout to be used to abort long running
transactions
* @param state current state of the transaction
* @param topicPartitions current set of partitions that are part of
this transaction
* @param txnStartTimestamp time the transaction was started, i.e., when
first partition is added
* @param txnLastUpdateTimestamp updated when any operation updates the
TransactionMetadata. To be used for expiration
+ * @param clientTransactionVersion TransactionVersion used by the client
when the staet was transitioned
Review Comment:
nit: Should we re-align all the descriptions?
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -79,7 +79,7 @@ object TransactionLog {
// Serialize with version 0 (highest non-flexible version) until
transaction.version 1 is enabled
// which enables flexible fields in records.
val version: Short =
- if (usesFlexibleRecords) 1 else 0
+ if (transactionVersionLevel >= 1) 1 else 0
Review Comment:
If we pass `TransactionVersion` to this method too, we could have a
`recordVersion` which returns the version to use here.
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -335,18 +329,32 @@ private[transaction] class TransactionMetadata(val
transactionalId: String,
(topicPartitions ++ addedTopicPartitions).toSet, newTxnStartTimestamp,
updateTimestamp)
}
- def prepareAbortOrCommit(newState: TransactionState, updateTimestamp: Long):
TxnTransitMetadata = {
- prepareTransitionTo(newState, producerId, producerEpoch,
lastProducerEpoch, txnTimeoutMs, topicPartitions.toSet,
- txnStartTimestamp, updateTimestamp)
+ def prepareAbortOrCommit(newState: TransactionState,
clientTransactionVersion: Short, nextProducerId: Long, updateTimestamp: Long):
TxnTransitMetadata = {
+
+ val (updatedProducerEpoch, updatedLastProducerEpoch) = if
(clientTransactionVersion >= 2) {
+ // We already ensured that we do not overflow here. MAX_SHORT is the
highest possible value.
+ ((producerEpoch + 1).toShort, producerEpoch)
+ } else {
+ (producerEpoch, lastProducerEpoch)
+ }
+
+ prepareTransitionTo(newState, producerId, nextProducerId,
updatedProducerEpoch, updatedLastProducerEpoch, txnTimeoutMs,
topicPartitions.toSet,
+ txnStartTimestamp, updateTimestamp, clientTransactionVersion)
}
def prepareComplete(updateTimestamp: Long): TxnTransitMetadata = {
val newState = if (state == PrepareCommit) CompleteCommit else
CompleteAbort
// Since the state change was successfully written to the log, unset the
flag for a failed epoch fence
hasFailedEpochFence = false
- prepareTransitionTo(newState, producerId, producerEpoch,
lastProducerEpoch, txnTimeoutMs, Set.empty[TopicPartition],
- txnStartTimestamp, updateTimestamp)
+ val (updatedProducerId, updatedProducerEpoch) = if
(clientTransactionVersion >= 2 && nextProducerId != RecordBatch.NO_PRODUCER_ID)
{
Review Comment:
Echoing one of my previous comments, it would be great to have
`clientTransactionVersion >= 2` centralized in one place.
##########
transaction-coordinator/src/main/resources/common/message/TransactionLogValue.json:
##########
@@ -37,6 +41,8 @@
{ "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions":
"0+",
"about": "Time the transaction was last updated"},
{ "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0+",
- "about": "Time the transaction was started"}
+ "about": "Time the transaction was started"},
+ { "name": "clientTransactionVersion", "type": "int16", "default": 0,
"taggedVersions": "1+", "tag": 2,
Review Comment:
nit: `ClientTransactionVersion`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]