artemlivshits commented on code in PR #17698:
URL: https://github.com/apache/kafka/pull/17698#discussion_r1847462022

##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -411,7 +421,7 @@ RuntimeException lastError() {
     synchronized boolean isSendToPartitionAllowed(TopicPartition tp) {
         if (hasFatalError())
             return false;
-        return !isTransactional() || partitionsInTransaction.contains(tp);
+        return !isTransactional() || partitionsInTransaction.contains(tp) || isTransactionV2Enabled();

Review Comment:
   Minor perf: we should check if `isTransactionV2Enabled` first as it's going to be the primary code path in the future and we don't need to look up into `partitionsInTransaction`.

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -476,7 +476,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   private def logInvalidStateTransitionAndReturnError(transactionalId: String,
                                                       transactionState: TransactionState,
                                                       transactionResult: TransactionResult) = {
-    debug(s"TransactionalId: $transactionalId's state is $transactionState, but received transaction " +
+    error(s"TransactionalId: $transactionalId's state is $transactionState, but received transaction " +

Review Comment:
   I think it makes sense to keep it an error, these cases shouldn't be too frequent.

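To illustrate the ordering suggested for `isSendToPartitionAllowed` above, here is a minimal standalone sketch. It is not the actual `TransactionManager` code: the class `SendAllowedSketch`, its constructor flags, the `String` partition keys, and the `lookups` counter are hypothetical stand-ins, added only to make the short-circuit visible.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the relevant part of TransactionManager.
final class SendAllowedSketch {
    private final boolean fatalError;
    private final boolean transactional;
    private final boolean transactionV2Enabled;
    private final Set<String> partitionsInTransaction = new HashSet<>();

    // Counts set lookups; exists only so the effect of the ordering can be observed.
    int lookups = 0;

    SendAllowedSketch(boolean fatalError, boolean transactional, boolean transactionV2Enabled) {
        this.fatalError = fatalError;
        this.transactional = transactional;
        this.transactionV2Enabled = transactionV2Enabled;
    }

    synchronized void addPartition(String tp) {
        partitionsInTransaction.add(tp);
    }

    private boolean inTransaction(String tp) {
        lookups++;
        return partitionsInTransaction.contains(tp);
    }

    synchronized boolean isSendToPartitionAllowed(String tp) {
        if (fatalError)
            return false;
        // Flag checks come first; || short-circuits, so the set is only
        // consulted when the producer is transactional and V2 is disabled.
        return !transactional || transactionV2Enabled || inTransaction(tp);
    }
}
```

With V2 enabled, the call returns `true` without touching the set, so `lookups` stays at zero. Only the transaction V1 path pays for the lookup.
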
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -904,16 +904,27 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
         }

         String transactionalId = null;
+
+        // To determine what produce version to use:
+        // If it is not transactional, produce version = latest
+        // If it is transactional but transaction V2 disabled, produce version = min(latest, LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2)
+        // If it is transactional and transaction V2 enabled, produce version = latest

Review Comment:
   Maybe simplify the comment to match the logic: when we use transaction V1 protocol we downgrade the request version to `LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2` so that the broker knows that we're using transaction protocol V1.

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -564,33 +592,10 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
             else
               PrepareAbort
-          // Maybe allocate new producer ID if we are bumping epoch and epoch is exhausted
-          val nextProducerIdOrErrors =
-            if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence) && txnMetadata.isProducerEpochExhausted) {
-              try {
-                Right(producerIdManager.generateProducerId())
-              } catch {
-                case e: Exception => Left(Errors.forException(e))
-              }
-            } else {
-              Right(RecordBatch.NO_PRODUCER_ID)
-            }
-
-          if (nextState == PrepareAbort && txnMetadata.pendingState.contains(PrepareEpochFence)) {
-            // We should clear the pending state to make way for the transition to PrepareAbort and also bump
-            // the epoch in the transaction metadata we are about to append.
-            isEpochFence = true
-            txnMetadata.pendingState = None
-            txnMetadata.producerEpoch = producerEpoch
-            txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
-          }
-
-          nextProducerIdOrErrors.flatMap {
-            nextProducerId =>
-              Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, nextProducerId.asInstanceOf[Long], time.milliseconds()))
-          }
+          generateTxnTransitMetadataForTxnCompletion(nextState)
         case CompleteCommit =>
-          if (txnMarkerResult == TransactionResult.COMMIT)
+          // The epoch should be valid as it is checked above
+          if (txnMarkerResult == TransactionResult.COMMIT || currentTxnMetadataIsAtLeastTransactionsV2)

Review Comment:
   We need to go through the full epoch bump. We also should have the logic for the CompleteAbort case. For the same reasons described here: https://github.com/apache/kafka/pull/17698#discussion_r1841263720.

   And we need to check the epoch to see if this is a retry or an empty abort, as described here: https://github.com/apache/kafka/pull/17698#discussion_r1841274726. For retry we just return success. For abort we initiate the full abort flow. If we detect a retry abort on a CompleteCommit, it's an invalid state.

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -112,11 +113,14 @@ private[transaction] case object PrepareCommit extends TransactionState {
  * Group is preparing to abort
  *
  * transition: received acks from all partitions => CompleteAbort
+ *
+ * Note, In transaction v2, we allow Empty to transition to PrepareCommit. because the client may not know the

Review Comment:
   Comment says "PrepareCommit"?

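The retry-versus-abort handling asked for in the `CompleteCommit` comment above can be written out as a small decision function. This is only a sketch of the rules as stated in that comment, not coordinator code. The class `CompletedTxnEndSketch` and its enums are hypothetical. The `isRetry` flag stands for the result of the epoch check, whose details are in the linked discussions and are not reproduced here. Treating a retried commit on `CompleteAbort` as invalid is an assumption made by symmetry. A non-retry commit is not addressed by the comment, so it is left as `NOT_COVERED`.

```java
// Hypothetical names throughout; this models only the rules stated in the review comment.
final class CompletedTxnEndSketch {
    enum State { COMPLETE_COMMIT, COMPLETE_ABORT }
    enum Result { COMMIT, ABORT }
    enum Outcome { RETURN_SUCCESS, START_FULL_ABORT, INVALID_STATE, NOT_COVERED }

    // isRetry is assumed to come from an epoch comparison done by the caller.
    static Outcome decide(State state, Result requested, boolean isRetry) {
        if (isRetry) {
            boolean matchesCompletedState =
                (state == State.COMPLETE_COMMIT && requested == Result.COMMIT)
                    || (state == State.COMPLETE_ABORT && requested == Result.ABORT);
            // A retry of the operation that already completed just succeeds again.
            // A retried abort on CompleteCommit is invalid (stated in the comment);
            // the mirrored commit-on-CompleteAbort case is an assumption.
            return matchesCompletedState ? Outcome.RETURN_SUCCESS : Outcome.INVALID_STATE;
        }
        if (requested == Result.ABORT) {
            // A fresh (empty) abort goes through the full abort flow, including the epoch bump.
            return Outcome.START_FULL_ABORT;
        }
        // The comment does not say what a non-retry commit should do.
        return Outcome.NOT_COVERED;
    }
}
```
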
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerRequestBenchmark.java:
##########
@@ -68,7 +68,7 @@ public class ProducerRequestBenchmark {
             .setTopicData(new ProduceRequestData.TopicProduceDataCollection(TOPIC_PRODUCE_DATA.iterator()));

     private static ProduceRequest request() {
-        return ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE, PRODUCE_REQUEST_DATA).build();
+        return ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE, PRODUCE_REQUEST_DATA, true).build();

Review Comment:
   Should it be `false`?

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala:
##########
@@ -490,9 +494,10 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
           }
         case CompleteAbort | CompleteCommit => // from write markers
+          // With transaction V2, we allow Empty transaction to be aborted, so the txnStartTimestamp can be -1.

Review Comment:
   When we start abort flow from Empty state (or from CompleteCommit / CompleteAbort states) we need to properly update the txnStartTimestamp to indicate the start of the abort.

##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -225,7 +245,7 @@ class AddPartitionsToTxnManager(
           val code =
             if (partitionResult.partitionErrorCode == Errors.PRODUCER_FENCED.code)
               Errors.INVALID_PRODUCER_EPOCH.code
-            else if (partitionResult.partitionErrorCode() == Errors.TRANSACTION_ABORTABLE.code && transactionDataAndCallbacks.transactionSupportedOperation != genericError) // For backward compatibility with clients.
+            else if (partitionResult.partitionErrorCode() == Errors.TRANSACTION_ABORTABLE.code && transactionDataAndCallbacks.transactionSupportedOperation != genericErrorSupported) // For backward compatibility with clients.

Review Comment:
   Is this the correct logic? I presume we also want to return abortable errors for addPartition case.

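The `forMagic(..., true)` question on the benchmark above, like the `Sender.java` comment about downgrading for transaction V1, comes down to how the boolean caps the maximum produce version. Below is a minimal sketch of that selection. The class `ProduceVersionSketch` is hypothetical, and both version numbers are passed in as parameters rather than taken from `ApiKeys.PRODUCE` or `ProduceRequest`, so no real version values are asserted here.

```java
// Hypothetical helper that mirrors the flag's effect on the maximum version.
final class ProduceVersionSketch {
    /**
     * @param latestVersion           stand-in for the latest produce version
     * @param lastStableBeforeTxnV2   stand-in for LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
     * @param useTransactionV1Version true only when the transaction V1 protocol is in use
     */
    static short maxVersion(short latestVersion, short lastStableBeforeTxnV2, boolean useTransactionV1Version) {
        // Transaction V1 caps the version so the broker can tell which protocol is in use;
        // every other caller gets the latest version.
        return useTransactionV1Version
            ? (short) Math.min(latestVersion, lastStableBeforeTxnV2)
            : latestVersion;
    }
}
```

Under this reading, passing `true` from a caller that is not on transaction V1 would silently cap the version. A unit test asserting that the default, non-V1 path yields the latest version would catch that, provided the two versions actually differ.
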
##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##########
@@ -53,13 +54,15 @@ public static Builder forMagic(byte magic, ProduceRequestData data) {
             maxVersion = 2;
         } else {
             minVersion = 3;
-            maxVersion = ApiKeys.PRODUCE.latestVersion();
+            short latestVersion = ApiKeys.PRODUCE.latestVersion();
+            maxVersion = useTransactionV1Version ?
+                (short) Math.min(latestVersion, LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2) : latestVersion;
         }
         return new Builder(minVersion, maxVersion, data);
     }

     public static Builder forCurrentMagic(ProduceRequestData data) {
-        return forMagic(RecordBatch.CURRENT_MAGIC_VALUE, data);
+        return forMagic(RecordBatch.CURRENT_MAGIC_VALUE, data, true);

Review Comment:
   Should it be `false`? If it should, can we write a unit test that would catch this?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org