artemlivshits commented on code in PR #17698:
URL: https://github.com/apache/kafka/pull/17698#discussion_r1847462022


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -411,7 +421,7 @@ RuntimeException lastError() {
     synchronized boolean isSendToPartitionAllowed(TopicPartition tp) {
         if (hasFatalError())
             return false;
-        return !isTransactional() || partitionsInTransaction.contains(tp);
+        return !isTransactional() || partitionsInTransaction.contains(tp) || isTransactionV2Enabled();

Review Comment:
   Minor perf: we should check `isTransactionV2Enabled()` first. It is going to
be the primary code path in the future, and when it returns true we don't need
the lookup in `partitionsInTransaction` at all.
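
A minimal sketch of the suggested ordering, assuming a simplified stand-in class rather than the real `TransactionManager` (the class name `SendGateSketch`, the `String` partition keys, and the constructor flags are all hypothetical). Java's `||` short-circuits, so putting the boolean check first means the set lookup only runs for transactional producers that are still on V1:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the relevant state of TransactionManager; not the real class.
final class SendGateSketch {
    private final boolean transactional;
    private final boolean transactionV2Enabled;
    private final boolean fatalError;
    // Partitions are modeled as plain strings here purely for illustration.
    private final Set<String> partitionsInTransaction = new HashSet<>();

    SendGateSketch(boolean transactional, boolean transactionV2Enabled, boolean fatalError) {
        this.transactional = transactional;
        this.transactionV2Enabled = transactionV2Enabled;
        this.fatalError = fatalError;
    }

    synchronized void addPartition(String tp) {
        partitionsInTransaction.add(tp);
    }

    synchronized boolean isSendToPartitionAllowed(String tp) {
        if (fatalError)
            return false;
        // Cheap boolean checks first; `||` short-circuits, so the set lookup
        // is only reached for a transactional producer with V2 disabled.
        return !transactional || transactionV2Enabled || partitionsInTransaction.contains(tp);
    }

    public static void main(String[] args) {
        SendGateSketch v2 = new SendGateSketch(true, true, false);
        SendGateSketch v1 = new SendGateSketch(true, false, false);
        System.out.println(v2.isSendToPartitionAllowed("topic-0"));
        System.out.println(v1.isSendToPartitionAllowed("topic-0"));
    }
}
```

The result is the same as with the original ordering; only the evaluation order changes.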



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -476,7 +476,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   private def logInvalidStateTransitionAndReturnError(transactionalId: String,
                                                       transactionState: TransactionState,
                                                       transactionResult: TransactionResult) = {
-    debug(s"TransactionalId: $transactionalId's state is $transactionState, but received transaction " +
+    error(s"TransactionalId: $transactionalId's state is $transactionState, but received transaction " +

Review Comment:
   I think it makes sense to keep it an error; these cases shouldn't be too
frequent.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -904,16 +904,27 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
         }
 
         String transactionalId = null;
+
+        // To determine what produce version to use:
+        //   If it is not transactional, produce version = latest
+        //   If it is transactional but transaction V2 disabled, produce version = min(latest, LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2)
+        //   If it is transactional and transaction V2 enabled, produce version = latest

Review Comment:
   Maybe simplify the comment to match the logic: when we use the transaction
V1 protocol, we downgrade the request version to
`LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2` so that the broker knows that we're
using transaction protocol V1.
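
The three cases in the code comment collapse into a single condition, which may be easier to see as a sketch. The class name, method name, and both constant values below are assumptions for illustration only; the real constants live in the Kafka client:

```java
// Hypothetical sketch of the version choice; not the real Sender or ProduceRequest code.
final class ProduceVersionSketch {
    // Assumed values, chosen only so the example has something to compare.
    static final short LATEST_VERSION = 12;
    static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 11;

    static short maxProduceVersion(boolean transactional, boolean transactionV2Enabled) {
        // Only a transactional producer on the V1 protocol is capped; the capped
        // version is what tells the broker that transaction protocol V1 is in use.
        boolean useTransactionV1Version = transactional && !transactionV2Enabled;
        return useTransactionV1Version
            ? (short) Math.min(LATEST_VERSION, LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2)
            : LATEST_VERSION;
    }

    public static void main(String[] args) {
        System.out.println(maxProduceVersion(false, false));
        System.out.println(maxProduceVersion(true, false));
        System.out.println(maxProduceVersion(true, true));
    }
}
```

Written this way, the comment only needs to describe the one capped case.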



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -564,33 +592,10 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
                 else
                   PrepareAbort
 
-                // Maybe allocate new producer ID if we are bumping epoch and epoch is exhausted
-                val nextProducerIdOrErrors =
-                  if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence) && txnMetadata.isProducerEpochExhausted) {
-                    try {
-                      Right(producerIdManager.generateProducerId())
-                    } catch {
-                      case e: Exception => Left(Errors.forException(e))
-                    }
-                  } else {
-                    Right(RecordBatch.NO_PRODUCER_ID)
-                  }
-
-                if (nextState == PrepareAbort && txnMetadata.pendingState.contains(PrepareEpochFence)) {
-                  // We should clear the pending state to make way for the transition to PrepareAbort and also bump
-                  // the epoch in the transaction metadata we are about to append.
-                  isEpochFence = true
-                  txnMetadata.pendingState = None
-                  txnMetadata.producerEpoch = producerEpoch
-                  txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
-                }
-
-                nextProducerIdOrErrors.flatMap {
-                  nextProducerId =>
-                    Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, nextProducerId.asInstanceOf[Long], time.milliseconds()))
-                }
+                generateTxnTransitMetadataForTxnCompletion(nextState)
               case CompleteCommit =>
-                if (txnMarkerResult == TransactionResult.COMMIT)
+                // The epoch should be valid as it is checked above
+                if (txnMarkerResult == TransactionResult.COMMIT || currentTxnMetadataIsAtLeastTransactionsV2)

Review Comment:
   We need to go through the full epoch bump here, and we should have the same
logic for the CompleteAbort case, for the reasons described here:
https://github.com/apache/kafka/pull/17698#discussion_r1841263720.
   
   We also need to check the epoch to see whether this is a retry or an empty
abort, as described here:
https://github.com/apache/kafka/pull/17698#discussion_r1841274726. For a retry
we just return success. For an empty abort we initiate the full abort flow. If
we detect a retry abort on a CompleteCommit, it's an invalid state.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -112,11 +113,14 @@ private[transaction] case object PrepareCommit extends TransactionState {
  * Group is preparing to abort
  *
  * transition: received acks from all partitions => CompleteAbort
+ *
+ * Note, In transaction v2, we allow Empty to transition to PrepareCommit. because the client may not know the

Review Comment:
   The comment says "PrepareCommit", but this is the doc for the abort state.
Should it say "PrepareAbort"?



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerRequestBenchmark.java:
##########
@@ -68,7 +68,7 @@ public class ProducerRequestBenchmark {
             .setTopicData(new ProduceRequestData.TopicProduceDataCollection(TOPIC_PRODUCE_DATA.iterator()));
 
     private static ProduceRequest request() {
-        return ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE, PRODUCE_REQUEST_DATA).build();
+        return ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE, PRODUCE_REQUEST_DATA, true).build();

Review Comment:
   Should it be `false`?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -490,9 +494,10 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
           }
 
         case CompleteAbort | CompleteCommit => // from write markers
+          // With transaction V2, we allow Empty transaction to be aborted, so the txnStartTimestamp can be -1.

Review Comment:
   When we start the abort flow from the Empty state (or from the
CompleteCommit / CompleteAbort states), we need to properly update
`txnStartTimestamp` to indicate the start of the abort.



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -225,7 +245,7 @@ class AddPartitionsToTxnManager(
                   val code =
                     if (partitionResult.partitionErrorCode == Errors.PRODUCER_FENCED.code)
                       Errors.INVALID_PRODUCER_EPOCH.code
-                    else if (partitionResult.partitionErrorCode() == Errors.TRANSACTION_ABORTABLE.code && transactionDataAndCallbacks.transactionSupportedOperation != genericError) // For backward compatibility with clients.
+                    else if (partitionResult.partitionErrorCode() == Errors.TRANSACTION_ABORTABLE.code && transactionDataAndCallbacks.transactionSupportedOperation != genericErrorSupported) // For backward compatibility with clients.

Review Comment:
   Is this the correct logic? I presume we also want to return abortable
errors for the addPartition case.



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##########
@@ -53,13 +54,15 @@ public static Builder forMagic(byte magic, ProduceRequestData data) {
             maxVersion = 2;
         } else {
             minVersion = 3;
-            maxVersion = ApiKeys.PRODUCE.latestVersion();
+            short latestVersion = ApiKeys.PRODUCE.latestVersion();
+            maxVersion = useTransactionV1Version ?
+                (short) Math.min(latestVersion, LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2) : latestVersion;
         }
         return new Builder(minVersion, maxVersion, data);
     }
 
     public static Builder forCurrentMagic(ProduceRequestData data) {
-        return forMagic(RecordBatch.CURRENT_MAGIC_VALUE, data);
+        return forMagic(RecordBatch.CURRENT_MAGIC_VALUE, data, true);

Review Comment:
   Should it be `false`?  If it should, can we write a unit test that would 
catch this?
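
A rough sketch of the kind of check such a unit test could make, assuming the answer to the question above is `false`. Everything here is a hypothetical stand-in (the class, the method names, and both constant values); it does not use the real `ProduceRequest.Builder`, only mirrors the shape of the max-version branch in the diff:

```java
// Hypothetical model of the max-version branch in forMagic; not the real ProduceRequest.
final class ForCurrentMagicCheckSketch {
    // Assumed values for illustration only.
    static final short LATEST_VERSION = 12;
    static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 11;

    // Mirrors the ternary added in the diff for the current-magic path.
    static short maxVersionFor(boolean useTransactionV1Version) {
        return useTransactionV1Version
            ? (short) Math.min(LATEST_VERSION, LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2)
            : LATEST_VERSION;
    }

    // Stand-in for forCurrentMagic, under the assumption that it should pass `false`.
    static short maxVersionForCurrentMagic() {
        return maxVersionFor(false);
    }

    // Test-style check: fails if the default path silently caps the version.
    static void checkCurrentMagicUsesLatest() {
        short max = maxVersionForCurrentMagic();
        if (max != LATEST_VERSION)
            throw new AssertionError("expected latest version " + LATEST_VERSION + " but got " + max);
    }

    public static void main(String[] args) {
        checkCurrentMagicUsesLatest();
        System.out.println("default path uses latest version");
    }
}
```

In the real test the equivalent assertion would compare the builder's highest allowed version against `ApiKeys.PRODUCE.latestVersion()`; with `true` passed in `forCurrentMagic`, that comparison fails whenever the latest version is above the V1 cap.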


