This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d5edcc449b4e8511697d16151721ada218c90a3e
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Thu Apr 20 18:11:52 2023 +0800

    [fix][txn] Fix transaction is not aborted when send or ACK failed (#20055)
    
    (cherry picked from commit 00d09cbbd2b3063fecbdc3988b5eef7824f40ce0)
---
 .../broker/transaction/TransactionProduceTest.java | 35 +++++++++--
 .../client/impl/transaction/TransactionImpl.java   | 70 ++++++++++------------
 2 files changed, 62 insertions(+), 43 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index cdbb1563280..ddd8cf07903 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import lombok.Cleanup;
@@ -43,6 +45,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
@@ -51,10 +54,11 @@ import org.apache.pulsar.common.api.proto.MarkerType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 /**
@@ -70,7 +74,7 @@ public class TransactionProduceTest extends TransactionTestBase {
     private static final String ACK_COMMIT_TOPIC = NAMESPACE1 + "/ack-commit";
     private static final String ACK_ABORT_TOPIC = NAMESPACE1 + "/ack-abort";
     private static final int NUM_PARTITIONS = 16;
-    @BeforeMethod
+    @BeforeClass
     protected void setup() throws Exception {
         setUpBase(1, NUM_PARTITIONS, PRODUCE_COMMIT_TOPIC, TOPIC_PARTITION);
         admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, TOPIC_PARTITION);
@@ -78,7 +82,7 @@ public class TransactionProduceTest extends TransactionTestBase {
         admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, TOPIC_PARTITION);
     }
 
-    @AfterMethod(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     protected void cleanup() throws Exception {
         super.internalCleanup();
     }
@@ -369,5 +373,26 @@ public class TransactionProduceTest extends TransactionTestBase {
         return pendingAckCount;
     }
 
-
+    @Test
+    public void testCommitFailure() throws Exception {
+        Transaction txn = pulsarClient.newTransaction().build().get();
+        final String topic = NAMESPACE1 + "/test-commit-failure";
+        @Cleanup
+        final Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+        producer.newMessage(txn).value(new byte[1024 * 1024 * 10]).sendAsync();
+        try {
+            txn.commit().get();
+            Assert.fail();
+        } catch (ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof PulsarClientException.TransactionHasOperationFailedException);
+            Assert.assertEquals(txn.getState(), Transaction.State.ABORTED);
+        }
+        try {
+            getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txn.getTxnID())
+                    .getNow(null);
+            Assert.fail();
+        } catch (CompletionException e) {
+            Assert.assertTrue(e.getCause() instanceof CoordinatorException.TransactionNotFoundException);
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index b7e085ed82a..d1260ba045e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl.transaction;
 import com.google.common.collect.Lists;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -186,13 +187,14 @@ public class TransactionImpl implements Transaction , TimerTask {
     @Override
     public CompletableFuture<Void> commit() {
         timeout.cancel();
-        return checkIfOpenOrCommitting().thenCompose((value) -> {
+        return checkState(State.OPEN, State.COMMITTING).thenCompose((value) -> {
             CompletableFuture<Void> commitFuture = new CompletableFuture<>();
             this.state = State.COMMITTING;
             opFuture.whenComplete((v, e) -> {
                 if (hasOpsFailed) {
-                    abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(new PulsarClientException
-                            .TransactionHasOperationFailedException()));
+                    checkState(State.COMMITTING).thenCompose(__ -> internalAbort()).whenComplete((vx, ex) ->
+                            commitFuture.completeExceptionally(
+                                    new PulsarClientException.TransactionHasOperationFailedException()));
                 } else {
                     tcClient.commitAsync(txnId)
                             .whenComplete((vx, ex) -> {
@@ -216,28 +218,30 @@ public class TransactionImpl implements Transaction , TimerTask {
     @Override
     public CompletableFuture<Void> abort() {
         timeout.cancel();
-        return checkIfOpenOrAborting().thenCompose(value -> {
-            CompletableFuture<Void> abortFuture = new CompletableFuture<>();
-            this.state = State.ABORTING;
-            opFuture.whenComplete((v, e) -> {
-                tcClient.abortAsync(txnId).whenComplete((vx, ex) -> {
+        return checkState(State.OPEN, State.ABORTING).thenCompose(__ -> internalAbort());
+    }
 
-                    if (ex != null) {
-                        if (ex instanceof TransactionNotFoundException
-                                || ex instanceof InvalidTxnStatusException) {
-                            this.state = State.ERROR;
-                        }
-                        abortFuture.completeExceptionally(ex);
-                    } else {
-                        this.state = State.ABORTED;
-                        abortFuture.complete(null);
+    private CompletableFuture<Void> internalAbort() {
+        CompletableFuture<Void> abortFuture = new CompletableFuture<>();
+        this.state = State.ABORTING;
+        opFuture.whenComplete((v, e) -> {
+            tcClient.abortAsync(txnId).whenComplete((vx, ex) -> {
+
+                if (ex != null) {
+                    if (ex instanceof TransactionNotFoundException
+                            || ex instanceof InvalidTxnStatusException) {
+                        this.state = State.ERROR;
                     }
+                    abortFuture.completeExceptionally(ex);
+                } else {
+                    this.state = State.ABORTED;
+                    abortFuture.complete(null);
+                }
 
-                });
             });
-
-            return abortFuture;
         });
+
+        return abortFuture;
     }
 
     @Override
@@ -261,25 +265,15 @@ public class TransactionImpl implements Transaction , TimerTask {
         }
     }
 
-    private CompletableFuture<Void> checkIfOpenOrCommitting() {
-        if (state == State.OPEN || state == State.COMMITTING) {
-            return CompletableFuture.completedFuture(null);
-        } else {
-            return invalidTxnStatusFuture();
-        }
-    }
-
-    private CompletableFuture<Void> checkIfOpenOrAborting() {
-        if (state == State.OPEN || state == State.ABORTING) {
-            return CompletableFuture.completedFuture(null);
-        } else {
-            return invalidTxnStatusFuture();
+    private CompletableFuture<Void> checkState(State... expectedStates) {
+        final State actualState = STATE_UPDATE.get(this);
+        for (State expectedState : expectedStates) {
+            if (actualState == expectedState) {
+                return CompletableFuture.completedFuture(null);
+            }
         }
-    }
-
-    private CompletableFuture<Void> invalidTxnStatusFuture() {
         return FutureUtil.failedFuture(new InvalidTxnStatusException("[" + txnIdMostBits + ":"
-                + txnIdLeastBits + "] with unexpected state : "
-                + state.name() + ", expect " + State.OPEN + " state!"));
+                + txnIdLeastBits + "] with unexpected state: " + actualState.name() + ", expect: "
+                + Arrays.toString(expectedStates)));
     }
 }

Reply via email to