artemlivshits commented on code in PR #19193:
URL: https://github.com/apache/kafka/pull/19193#discussion_r1996307814
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -124,10 +127,18 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
// if transactional id is empty then return error as invalid request.
This is
// to make TransactionCoordinator's behavior consistent with producer
client
responseCallback(initTransactionError(Errors.INVALID_REQUEST))
- } else if (!txnManager.validateTransactionTimeoutMs(transactionTimeoutMs))
{
+ } else if (enableTwoPCFlag && !txnManager.isTransaction2pcEnabled()) {
+ // if the request is to enable two-phase commit but the broker 2PC
config is set to false,
+ // then return an error.
+ responseCallback(initTransactionError(Errors.INVALID_TXN_STATE))
Review Comment:
The KIP says we should return `TRANSACTIONAL_ID_AUTHORIZATION_FAILED` in
this case.
##########
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfig.java:
##########
@@ -47,25 +48,33 @@ public final class TransactionStateManagerConfig {
public static final int
TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT =
(int) TimeUnit.HOURS.toMillis(1);
public static final String
TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONS_INTERVAL_MS_DOC = "The interval at
which to remove transactions that have expired due to
<code>transactional.id.expiration.ms</code> passing";
+ public static final String TRANSACTIONS_2PC_ENABLED_CONFIG =
"transaction.two.phase.commit.enable";
+ public static final boolean TRANSACTIONS_2PC_ENABLED_DEFAULT = false;
+ public static final String TRANSACTIONS_2PC_ENABLED_DOC = "Enable to allow
participation in Two-Phase Commit (2PC) transactions with an external
transaction coordinator";
Review Comment:
Maybe just "Allow participation ..."
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -124,10 +127,18 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
// if transactional id is empty then return error as invalid request.
This is
// to make TransactionCoordinator's behavior consistent with producer
client
responseCallback(initTransactionError(Errors.INVALID_REQUEST))
- } else if (!txnManager.validateTransactionTimeoutMs(transactionTimeoutMs))
{
+ } else if (enableTwoPCFlag && !txnManager.isTransaction2pcEnabled()) {
+ // if the request is to enable two-phase commit but the broker 2PC
config is set to false,
+ // then return an error.
+ responseCallback(initTransactionError(Errors.INVALID_TXN_STATE))
+ } else if (keepPreparedTxn) {
+ // if the request is to keep the prepared transaction, then return an
unsupported version error.
Review Comment:
Could you clarify in the comment, that this error is returned because this
features is not implemented yet?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]