rreddy-22 commented on code in PR #19429:
URL: https://github.com/apache/kafka/pull/19429#discussion_r2051214889
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -654,6 +657,46 @@ public void initTransactions() {
         transactionManager.maybeUpdateTransactionV2Enabled(true);
     }
+    /**
+     * Initialize the transactional state for this producer, similar to {@link #initTransactions()} but
+     * with additional handling for two-phase commit (2PC). Must be called before any send operations
+     * that require a {@code transactionalId}.
+     * <p>
+     * Unlike the standard {@link #initTransactions()}, when {@code keepPreparedTxn} is set to
+     * {@code true}, the producer does <em>not</em> automatically abort existing transactions
+     * in the “prepare” phase. Instead, it enters a recovery mode allowing only finalization
+     * of those previously prepared transactions. This behavior is crucial for 2PC scenarios,
+     * where transactions should remain intact until the external transaction manager decides
+     * whether to commit or abort.
+     * <p>
+     * When {@code keepPreparedTxn} is {@code false}, this behaves like the normal transactional
+     * initialization, aborting any unfinished transactions and resetting the producer for
+     * new writes.
+     *
+     * @param keepPreparedTxn true to retain any in-flight prepared transactions (necessary for 2PC
+     *                        recovery), false to abort existing transactions and behave like
+     *                        the standard initTransactions
+     *
+     * @throws IllegalStateException if no {@code transactional.id} is configured
+     * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not
+     *         support transactions (i.e. if its version is lower than 0.11.0.0)
+     * @throws org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the configured
+     *         {@code transactional.id} is unauthorized either for normal transaction writes or 2PC.
+     * @throws KafkaException if the producer encounters a fatal error or any other unexpected error
+     * @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>.
+     * @throws InterruptException if the thread is interrupted while blocked
+     */
+    public void initTransactions(boolean keepPreparedTxn) {

Review Comment:
   Makes sense! I made the change. One question: do we still want to leave `initTransactions()` in the `KafkaProducer` class for the Javadoc, or should I remove it here as well so that it uses the default?
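   To make the two options concrete, here is a minimal, self-contained sketch. It assumes that "the default" refers to a `default` no-arg method on the interface that delegates to `initTransactions(false)`; that is an assumption on my part, not something this hunk shows. `SketchProducer`, `RecordingProducer`, and `DocumentedProducer` are hypothetical names used only for illustration and are not Kafka classes.

```java
import java.util.ArrayList;
import java.util.List;

public class DefaultInitSketch {

    /** Hypothetical stand-in for the producer interface; not the real Kafka API. */
    interface SketchProducer {
        void initTransactions(boolean keepPreparedTxn);

        /** Assumed default: the no-arg variant delegates with keepPreparedTxn = false. */
        default void initTransactions() {
            initTransactions(false);
        }
    }

    /** Option A: no override, so the class inherits the interface default. */
    static class RecordingProducer implements SketchProducer {
        final List<Boolean> calls = new ArrayList<>();

        @Override
        public void initTransactions(boolean keepPreparedTxn) {
            // Record the flag instead of talking to a broker.
            calls.add(keepPreparedTxn);
        }
    }

    /** Option B: keep a thin override so class-level Javadoc can be attached to it. */
    static class DocumentedProducer extends RecordingProducer {
        /**
         * Class-specific documentation would live here; behavior is unchanged
         * because the body only calls the inherited default.
         */
        @Override
        public void initTransactions() {
            super.initTransactions();
        }
    }

    public static void main(String[] args) {
        RecordingProducer inherited = new RecordingProducer();
        inherited.initTransactions();      // resolved via the interface default
        inherited.initTransactions(true);  // explicit 2PC recovery flag

        DocumentedProducer documented = new DocumentedProducer();
        documented.initTransactions();     // thin override, same delegation

        System.out.println(inherited.calls);   // prints [false, true]
        System.out.println(documented.calls);  // prints [false]
    }
}
```

   Under that assumption both options behave identically at runtime, so the choice is only about whether the class should carry its own Javadoc for the no-arg method.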