rreddy-22 commented on code in PR #19714:
URL: https://github.com/apache/kafka/pull/19714#discussion_r2096139253


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -884,6 +884,50 @@ public void abortTransaction() throws ProducerFencedException {
         producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
     }
 
+    /**
+     * Completes a prepared transaction by comparing the provided prepared transaction state with the
+     * current prepared state on the producer.
+     * If they match, the transaction is committed; otherwise, it is aborted.
+     * 
+     * @param preparedTxnState              The prepared transaction state to compare against the current state
+     * @throws IllegalStateException if the producer is not in prepared transaction state
+     * @throws InvalidTxnStateException if the producer is not in prepared state
+     * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
+     * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
+     * @throws TimeoutException if the time taken for completing the transaction has surpassed <code>max.block.ms</code>
+     * @throws InterruptException if the thread is interrupted while blocked
+     */
+    @Override
+    public void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException {
+        throwIfNoTransactionManager();
+        throwIfProducerClosed();
+        
+        if (!transactionManager.isPrepared()) {
+            throw new InvalidTxnStateException("Cannot complete transaction because no transaction has been prepared. " +
+                "Call prepareTransaction() first, or make sure initTransaction(true) was called.");
+        }
+        
+        // Get the current prepared transaction state
+        PreparedTxnState currentPreparedState = transactionManager.preparedTransactionState();
+        
+        long completeStart = time.nanoseconds();
+        
+        // Compare the prepared transaction state token and commit or abort accordingly
+        if (currentPreparedState.equals(preparedTxnState)) {

Review Comment:
   Yes, I wanted to, but we would basically redo the checks
   throwIfNoTransactionManager() and throwIfProducerClosed(), and I was
   wondering whether we wanted different logging here.
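
For illustration only, here is a minimal sketch of one way the repeated checks could be avoided: the public methods run the precondition checks once and then delegate to private helpers, which also take a label so each path can log differently. Everything in it is hypothetical (`CompleteTxnSketch`, `ensureUsable`, `doCommit`, `doAbort`, and the use of a plain `String` in place of `PreparedTxnState`); it is not the PR's code and not the real `KafkaProducer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for the producer; NOT the real KafkaProducer.
public class CompleteTxnSketch {
    private final List<String> log = new ArrayList<>();
    private boolean closed = false;
    private String preparedState; // null means no transaction has been prepared

    public CompleteTxnSketch(String preparedState) {
        this.preparedState = preparedState;
    }

    // Shared precondition check, executed once per public call.
    private void ensureUsable() {
        if (closed) {
            throw new IllegalStateException("producer is closed");
        }
    }

    public void commitTransaction() {
        ensureUsable();
        doCommit("commit");
    }

    public void abortTransaction() {
        ensureUsable();
        doAbort("abort");
    }

    // Commits if the caller's state matches the current prepared state, aborts otherwise.
    public void completeTransaction(String expectedState) {
        ensureUsable();
        if (preparedState == null) {
            throw new IllegalStateException("no transaction has been prepared");
        }
        if (Objects.equals(preparedState, expectedState)) {
            doCommit("complete->commit"); // helper skips the checks already done above
        } else {
            doAbort("complete->abort");
        }
    }

    // Private helpers hold the actual work; the label allows path-specific logging.
    private void doCommit(String label) {
        log.add(label);
        preparedState = null;
    }

    private void doAbort(String label) {
        log.add(label);
        preparedState = null;
    }

    public void close() {
        closed = true;
    }

    public List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        CompleteTxnSketch matching = new CompleteTxnSketch("token-A");
        matching.completeTransaction("token-A");
        CompleteTxnSketch stale = new CompleteTxnSketch("token-A");
        stale.completeTransaction("token-B");
        System.out.println(matching.log() + " " + stale.log());
    }
}
```

With this shape, `completeTransaction` performs the checks exactly once, and the label passed to the helpers is where different logging for the complete path could go.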


