rreddy-22 commented on code in PR #19714:
URL: https://github.com/apache/kafka/pull/19714#discussion_r2096139253
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -884,6 +884,50 @@ public void abortTransaction() throws ProducerFencedException {
         producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
     }
+    /**
+     * Completes a prepared transaction by comparing the provided prepared transaction state with the
+     * current prepared state on the producer.
+     * If they match, the transaction is committed; otherwise, it is aborted.
+     *
+     * @param preparedTxnState The prepared transaction state to compare against the current state
+     * @throws IllegalStateException if the producer is not in prepared transaction state
+     * @throws InvalidTxnStateException if the producer is not in prepared state
+     * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
+     * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
+     * @throws TimeoutException if the time taken for completing the transaction has surpassed <code>max.block.ms</code>
+     * @throws InterruptException if the thread is interrupted while blocked
+     */
+    @Override
+    public void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException {
+        throwIfNoTransactionManager();
+        throwIfProducerClosed();
+
+        if (!transactionManager.isPrepared()) {
+            throw new InvalidTxnStateException("Cannot complete transaction because no transaction has been prepared. " +
+                "Call prepareTransaction() first, or make sure initTransaction(true) was called.");
+        }
+
+        // Get the current prepared transaction state
+        PreparedTxnState currentPreparedState = transactionManager.preparedTransactionState();
+
+        long completeStart = time.nanoseconds();
+
+        // Compare the prepared transaction state token and commit or abort accordingly
+        if (currentPreparedState.equals(preparedTxnState)) {

Review Comment:
   Yes, I wanted to, but we would basically redo the checks throwIfNoTransactionManager() and throwIfProducerClosed(), and I was wondering whether we wanted different logging.