This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new 7ce96e4c7c feature : RocketMQ transaction are supported (#6230)
7ce96e4c7c is described below

commit 7ce96e4c7ce0d16b6b8c3f8e6d19fd6550312f00
Author: justabug <bug...@users.noreply.github.com>
AuthorDate: Sun Mar 3 21:20:46 2024 +0800

    feature : RocketMQ transaction are supported (#6230)
---
 all/pom.xml                                        |   5 +
 bom/pom.xml                                        |   5 +
 changes/en-us/2.x.md                               |   1 +
 changes/zh-cn/2.x.md                               |   1 +
 .../org/apache/seata/common/ConfigurationKeys.java |   5 +
 .../org/apache/seata/common/DefaultValues.java     |   2 +
 .../org/apache/seata/core/context/RootContext.java |   2 +
 .../apache/seata/core/model/ResourceManager.java   |   9 ++
 dependencies/pom.xml                               |   7 +
 pom.xml                                            |   1 +
 .../apache/seata/rm/AbstractResourceManager.java   |  15 +++
 .../apache/seata/rm/DefaultResourceManager.java    |   6 +
 {seata-spring-boot-starter => rocketmq}/pom.xml    |  38 ++----
 .../integration/rocketmq/SeataMQProducer.java      | 149 +++++++++++++++++++++
 .../rocketmq/SeataMQProducerFactory.java           |  63 +++++++++
 .../seata/integration/rocketmq/TCCRocketMQ.java    |  79 +++++++++++
 .../integration/rocketmq/TCCRocketMQImpl.java      |  96 +++++++++++++
 .../rocketmq/SeataMQProducerFactoryTest.java       |  36 +++++
 .../integration/rocketmq/SeataMQProducerTest.java  |  31 +++++
 rocketmq/src/test/resources/file.conf              |  25 ++++
 rocketmq/src/test/resources/registry.conf          |  34 +++++
 seata-spring-boot-starter/pom.xml                  |   5 +
 .../rm/tcc/interceptor/ProxyUtilsTccTest.java      |   8 ++
 .../apache/seata/mockserver/MockCoordinator.java   |   2 +-
 test/pom.xml                                       |  11 ++
 .../rocketmq/SeataMQProducerSendTest.java          | 139 +++++++++++++++++++
 26 files changed, 744 insertions(+), 31 deletions(-)

diff --git a/all/pom.xml b/all/pom.xml
index d3542d1758..97cd30bbba 100644
--- a/all/pom.xml
+++ b/all/pom.xml
@@ -192,6 +192,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seata</groupId>
+            <artifactId>seata-rocketmq</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.seata</groupId>
             <artifactId>seata-sqlparser-core</artifactId>
diff --git a/bom/pom.xml b/bom/pom.xml
index 224c84d1f6..514d4f048b 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -191,6 +191,11 @@
                 <artifactId>seata-http</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.seata</groupId>
+                <artifactId>seata-rocketmq</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.seata</groupId>
                 <artifactId>seata-rm</artifactId>
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 64b2df332e..2df25a629d 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -6,6 +6,7 @@ Add changes here for all PR submitted to the 2.x branch.
 - [[#6370](https://github.com/seata/seata/pull/6370)] seata saga decouple 
spring, optimize architecture.
 - [[#6205](https://github.com/apache/incubator-seata/pull/6205)] mock server
 - [[#6169](https://github.com/apache/incubator-seata/pull/6169)] full support 
for states in the refactored state machine designer
+- [[#6230](https://github.com/apache/incubator-seata/pull/6230)] RocketMQ 
transaction are supported
 
 ### bugfix:
 - [[#6090](https://github.com/apache/incubator-seata/pull/6090)] fix the TCC 
aspect exception handling process, do not wrapping the internal call exceptions
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 4c5d160841..6099f3ae33 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -6,6 +6,7 @@
 - [[#6370](https://github.com/seata/seata/pull/6370)] seata saga spring接耦、架构优化。
 - [[#6205](https://github.com/apache/incubator-seata/pull/6205)] 提供mock server
 - [[#6169](https://github.com/apache/incubator-seata/pull/6169)] 支持新版本状态机设计器
+- [[#6230](https://github.com/apache/incubator-seata/pull/6230)] 支持RocketMQ消息事务
 
 ### bugfix:
 - [[#6090](https://github.com/apache/incubator-seata/pull/6090)] 
修复tcc切面异常处理过程,不对内部调用异常做包装处理,直接向外抛出
diff --git 
a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java 
b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
index 7e6521c5ab..e1c0f11f39 100644
--- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
+++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
@@ -1006,4 +1006,9 @@ public interface ConfigurationKeys {
      * The constant SERVER_APPLICATION_DATA_SIZE_CHECK
      */
     String SERVER_APPLICATION_DATA_SIZE_CHECK = SERVER_PREFIX + 
"applicationDataLimitCheck";
+
+    /**
+     * The constant ROCKET_MQ_MSG_TIMEOUT
+     */
+    String ROCKET_MQ_MSG_TIMEOUT = SERVER_PREFIX + "rocketmqMsgTimeout";
 }
diff --git a/common/src/main/java/org/apache/seata/common/DefaultValues.java 
b/common/src/main/java/org/apache/seata/common/DefaultValues.java
index 67e21838fb..0c2dd0f7b8 100644
--- a/common/src/main/java/org/apache/seata/common/DefaultValues.java
+++ b/common/src/main/java/org/apache/seata/common/DefaultValues.java
@@ -312,4 +312,6 @@ public interface DefaultValues {
      * Default druid location in classpath
      */
     String DRUID_LOCATION = "lib/sqlparser/druid.jar";
+
+    int DEFAULT_ROCKET_MQ_MSG_TIMEOUT = 60 * 1000;
 }
diff --git a/core/src/main/java/org/apache/seata/core/context/RootContext.java 
b/core/src/main/java/org/apache/seata/core/context/RootContext.java
index eb78a1bf2e..85453afb5d 100644
--- a/core/src/main/java/org/apache/seata/core/context/RootContext.java
+++ b/core/src/main/java/org/apache/seata/core/context/RootContext.java
@@ -48,6 +48,8 @@ public class RootContext {
      */
     public static final String KEY_XID = "TX_XID";
 
+    public static final String KEY_BRANCHID = "TX_BRANCHID";
+
     /**
      * The constant HIDDEN_KEY_XID for sofa-rpc integration.
      */
diff --git 
a/core/src/main/java/org/apache/seata/core/model/ResourceManager.java 
b/core/src/main/java/org/apache/seata/core/model/ResourceManager.java
index ab65cdb76e..ee59058965 100644
--- a/core/src/main/java/org/apache/seata/core/model/ResourceManager.java
+++ b/core/src/main/java/org/apache/seata/core/model/ResourceManager.java
@@ -51,4 +51,13 @@ public interface ResourceManager extends 
ResourceManagerInbound, ResourceManager
      * @return The BranchType of ResourceManager.
      */
     BranchType getBranchType();
+
+    /**
+     * Get the GlobalStatus.
+     *
+     * @param branchType The BranchType of ResourceManager.
+     * @param xid The xid of transaction.
+     * @return The GlobalStatus of transaction.
+     */
+    GlobalStatus getGlobalStatus(BranchType branchType, String xid);
 }
diff --git a/dependencies/pom.xml b/dependencies/pom.xml
index ad5b15c6ab..b317f642ec 100644
--- a/dependencies/pom.xml
+++ b/dependencies/pom.xml
@@ -114,6 +114,8 @@
         <!-- for jdbc driver when package -->
         <mysql5.version>${mysql.version}</mysql5.version>
         <mysql8.version>8.0.27</mysql8.version>
+        <!-- rocketmq -->
+        <rocketmq-version>5.0.0</rocketmq-version>
 
         <!-- # for kotlin -->
         <kotlin.version>1.4.32</kotlin.version>
@@ -781,6 +783,11 @@
                 <artifactId>janino</artifactId>
                 <version>${janino-version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-client</artifactId>
+                <version>${rocketmq-version}</version>
+            </dependency>
 
             <!-- web -->
             <dependency>
diff --git a/pom.xml b/pom.xml
index d403ecebc7..a9feab80b9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,6 +57,7 @@
         <module>integration/brpc</module>
         <module>rm</module>
         <module>rm-datasource</module>
+        <module>rocketmq</module>
         <module>spring</module>
         <module>tcc</module>
         <module>test</module>
diff --git a/rm/src/main/java/org/apache/seata/rm/AbstractResourceManager.java 
b/rm/src/main/java/org/apache/seata/rm/AbstractResourceManager.java
index 1c82579031..fa4e776b11 100644
--- a/rm/src/main/java/org/apache/seata/rm/AbstractResourceManager.java
+++ b/rm/src/main/java/org/apache/seata/rm/AbstractResourceManager.java
@@ -28,6 +28,7 @@ import org.apache.seata.core.exception.TransactionException;
 import org.apache.seata.core.exception.TransactionExceptionCode;
 import org.apache.seata.core.model.BranchStatus;
 import org.apache.seata.core.model.BranchType;
+import org.apache.seata.core.model.GlobalStatus;
 import org.apache.seata.core.model.Resource;
 import org.apache.seata.core.model.ResourceManager;
 import org.apache.seata.core.protocol.ResultCode;
@@ -35,6 +36,8 @@ import 
org.apache.seata.core.protocol.transaction.BranchRegisterRequest;
 import org.apache.seata.core.protocol.transaction.BranchRegisterResponse;
 import org.apache.seata.core.protocol.transaction.BranchReportRequest;
 import org.apache.seata.core.protocol.transaction.BranchReportResponse;
+import org.apache.seata.core.protocol.transaction.GlobalStatusRequest;
+import org.apache.seata.core.protocol.transaction.GlobalStatusResponse;
 import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -140,4 +143,16 @@ public abstract class AbstractResourceManager implements 
ResourceManager {
     public void registerResource(Resource resource) {
         
RmNettyRemotingClient.getInstance().registerResource(resource.getResourceGroupId(),
 resource.getResourceId());
     }
+
+    @Override
+    public GlobalStatus getGlobalStatus(BranchType branchType, String xid) {
+        GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
+        queryGlobalStatus.setXid(xid);
+        try {
+            GlobalStatusResponse response = (GlobalStatusResponse) 
RmNettyRemotingClient.getInstance().sendSyncRequest(queryGlobalStatus);
+            return response.getGlobalStatus();
+        } catch (TimeoutException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/rm/src/main/java/org/apache/seata/rm/DefaultResourceManager.java 
b/rm/src/main/java/org/apache/seata/rm/DefaultResourceManager.java
index 45097b0de0..51bbff569c 100644
--- a/rm/src/main/java/org/apache/seata/rm/DefaultResourceManager.java
+++ b/rm/src/main/java/org/apache/seata/rm/DefaultResourceManager.java
@@ -27,6 +27,7 @@ import org.apache.seata.common.util.CollectionUtils;
 import org.apache.seata.core.exception.TransactionException;
 import org.apache.seata.core.model.BranchStatus;
 import org.apache.seata.core.model.BranchType;
+import org.apache.seata.core.model.GlobalStatus;
 import org.apache.seata.core.model.Resource;
 import org.apache.seata.core.model.ResourceManager;
 
@@ -150,6 +151,11 @@ public class DefaultResourceManager implements 
ResourceManager {
         throw new FrameworkException("DefaultResourceManager isn't a real 
ResourceManager");
     }
 
+    @Override
+    public GlobalStatus getGlobalStatus(BranchType branchType, String xid) {
+        return getResourceManager(branchType).getGlobalStatus(branchType, xid);
+    }
+
     private static class SingletonHolder {
         private static DefaultResourceManager INSTANCE = new 
DefaultResourceManager();
     }
diff --git a/seata-spring-boot-starter/pom.xml b/rocketmq/pom.xml
similarity index 55%
copy from seata-spring-boot-starter/pom.xml
copy to rocketmq/pom.xml
index 517923778e..23297aff44 100644
--- a/seata-spring-boot-starter/pom.xml
+++ b/rocketmq/pom.xml
@@ -24,48 +24,26 @@
         <groupId>org.apache.seata</groupId>
         <artifactId>seata-parent</artifactId>
         <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <artifactId>seata-spring-boot-starter</artifactId>
+    <artifactId>seata-rocketmq</artifactId>
     <packaging>jar</packaging>
-    <name>seata-spring-boot-starter ${project.version}</name>
-    <description>spring-boot-starter for Seata built with Maven</description>
+    <name>seata-rocketmq ${project.version}</name>
+    <description>rocketmq integration for Seata built with Maven</description>
 
     <dependencies>
         <dependency>
             <groupId>${project.groupId}</groupId>
-            <artifactId>seata-spring-autoconfigure-client</artifactId>
+            <artifactId>seata-tcc</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <!--seata-->
         <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>seata-all</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>log4j</artifactId>
-                    <groupId>log4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <!--spring-->
-        <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-webmvc</artifactId>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
             <scope>provided</scope>
-            <optional>true</optional>
-        </dependency>
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-autoconfigure</artifactId>
-            <optional>true</optional>
-        </dependency>
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-configuration-processor</artifactId>
-            <optional>true</optional>
         </dependency>
     </dependencies>
 
+
 </project>
diff --git 
a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducer.java
 
b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducer.java
new file mode 100644
index 0000000000..2846d00073
--- /dev/null
+++ 
b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducer.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.integration.rocketmq;
+
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.seata.common.util.StringUtils;
+import org.apache.seata.core.context.RootContext;
+import org.apache.seata.core.model.GlobalStatus;
+import org.apache.seata.rm.DefaultResourceManager;
+import org.apache.rocketmq.client.Validators;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Seata MQ Producer
+ **/
+public class SeataMQProducer extends TransactionMQProducer {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SeataMQProducer.class);
+
+    private static final List<GlobalStatus> COMMIT_STATUSES = 
Arrays.asList(GlobalStatus.Committed, GlobalStatus.Committing, 
GlobalStatus.CommitRetrying);
+    private static final List<GlobalStatus> ROLLBACK_STATUSES = 
Arrays.asList(GlobalStatus.Rollbacked, GlobalStatus.Rollbacking, 
GlobalStatus.RollbackRetrying);
+
+    public static String PROPERTY_SEATA_XID = RootContext.KEY_XID;
+    public static String PROPERTY_SEATA_BRANCHID = RootContext.KEY_BRANCHID;
+    private TransactionListener transactionListener;
+
+    private TCCRocketMQ tccRocketMQ;
+
+    SeataMQProducer(final String producerGroup) {
+        this(null, producerGroup, null);
+    }
+
+    SeataMQProducer(final String namespace, final String producerGroup, 
RPCHook rpcHook) {
+        super(namespace, producerGroup, rpcHook);
+        this.transactionListener = new TransactionListener() {
+            @Override
+            public LocalTransactionState executeLocalTransaction(Message msg, 
Object arg) {
+                return LocalTransactionState.UNKNOW;
+            }
+
+            @Override
+            public LocalTransactionState checkLocalTransaction(MessageExt msg) 
{
+                String xid = msg.getProperty(PROPERTY_SEATA_XID);
+                if (StringUtils.isBlank(xid)) {
+                    LOGGER.error("msg has no xid, msgTransactionId: {}, msg 
will be rollback", msg.getTransactionId());
+                    return LocalTransactionState.ROLLBACK_MESSAGE;
+                }
+                GlobalStatus globalStatus = 
DefaultResourceManager.get().getGlobalStatus(SeataMQProducerFactory.ROCKET_BRANCH_TYPE,
 xid);
+                if (COMMIT_STATUSES.contains(globalStatus)) {
+                    return LocalTransactionState.COMMIT_MESSAGE;
+                } else if (ROLLBACK_STATUSES.contains(globalStatus) || 
GlobalStatus.isOnePhaseTimeout(globalStatus)) {
+                    return LocalTransactionState.ROLLBACK_MESSAGE;
+                } else if (GlobalStatus.Finished.equals(globalStatus)) {
+                    LOGGER.error("global transaction finished, msg will be 
rollback, xid: {}", xid);
+                    return LocalTransactionState.ROLLBACK_MESSAGE;
+                }
+                return LocalTransactionState.UNKNOW;
+            }
+        };
+    }
+
+    @Override
+    public SendResult send(Message msg) throws MQClientException, 
MQBrokerException, RemotingException, InterruptedException {
+        return send(msg, this.getSendMsgTimeout());
+    }
+
+    @Override
+    public SendResult send(Message msg, long timeout) throws 
MQClientException, MQBrokerException, RemotingException, InterruptedException {
+        if (RootContext.inGlobalTransaction()) {
+            if (tccRocketMQ == null) {
+                throw new RuntimeException("TCCRocketMQ is not initialized");
+            }
+            return tccRocketMQ.prepare(msg, timeout);
+        } else {
+            return super.send(msg, timeout);
+        }
+    }
+
+    public SendResult doSendMessageInTransaction(final Message msg, long 
timeout, String xid, long branchId) throws MQClientException {
+        msg.setTopic(withNamespace(msg.getTopic()));
+        if (msg.getDelayTimeLevel() != 0) {
+            MessageAccessor.clearProperty(msg, 
MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+        }
+        Validators.checkMessage(msg, this);
+
+        SendResult sendResult = null;
+        MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
+        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, 
this.getProducerGroup());
+        MessageAccessor.putProperty(msg, PROPERTY_SEATA_XID, xid);
+        MessageAccessor.putProperty(msg, PROPERTY_SEATA_BRANCHID, 
String.valueOf(branchId));
+        try {
+            sendResult = super.send(msg, timeout);
+        } catch (Exception e) {
+            throw new MQClientException("send message Exception", e);
+        }
+
+        if (SendStatus.SEND_OK != sendResult.getSendStatus()) {
+            throw new RuntimeException("Message send fail.status=" + 
sendResult.getSendStatus());
+        }
+        if (sendResult.getTransactionId() != null) {
+            msg.putUserProperty("__transactionId__", 
sendResult.getTransactionId());
+        }
+        String transactionId = 
msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+        if (null != transactionId && !"".equals(transactionId)) {
+            msg.setTransactionId(transactionId);
+        }
+        return sendResult;
+    }
+
+
+    @Override
+    public TransactionListener getTransactionListener() {
+        return transactionListener;
+    }
+
+    public void setTccRocketMQ(TCCRocketMQ tccRocketMQ) {
+        this.tccRocketMQ = tccRocketMQ;
+    }
+}
diff --git 
a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java
 
b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java
new file mode 100644
index 0000000000..63414aa129
--- /dev/null
+++ 
b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.integration.rocketmq;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.seata.common.exception.NotSupportYetException;
+import org.apache.seata.core.model.BranchType;
+import org.apache.seata.integration.tx.api.util.ProxyUtil;
+
+/**
+ * SeataMQProducer Factory
+ **/
+public class SeataMQProducerFactory {
+
+    public static final String ROCKET_TCC_NAME = "tccRocketMQ";
+    public static final BranchType ROCKET_BRANCH_TYPE = BranchType.TCC;
+
+    /**
+     * Default Producer, it can be replaced to Map after multi-resource is 
supported
+     */
+    private static SeataMQProducer defaultProducer;
+
+    public static SeataMQProducer createSingle(String nameServer, String 
producerGroup) throws MQClientException {
+        return createSingle(nameServer, null, producerGroup, null);
+    }
+
+    public static SeataMQProducer createSingle(String nameServer, String 
namespace,
+                                               String groupName, RPCHook 
rpcHook) throws MQClientException {
+        if (defaultProducer == null) {
+            synchronized (SeataMQProducerFactory.class) {
+                if (defaultProducer == null) {
+                    defaultProducer = new SeataMQProducer(namespace, 
groupName, rpcHook);
+                    defaultProducer.setNamesrvAddr(nameServer);
+                    TCCRocketMQ tccRocketMQProxy = ProxyUtil.createProxy(new 
TCCRocketMQImpl());
+                    tccRocketMQProxy.setProducer(defaultProducer);
+                    defaultProducer.setTccRocketMQ(tccRocketMQProxy);
+                    defaultProducer.start();
+                    return defaultProducer;
+                }
+            }
+        }
+        throw new NotSupportYetException("only one seata producer is 
permitted");
+    }
+
+    public static SeataMQProducer getProducer() {
+        return defaultProducer;
+    }
+}
diff --git 
a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQ.java 
b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQ.java
new file mode 100644
index 0000000000..44ea5dfac3
--- /dev/null
+++ 
b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQ.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.integration.rocketmq;
+
+import org.apache.seata.core.exception.TransactionException;
+import org.apache.seata.rm.tcc.api.BusinessActionContext;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+/**
+ * The interface Tcc rocket mq.
+ */
+public interface TCCRocketMQ {
+
+    /**
+     * set SeataMQProducer
+     *
+     * @param producer the producer
+     */
+    void setProducer(SeataMQProducer producer);
+
+    /**
+     * RocketMQ half send
+     *
+     * @param message  the message
+     * @param timeout  the timeout
+     * @return SendResult
+     */
+    SendResult prepare(Message message, long timeout) throws MQClientException;
+
+    /**
+     * RocketMQ half send commit
+     *
+     * @param context the BusinessActionContext
+     * @return SendResult
+     * @throws UnknownHostException
+     * @throws MQBrokerException
+     * @throws RemotingException
+     * @throws InterruptedException
+     */
+    boolean commit(BusinessActionContext context)
+            throws UnknownHostException, MQBrokerException, RemotingException, 
InterruptedException, TransactionException, TimeoutException;
+
+    /**
+     * RocketMQ half send rollback
+     *
+     * @param context the BusinessActionContext
+     * @return
+     * @throws UnknownHostException
+     * @throws MQBrokerException
+     * @throws RemotingException
+     * @throws InterruptedException
+     */
+    boolean rollback(BusinessActionContext context)
+            throws UnknownHostException, MQBrokerException, RemotingException, 
InterruptedException, TransactionException;
+
+
+}
\ No newline at end of file
diff --git 
a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQImpl.java
 
b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQImpl.java
new file mode 100644
index 0000000000..bd1a797203
--- /dev/null
+++ 
b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.integration.rocketmq;
+
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.seata.core.exception.TransactionException;
+import org.apache.seata.rm.tcc.api.BusinessActionContext;
+import org.apache.seata.rm.tcc.api.BusinessActionContextUtil;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.seata.rm.tcc.api.LocalTCC;
+import org.apache.seata.rm.tcc.api.TwoPhaseBusinessAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * the type TCCRocketMQImpl
+ */
+@LocalTCC
+public class TCCRocketMQImpl implements TCCRocketMQ {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TCCRocketMQImpl.class);
+    private static final String ROCKET_MSG_KEY = "ROCKET_MSG";
+    private static final String ROCKET_SEND_RESULT_KEY = "ROCKET_SEND_RESULT";
+
+    private SeataMQProducer producer;
+    private DefaultMQProducerImpl producerImpl;
+
+    @Override
+    public void setProducer(SeataMQProducer producer) {
+        this.producer = producer;
+        this.producerImpl = producer.getDefaultMQProducerImpl();
+    }
+
+    @Override
+    @TwoPhaseBusinessAction(name = SeataMQProducerFactory.ROCKET_TCC_NAME)
+    public SendResult prepare(Message message, long timeout) throws 
MQClientException {
+        BusinessActionContext context = BusinessActionContextUtil.getContext();
+        LOGGER.info("RocketMQ message send prepare, xid = {}", 
context.getXid());
+        Map<String, Object> params = new HashMap<>(8);
+        SendResult sendResult = producer.doSendMessageInTransaction(message, 
timeout, context.getXid(), context.getBranchId());
+        message.setDeliverTimeMs(0);
+        params.put(ROCKET_MSG_KEY, message);
+        params.put(ROCKET_SEND_RESULT_KEY, sendResult);
+        BusinessActionContextUtil.addContext(params);
+        return sendResult;
+    }
+
+    @Override
+    public boolean commit(BusinessActionContext context)
+            throws UnknownHostException, MQBrokerException, RemotingException, 
InterruptedException, TimeoutException, TransactionException {
+        Message message = context.getActionContext(ROCKET_MSG_KEY, 
Message.class);
+        SendResult sendResult = 
context.getActionContext(ROCKET_SEND_RESULT_KEY, SendResult.class);
+        if (message == null || sendResult == null) {
+            throw new TransactionException("TCCRocketMQ commit but cannot find 
message and sendResult");
+        }
+        this.producerImpl.endTransaction(message, sendResult, 
LocalTransactionState.COMMIT_MESSAGE, null);
+        LOGGER.info("RocketMQ message send commit, xid = {}, branchId = {}", 
context.getXid(), context.getBranchId());
+        return true;
+    }
+
+    @Override
+    public boolean rollback(BusinessActionContext context)
+            throws UnknownHostException, MQBrokerException, RemotingException, 
InterruptedException, TransactionException {
+        Message message = context.getActionContext(ROCKET_MSG_KEY, 
Message.class);
+        SendResult sendResult = 
context.getActionContext(ROCKET_SEND_RESULT_KEY, SendResult.class);
+        if (message == null || sendResult == null) {
+            LOGGER.error("TCCRocketMQ rollback but cannot find message and 
sendResult");
+        }
+        this.producerImpl.endTransaction(message, sendResult, 
LocalTransactionState.ROLLBACK_MESSAGE, null);
+        LOGGER.info("RocketMQ message send rollback, xid = {}, branchId = {}", 
context.getXid(), context.getBranchId());
+        return true;
+    }
+}
\ No newline at end of file
diff --git 
a/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactoryTest.java
 
b/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactoryTest.java
new file mode 100644
index 0000000000..975bb885d8
--- /dev/null
+++ 
b/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactoryTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.integration.rocketmq;
+
+import org.apache.seata.common.exception.NotSupportYetException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * seata mq producer factory test
+ **/
+public class SeataMQProducerFactoryTest {
+
+    @Test
+    public void testCreateSingle() throws Exception {
+        SeataMQProducerFactory.createSingle("127.0.0.1:9876", "test");
+        Assertions.assertThrows(NotSupportYetException.class, () -> 
SeataMQProducerFactory.createSingle("127.0.0.1:9876", "test"));
+
+        SeataMQProducer producer = SeataMQProducerFactory.getProducer();
+        Assertions.assertNotNull(producer);
+    }
+}
diff --git 
a/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerTest.java
 
b/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerTest.java
new file mode 100644
index 0000000000..7b8ab979d5
--- /dev/null
+++ 
b/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.integration.rocketmq;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * seata mq producer test
+ **/
+public class SeataMQProducerTest {
+
+    @Test
+    public void testCreate(){
+        new SeataMQProducer("testProducerGroup");
+        new SeataMQProducer("testNamespace", "testProducerGroup",null);
+    }
+}
diff --git a/rocketmq/src/test/resources/file.conf 
b/rocketmq/src/test/resources/file.conf
new file mode 100644
index 0000000000..46c3e0401c
--- /dev/null
+++ b/rocketmq/src/test/resources/file.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+service {
+  #transaction service group mapping
+  vgroupMapping.default_tx_group = "default"
+  #only support when registry.type=file, please don't set multiple addresses
+  default.grouplist = "127.0.0.1:8091"
+  #disable seata
+  disableGlobalTransaction = false
+}
\ No newline at end of file
diff --git a/rocketmq/src/test/resources/registry.conf 
b/rocketmq/src/test/resources/registry.conf
new file mode 100644
index 0000000000..5ad014bf55
--- /dev/null
+++ b/rocketmq/src/test/resources/registry.conf
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+registry {
+  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
+  type = "file"
+
+  file {
+    name = "file.conf"
+  }
+}
+
+config {
+  # file、nacos 、apollo、zk、consul、etcd3
+  type = "file"
+
+  file {
+    name = "file.conf"
+  }
+}
\ No newline at end of file
diff --git a/seata-spring-boot-starter/pom.xml 
b/seata-spring-boot-starter/pom.xml
index 517923778e..24f449166b 100644
--- a/seata-spring-boot-starter/pom.xml
+++ b/seata-spring-boot-starter/pom.xml
@@ -66,6 +66,11 @@
             <artifactId>spring-boot-configuration-processor</artifactId>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <optional>true</optional>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/tcc/src/test/java/org/apache/seata/rm/tcc/interceptor/ProxyUtilsTccTest.java 
b/tcc/src/test/java/org/apache/seata/rm/tcc/interceptor/ProxyUtilsTccTest.java
index 48416d90e9..9a243733e9 100644
--- 
a/tcc/src/test/java/org/apache/seata/rm/tcc/interceptor/ProxyUtilsTccTest.java
+++ 
b/tcc/src/test/java/org/apache/seata/rm/tcc/interceptor/ProxyUtilsTccTest.java
@@ -25,6 +25,7 @@ import org.apache.seata.core.context.RootContext;
 import org.apache.seata.core.exception.TransactionException;
 import org.apache.seata.core.model.BranchStatus;
 import org.apache.seata.core.model.BranchType;
+import org.apache.seata.core.model.GlobalStatus;
 import org.apache.seata.core.model.Resource;
 import org.apache.seata.core.model.ResourceManager;
 import org.apache.seata.integration.tx.api.util.ProxyUtil;
@@ -90,6 +91,13 @@ public class ProxyUtilsTccTest {
         public BranchType getBranchType() {
             return null;
         }
+
+        @Override
+        public GlobalStatus getGlobalStatus(BranchType branchType, String xid) 
{
+            return null;
+        }
+
+
     };
 
 
diff --git 
a/test-mock-server/src/main/java/org/apache/seata/mockserver/MockCoordinator.java
 
b/test-mock-server/src/main/java/org/apache/seata/mockserver/MockCoordinator.java
index 727d32e824..eba654fa48 100644
--- 
a/test-mock-server/src/main/java/org/apache/seata/mockserver/MockCoordinator.java
+++ 
b/test-mock-server/src/main/java/org/apache/seata/mockserver/MockCoordinator.java
@@ -254,7 +254,7 @@ public class MockCoordinator extends 
AbstractTCInboundHandler implements Transac
     }
 
     private void checkMockActionFail(String xid) throws TransactionException {
-        if (expectedResultMap.get(xid) == ResultCode.Failed) {
+        if (ResultCode.Failed == expectedResultMap.get(xid)) {
             throw new TransactionException(TransactionExceptionCode.Broken, 
"mock action expect fail");
         }
     }
diff --git a/test/pom.xml b/test/pom.xml
index e9d638fe38..91d4c9f60f 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -141,6 +141,17 @@
             <artifactId>seata-sqlparser-druid</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>seata-rocketmq</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 
diff --git 
a/test/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerSendTest.java
 
b/test/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerSendTest.java
new file mode 100644
index 0000000000..e1e778f7c3
--- /dev/null
+++ 
b/test/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerSendTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.integration.rocketmq;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.seata.common.ConfigurationKeys;
+import org.apache.seata.common.ConfigurationTestHelper;
+import org.apache.seata.core.context.RootContext;
+import org.apache.seata.core.exception.TransactionException;
+import org.apache.seata.core.model.TransactionManager;
+import org.apache.seata.core.rpc.netty.mockserver.ProtocolTestConstants;
+import org.apache.seata.core.rpc.netty.mockserver.TmClientTest;
+import org.apache.seata.mockserver.MockServer;
+import org.apache.seata.rm.RMClient;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * seata mq producer test
+ **/
+public class SeataMQProducerSendTest {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SeataMQProducerSendTest.class);
+
+
+    private static final String TOPIC = "seata-test";
+    private static final String NAME_SERVER = "127.0.0.1:9876";
+
+    private static SeataMQProducer producer;
+
+    @BeforeAll
+    public static void before() throws MQClientException {
+        
ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, 
String.valueOf(ProtocolTestConstants.MOCK_SERVER_PORT));
+        MockServer.start(ProtocolTestConstants.MOCK_SERVER_PORT);
+        producer = SeataMQProducerFactory.createSingle(NAME_SERVER, "test");
+        // should start mq server here
+    }
+
+    @AfterAll
+    public static void after() {
+        MockServer.close();
+        
ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL);
+        producer.shutdown();
+    }
+
+    @Test
+    @Disabled
+    public void testSendCommit() throws MQBrokerException, RemotingException, 
InterruptedException, MQClientException, TransactionException {
+        TransactionManager tm = getTmAndBegin();
+
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        MQPushConsumer consumer = startConsume(countDownLatch);
+        producer.send(new Message(TOPIC, 
"testMessage".getBytes(StandardCharsets.UTF_8)));
+
+        tm.commit(RootContext.getXID());
+        LOGGER.info("global commit");
+        boolean await = countDownLatch.await(2, TimeUnit.SECONDS);
+        LOGGER.info("await:{}", await);
+        consumer.shutdown();
+    }
+
+    @Test
+    @Disabled
+    public void testSendRollback()
+        throws MQBrokerException, RemotingException, InterruptedException, 
MQClientException, TransactionException {
+        TransactionManager tm = getTmAndBegin();
+
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        MQPushConsumer consumer = startConsume(countDownLatch);
+
+        producer.send(new Message(TOPIC, 
"testMessage".getBytes(StandardCharsets.UTF_8)));
+
+        tm.rollback(RootContext.getXID());
+        LOGGER.info("global rollback");
+        try {
+            boolean await = countDownLatch.await(2, TimeUnit.SECONDS);
+            LOGGER.info("await:{}", await);
+        } catch (Exception e) {
+            Assertions.assertEquals(e.getClass(), InterruptedException.class);
+        } finally {
+            consumer.shutdown();
+        }
+    }
+
+
+    private static MQPushConsumer startConsume(CountDownLatch countDownLatch) 
throws MQClientException {
+        DefaultMQPushConsumer consumer = new 
DefaultMQPushConsumer("yourGroup");
+        consumer.setNamesrvAddr(NAME_SERVER);
+        
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        consumer.subscribe(TOPIC,"*");
+        consumer.registerMessageListener((MessageListenerConcurrently) (msg, 
context) -> {
+            LOGGER.info("%s Receive New Messages: {} {}", 
Thread.currentThread().getName(), msg);
+            countDownLatch.countDown();
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        consumer.start();
+        return consumer;
+    }
+
+
+    private static TransactionManager getTmAndBegin() throws 
TransactionException {
+        TransactionManager tm = TmClientTest.getTm();
+        RMClient.init(ProtocolTestConstants.APPLICATION_ID, 
ProtocolTestConstants.SERVICE_GROUP);
+        String xid = tm.begin(ProtocolTestConstants.APPLICATION_ID, 
ProtocolTestConstants.SERVICE_GROUP, "testRocket", 60000);
+        RootContext.bind(xid);
+        return tm;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org

Reply via email to