artemlivshits commented on code in PR #20039:
URL: https://github.com/apache/kafka/pull/20039#discussion_r2178558356


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1500,8 +1500,20 @@ public void handleResponse(AbstractResponse response) {
                 ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(initProducerIdResponse.data().producerId(),
                         initProducerIdResponse.data().producerEpoch());
                 setProducerIdAndEpoch(producerIdAndEpoch);
-                // TO_DO Add code to handle transition to prepared_txn when 
keepPrepared = true
-                transitionTo(State.READY);
+                // If this is a transaction with keepPreparedTxn=true, 
transition directly
+                // to PREPARED_TRANSACTION state IFF there is an ongoing 
transaction.
+                if (builder.data.keepPreparedTxn() &&
+                    initProducerIdResponse.data().ongoingTxnProducerId() != 
RecordBatch.NO_PRODUCER_ID
+                ) {
+                    transitionTo(State.PREPARED_TRANSACTION);
+                    // Update the preparedTxnState with the ongoing pid and 
epoch from the response.
+                    // This will be used to complete the transaction later.
+                    String serializedState = 
initProducerIdResponse.data().ongoingTxnProducerId() + 
+                            ":" + 
initProducerIdResponse.data().ongoingTxnProducerEpoch();
+                    TransactionManager.this.preparedTxnState = new 
PreparedTxnState(serializedState);

Review Comment:
   Why do we need to serialize (especially custom one-off code) just to get the 
id / epoch pair again in the state?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1500,8 +1500,21 @@ public void handleResponse(AbstractResponse response) {
                 ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(initProducerIdResponse.data().producerId(),
                         initProducerIdResponse.data().producerEpoch());
                 setProducerIdAndEpoch(producerIdAndEpoch);
-                // TO_DO Add code to handle transition to prepared_txn when 
keepPrepared = true
-                transitionTo(State.READY);
+                // If this is a 2PC-enabled transaction with 
keepPreparedTxn=true, transition directly
+                // to PREPARED_TRANSACTION state IFF there is an ongoing 
transaction.
+                if (enable2PC &&

Review Comment:
   That's what KIP says: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071659#KIP939:SupportParticipationin2PC-RPCChanges
   
   Could you also check other places in the code that handle `keepPreparedTxn` 
to make sure we don't accidentally disable if if enable2PC is false?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to