artemlivshits commented on code in PR #17822:
URL: https://github.com/apache/kafka/pull/17822#discussion_r1844337025


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -339,27 +338,24 @@ private TransactionalRequestResult 
beginCompletingTransaction(TransactionResult
         if (!newPartitionsInTransaction.isEmpty())
             enqueueRequest(addPartitionsToTransactionHandler());
 
-        // If the error is an INVALID_PRODUCER_ID_MAPPING error, the server 
will not accept an EndTxnRequest, so skip
-        // directly to InitProducerId. Otherwise, we must first abort the 
transaction, because the producer will be
-        // fenced if we directly call InitProducerId.
-        if (!(lastError instanceof InvalidPidMappingException)) {
-            EndTxnRequest.Builder builder = new EndTxnRequest.Builder(
-                    new EndTxnRequestData()
-                            .setTransactionalId(transactionalId)
-                            .setProducerId(producerIdAndEpoch.producerId)
-                            .setProducerEpoch(producerIdAndEpoch.epoch)
-                            .setCommitted(transactionResult.id),
-                    isTransactionV2Enabled
-            );
+        EndTxnRequest.Builder builder = new EndTxnRequest.Builder(
+            new EndTxnRequestData()
+                .setTransactionalId(transactionalId)
+                .setProducerId(producerIdAndEpoch.producerId)
+                .setProducerEpoch(producerIdAndEpoch.epoch)
+                .setCommitted(transactionResult.id),
+            isTransactionV2Enabled
+        );
 
-            EndTxnHandler handler = new EndTxnHandler(builder);
-            enqueueRequest(handler);
-            if (!epochBumpRequired) {
-                return handler.result;
-            }
+        EndTxnHandler handler = new EndTxnHandler(builder);
+        enqueueRequest(handler);
+
+        // If an epoch bump is required for recovery, initialize the 
transaction after completing the EndTxn request.
+        if (epochBumpRequired) {

Review Comment:
   It'll  be a separate change, because for for old protocol we still need to 
do the bump explicitly on the client.  For the new protocol, we won't need to 
to bump from the client.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to