This is an automated email from the ASF dual-hosted git repository. jianbin pushed a commit to branch 2.x in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push: new 7ce96e4c7c feature : RocketMQ transaction are supported (#6230) 7ce96e4c7c is described below commit 7ce96e4c7ce0d16b6b8c3f8e6d19fd6550312f00 Author: justabug <bug...@users.noreply.github.com> AuthorDate: Sun Mar 3 21:20:46 2024 +0800 feature : RocketMQ transaction are supported (#6230) --- all/pom.xml | 5 + bom/pom.xml | 5 + changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + .../org/apache/seata/common/ConfigurationKeys.java | 5 + .../org/apache/seata/common/DefaultValues.java | 2 + .../org/apache/seata/core/context/RootContext.java | 2 + .../apache/seata/core/model/ResourceManager.java | 9 ++ dependencies/pom.xml | 7 + pom.xml | 1 + .../apache/seata/rm/AbstractResourceManager.java | 15 +++ .../apache/seata/rm/DefaultResourceManager.java | 6 + {seata-spring-boot-starter => rocketmq}/pom.xml | 38 ++---- .../integration/rocketmq/SeataMQProducer.java | 149 +++++++++++++++++++++ .../rocketmq/SeataMQProducerFactory.java | 63 +++++++++ .../seata/integration/rocketmq/TCCRocketMQ.java | 79 +++++++++++ .../integration/rocketmq/TCCRocketMQImpl.java | 96 +++++++++++++ .../rocketmq/SeataMQProducerFactoryTest.java | 36 +++++ .../integration/rocketmq/SeataMQProducerTest.java | 31 +++++ rocketmq/src/test/resources/file.conf | 25 ++++ rocketmq/src/test/resources/registry.conf | 34 +++++ seata-spring-boot-starter/pom.xml | 5 + .../rm/tcc/interceptor/ProxyUtilsTccTest.java | 8 ++ .../apache/seata/mockserver/MockCoordinator.java | 2 +- test/pom.xml | 11 ++ .../rocketmq/SeataMQProducerSendTest.java | 139 +++++++++++++++++++ 26 files changed, 744 insertions(+), 31 deletions(-) diff --git a/all/pom.xml b/all/pom.xml index d3542d1758..97cd30bbba 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -192,6 +192,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.seata</groupId> + <artifactId>seata-rocketmq</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.seata</groupId> <artifactId>seata-sqlparser-core</artifactId> diff --git a/bom/pom.xml b/bom/pom.xml index 224c84d1f6..514d4f048b 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -191,6 +191,11 @@ <artifactId>seata-http</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.seata</groupId> + <artifactId>seata-rocketmq</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.seata</groupId> <artifactId>seata-rm</artifactId> diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 64b2df332e..2df25a629d 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -6,6 +6,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6370](https://github.com/seata/seata/pull/6370)] seata saga decouple spring, optimize architecture. - [[#6205](https://github.com/apache/incubator-seata/pull/6205)] mock server - [[#6169](https://github.com/apache/incubator-seata/pull/6169)] full support for states in the refactored state machine designer +- [[#6230](https://github.com/apache/incubator-seata/pull/6230)] RocketMQ transaction are supported ### bugfix: - [[#6090](https://github.com/apache/incubator-seata/pull/6090)] fix the TCC aspect exception handling process, do not wrapping the internal call exceptions diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 4c5d160841..6099f3ae33 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -6,6 +6,7 @@ - [[#6370](https://github.com/seata/seata/pull/6370)] seata saga spring接耦、架构优化。 - [[#6205](https://github.com/apache/incubator-seata/pull/6205)] 提供mock server - [[#6169](https://github.com/apache/incubator-seata/pull/6169)] 支持新版本状态机设计器 +- [[#6230](https://github.com/apache/incubator-seata/pull/6230)] 支持RocketMQ消息事务 ### bugfix: - [[#6090](https://github.com/apache/incubator-seata/pull/6090)] 修复tcc切面异常处理过程,不对内部调用异常做包装处理,直接向外抛出 diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index 7e6521c5ab..e1c0f11f39 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -1006,4 +1006,9 @@ public interface ConfigurationKeys { * The constant SERVER_APPLICATION_DATA_SIZE_CHECK */ String SERVER_APPLICATION_DATA_SIZE_CHECK = SERVER_PREFIX + "applicationDataLimitCheck"; + + /** + * The constant ROCKET_MQ_MSG_TIMEOUT + */ + String ROCKET_MQ_MSG_TIMEOUT = SERVER_PREFIX + "rocketmqMsgTimeout"; } diff --git a/common/src/main/java/org/apache/seata/common/DefaultValues.java b/common/src/main/java/org/apache/seata/common/DefaultValues.java index 67e21838fb..0c2dd0f7b8 100644 --- a/common/src/main/java/org/apache/seata/common/DefaultValues.java +++ b/common/src/main/java/org/apache/seata/common/DefaultValues.java @@ -312,4 +312,6 @@ public interface DefaultValues { * Default druid location in classpath */ String DRUID_LOCATION = "lib/sqlparser/druid.jar"; + + int DEFAULT_ROCKET_MQ_MSG_TIMEOUT = 60 * 1000; } diff --git a/core/src/main/java/org/apache/seata/core/context/RootContext.java b/core/src/main/java/org/apache/seata/core/context/RootContext.java index eb78a1bf2e..85453afb5d 100644 --- a/core/src/main/java/org/apache/seata/core/context/RootContext.java +++ b/core/src/main/java/org/apache/seata/core/context/RootContext.java @@ -48,6 +48,8 @@ public class RootContext { */ public static final String KEY_XID = "TX_XID"; + public static final String KEY_BRANCHID = "TX_BRANCHID"; + /** * The constant HIDDEN_KEY_XID for sofa-rpc integration. */ diff --git a/core/src/main/java/org/apache/seata/core/model/ResourceManager.java b/core/src/main/java/org/apache/seata/core/model/ResourceManager.java index ab65cdb76e..ee59058965 100644 --- a/core/src/main/java/org/apache/seata/core/model/ResourceManager.java +++ b/core/src/main/java/org/apache/seata/core/model/ResourceManager.java @@ -51,4 +51,13 @@ public interface ResourceManager extends ResourceManagerInbound, ResourceManager * @return The BranchType of ResourceManager. */ BranchType getBranchType(); + + /** + * Get the GlobalStatus. + * + * @param branchType The BranchType of ResourceManager. + * @param xid The xid of transaction. + * @return The GlobalStatus of transaction. + */ + GlobalStatus getGlobalStatus(BranchType branchType, String xid); } diff --git a/dependencies/pom.xml b/dependencies/pom.xml index ad5b15c6ab..b317f642ec 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -114,6 +114,8 @@ <!-- for jdbc driver when package --> <mysql5.version>${mysql.version}</mysql5.version> <mysql8.version>8.0.27</mysql8.version> + <!-- rocketmq --> + <rocketmq-version>5.0.0</rocketmq-version> <!-- # for kotlin --> <kotlin.version>1.4.32</kotlin.version> @@ -781,6 +783,11 @@ <artifactId>janino</artifactId> <version>${janino-version}</version> </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-client</artifactId> + <version>${rocketmq-version}</version> + </dependency> <!-- web --> <dependency> diff --git a/pom.xml b/pom.xml index d403ecebc7..a9feab80b9 100644 --- a/pom.xml +++ b/pom.xml @@ -57,6 +57,7 @@ <module>integration/brpc</module> <module>rm</module> <module>rm-datasource</module> + <module>rocketmq</module> <module>spring</module> <module>tcc</module> <module>test</module> diff --git a/rm/src/main/java/org/apache/seata/rm/AbstractResourceManager.java b/rm/src/main/java/org/apache/seata/rm/AbstractResourceManager.java index 1c82579031..fa4e776b11 100644 --- a/rm/src/main/java/org/apache/seata/rm/AbstractResourceManager.java +++ b/rm/src/main/java/org/apache/seata/rm/AbstractResourceManager.java @@ -28,6 +28,7 @@ import org.apache.seata.core.exception.TransactionException; import org.apache.seata.core.exception.TransactionExceptionCode; import org.apache.seata.core.model.BranchStatus; import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.model.GlobalStatus; import org.apache.seata.core.model.Resource; import org.apache.seata.core.model.ResourceManager; import org.apache.seata.core.protocol.ResultCode; @@ -35,6 +36,8 @@ import org.apache.seata.core.protocol.transaction.BranchRegisterRequest; import org.apache.seata.core.protocol.transaction.BranchRegisterResponse; import org.apache.seata.core.protocol.transaction.BranchReportRequest; import org.apache.seata.core.protocol.transaction.BranchReportResponse; +import org.apache.seata.core.protocol.transaction.GlobalStatusRequest; +import org.apache.seata.core.protocol.transaction.GlobalStatusResponse; import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,4 +143,16 @@ public abstract class AbstractResourceManager implements ResourceManager { public void registerResource(Resource resource) { RmNettyRemotingClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId()); } + + @Override + public GlobalStatus getGlobalStatus(BranchType branchType, String xid) { + GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest(); + queryGlobalStatus.setXid(xid); + try { + GlobalStatusResponse response = (GlobalStatusResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(queryGlobalStatus); + return response.getGlobalStatus(); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } } diff --git a/rm/src/main/java/org/apache/seata/rm/DefaultResourceManager.java b/rm/src/main/java/org/apache/seata/rm/DefaultResourceManager.java index 45097b0de0..51bbff569c 100644 --- a/rm/src/main/java/org/apache/seata/rm/DefaultResourceManager.java +++ b/rm/src/main/java/org/apache/seata/rm/DefaultResourceManager.java @@ -27,6 +27,7 @@ import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.core.exception.TransactionException; import org.apache.seata.core.model.BranchStatus; import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.model.GlobalStatus; import org.apache.seata.core.model.Resource; import org.apache.seata.core.model.ResourceManager; @@ -150,6 +151,11 @@ public class DefaultResourceManager implements ResourceManager { throw new FrameworkException("DefaultResourceManager isn't a real ResourceManager"); } + @Override + public GlobalStatus getGlobalStatus(BranchType branchType, String xid) { + return getResourceManager(branchType).getGlobalStatus(branchType, xid); + } + private static class SingletonHolder { private static DefaultResourceManager INSTANCE = new DefaultResourceManager(); } diff --git a/seata-spring-boot-starter/pom.xml b/rocketmq/pom.xml similarity index 55% copy from seata-spring-boot-starter/pom.xml copy to rocketmq/pom.xml index 517923778e..23297aff44 100644 --- a/seata-spring-boot-starter/pom.xml +++ b/rocketmq/pom.xml @@ -24,48 +24,26 @@ <groupId>org.apache.seata</groupId> <artifactId>seata-parent</artifactId> <version>${revision}</version> + <relativePath>../pom.xml</relativePath> </parent> <modelVersion>4.0.0</modelVersion> - <artifactId>seata-spring-boot-starter</artifactId> + <artifactId>seata-rocketmq</artifactId> <packaging>jar</packaging> - <name>seata-spring-boot-starter ${project.version}</name> - <description>spring-boot-starter for Seata built with Maven</description> + <name>seata-rocketmq ${project.version}</name> + <description>rocketmq integration for Seata built with Maven</description> <dependencies> <dependency> <groupId>${project.groupId}</groupId> - <artifactId>seata-spring-autoconfigure-client</artifactId> + <artifactId>seata-tcc</artifactId> <version>${project.version}</version> </dependency> - <!--seata--> <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>seata-all</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <artifactId>log4j</artifactId> - <groupId>log4j</groupId> - </exclusion> - </exclusions> - </dependency> - <!--spring--> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-webmvc</artifactId> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-client</artifactId> <scope>provided</scope> - <optional>true</optional> - </dependency> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-autoconfigure</artifactId> - <optional>true</optional> - </dependency> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-configuration-processor</artifactId> - <optional>true</optional> </dependency> </dependencies> + </project> diff --git a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducer.java b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducer.java new file mode 100644 index 0000000000..2846d00073 --- /dev/null +++ b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducer.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.rocketmq; + +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.context.RootContext; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.rm.DefaultResourceManager; +import org.apache.rocketmq.client.Validators; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; + +/** + * Seata MQ Producer + **/ +public class SeataMQProducer extends TransactionMQProducer { + + private static final Logger LOGGER = LoggerFactory.getLogger(SeataMQProducer.class); + + private static final List<GlobalStatus> COMMIT_STATUSES = Arrays.asList(GlobalStatus.Committed, GlobalStatus.Committing, GlobalStatus.CommitRetrying); + private static final List<GlobalStatus> ROLLBACK_STATUSES = Arrays.asList(GlobalStatus.Rollbacked, GlobalStatus.Rollbacking, GlobalStatus.RollbackRetrying); + + public static String PROPERTY_SEATA_XID = RootContext.KEY_XID; + public static String PROPERTY_SEATA_BRANCHID = RootContext.KEY_BRANCHID; + private TransactionListener transactionListener; + + private TCCRocketMQ tccRocketMQ; + + SeataMQProducer(final String producerGroup) { + this(null, producerGroup, null); + } + + SeataMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { + super(namespace, producerGroup, rpcHook); + this.transactionListener = new TransactionListener() { + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + return LocalTransactionState.UNKNOW; + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + String xid = msg.getProperty(PROPERTY_SEATA_XID); + if (StringUtils.isBlank(xid)) { + LOGGER.error("msg has no xid, msgTransactionId: {}, msg will be rollback", msg.getTransactionId()); + return LocalTransactionState.ROLLBACK_MESSAGE; + } + GlobalStatus globalStatus = DefaultResourceManager.get().getGlobalStatus(SeataMQProducerFactory.ROCKET_BRANCH_TYPE, xid); + if (COMMIT_STATUSES.contains(globalStatus)) { + return LocalTransactionState.COMMIT_MESSAGE; + } else if (ROLLBACK_STATUSES.contains(globalStatus) || GlobalStatus.isOnePhaseTimeout(globalStatus)) { + return LocalTransactionState.ROLLBACK_MESSAGE; + } else if (GlobalStatus.Finished.equals(globalStatus)) { + LOGGER.error("global transaction finished, msg will be rollback, xid: {}", xid); + return LocalTransactionState.ROLLBACK_MESSAGE; + } + return LocalTransactionState.UNKNOW; + } + }; + } + + @Override + public SendResult send(Message msg) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { + return send(msg, this.getSendMsgTimeout()); + } + + @Override + public SendResult send(Message msg, long timeout) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { + if (RootContext.inGlobalTransaction()) { + if (tccRocketMQ == null) { + throw new RuntimeException("TCCRocketMQ is not initialized"); + } + return tccRocketMQ.prepare(msg, timeout); + } else { + return super.send(msg, timeout); + } + } + + public SendResult doSendMessageInTransaction(final Message msg, long timeout, String xid, long branchId) throws MQClientException { + msg.setTopic(withNamespace(msg.getTopic())); + if (msg.getDelayTimeLevel() != 0) { + MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); + } + Validators.checkMessage(msg, this); + + SendResult sendResult = null; + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.getProducerGroup()); + MessageAccessor.putProperty(msg, PROPERTY_SEATA_XID, xid); + MessageAccessor.putProperty(msg, PROPERTY_SEATA_BRANCHID, String.valueOf(branchId)); + try { + sendResult = super.send(msg, timeout); + } catch (Exception e) { + throw new MQClientException("send message Exception", e); + } + + if (SendStatus.SEND_OK != sendResult.getSendStatus()) { + throw new RuntimeException("Message send fail.status=" + sendResult.getSendStatus()); + } + if (sendResult.getTransactionId() != null) { + msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); + } + String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); + if (null != transactionId && !"".equals(transactionId)) { + msg.setTransactionId(transactionId); + } + return sendResult; + } + + + @Override + public TransactionListener getTransactionListener() { + return transactionListener; + } + + public void setTccRocketMQ(TCCRocketMQ tccRocketMQ) { + this.tccRocketMQ = tccRocketMQ; + } +} diff --git a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java new file mode 100644 index 0000000000..63414aa129 --- /dev/null +++ b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.rocketmq; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.seata.common.exception.NotSupportYetException; +import org.apache.seata.core.model.BranchType; +import org.apache.seata.integration.tx.api.util.ProxyUtil; + +/** + * SeataMQProducer Factory + **/ +public class SeataMQProducerFactory { + + public static final String ROCKET_TCC_NAME = "tccRocketMQ"; + public static final BranchType ROCKET_BRANCH_TYPE = BranchType.TCC; + + /** + * Default Producer, it can be replaced to Map after multi-resource is supported + */ + private static SeataMQProducer defaultProducer; + + public static SeataMQProducer createSingle(String nameServer, String producerGroup) throws MQClientException { + return createSingle(nameServer, null, producerGroup, null); + } + + public static SeataMQProducer createSingle(String nameServer, String namespace, + String groupName, RPCHook rpcHook) throws MQClientException { + if (defaultProducer == null) { + synchronized (SeataMQProducerFactory.class) { + if (defaultProducer == null) { + defaultProducer = new SeataMQProducer(namespace, groupName, rpcHook); + defaultProducer.setNamesrvAddr(nameServer); + TCCRocketMQ tccRocketMQProxy = ProxyUtil.createProxy(new TCCRocketMQImpl()); + tccRocketMQProxy.setProducer(defaultProducer); + defaultProducer.setTccRocketMQ(tccRocketMQProxy); + defaultProducer.start(); + return defaultProducer; + } + } + } + throw new NotSupportYetException("only one seata producer is permitted"); + } + + public static SeataMQProducer getProducer() { + return defaultProducer; + } +} diff --git a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQ.java b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQ.java new file mode 100644 index 0000000000..44ea5dfac3 --- /dev/null +++ b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQ.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.rocketmq; + +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.rm.tcc.api.BusinessActionContext; + +import java.net.UnknownHostException; +import java.util.concurrent.TimeoutException; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; + +/** + * The interface Tcc rocket mq. + */ +public interface TCCRocketMQ { + + /** + * set SeataMQProducer + * + * @param producer the producer + */ + void setProducer(SeataMQProducer producer); + + /** + * RocketMQ half send + * + * @param message the message + * @param timeout the timeout + * @return SendResult + */ + SendResult prepare(Message message, long timeout) throws MQClientException; + + /** + * RocketMQ half send commit + * + * @param context the BusinessActionContext + * @return SendResult + * @throws UnknownHostException + * @throws MQBrokerException + * @throws RemotingException + * @throws InterruptedException + */ + boolean commit(BusinessActionContext context) + throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException, TransactionException, TimeoutException; + + /** + * RocketMQ half send rollback + * + * @param context the BusinessActionContext + * @return + * @throws UnknownHostException + * @throws MQBrokerException + * @throws RemotingException + * @throws InterruptedException + */ + boolean rollback(BusinessActionContext context) + throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException, TransactionException; + + +} \ No newline at end of file diff --git a/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQImpl.java b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQImpl.java new file mode 100644 index 0000000000..bd1a797203 --- /dev/null +++ b/rocketmq/src/main/java/org/apache/seata/integration/rocketmq/TCCRocketMQImpl.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.rocketmq; + +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.rm.tcc.api.BusinessActionContext; +import org.apache.seata.rm.tcc.api.BusinessActionContextUtil; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.seata.rm.tcc.api.LocalTCC; +import org.apache.seata.rm.tcc.api.TwoPhaseBusinessAction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +/** + * the type TCCRocketMQImpl + */ +@LocalTCC +public class TCCRocketMQImpl implements TCCRocketMQ { + private static final Logger LOGGER = LoggerFactory.getLogger(TCCRocketMQImpl.class); + private static final String ROCKET_MSG_KEY = "ROCKET_MSG"; + private static final String ROCKET_SEND_RESULT_KEY = "ROCKET_SEND_RESULT"; + + private SeataMQProducer producer; + private DefaultMQProducerImpl producerImpl; + + @Override + public void setProducer(SeataMQProducer producer) { + this.producer = producer; + this.producerImpl = producer.getDefaultMQProducerImpl(); + } + + @Override + @TwoPhaseBusinessAction(name = SeataMQProducerFactory.ROCKET_TCC_NAME) + public SendResult prepare(Message message, long timeout) throws MQClientException { + BusinessActionContext context = BusinessActionContextUtil.getContext(); + LOGGER.info("RocketMQ message send prepare, xid = {}", context.getXid()); + Map<String, Object> params = new HashMap<>(8); + SendResult sendResult = producer.doSendMessageInTransaction(message, timeout, context.getXid(), context.getBranchId()); + message.setDeliverTimeMs(0); + params.put(ROCKET_MSG_KEY, message); + params.put(ROCKET_SEND_RESULT_KEY, sendResult); + BusinessActionContextUtil.addContext(params); + return sendResult; + } + + @Override + public boolean commit(BusinessActionContext context) + throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException, TimeoutException, TransactionException { + Message message = context.getActionContext(ROCKET_MSG_KEY, Message.class); + SendResult sendResult = context.getActionContext(ROCKET_SEND_RESULT_KEY, SendResult.class); + if (message == null || sendResult == null) { + throw new TransactionException("TCCRocketMQ commit but cannot find message and sendResult"); + } + this.producerImpl.endTransaction(message, sendResult, LocalTransactionState.COMMIT_MESSAGE, null); + LOGGER.info("RocketMQ message send commit, xid = {}, branchId = {}", context.getXid(), context.getBranchId()); + return true; + } + + @Override + public boolean rollback(BusinessActionContext context) + throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException, TransactionException { + Message message = context.getActionContext(ROCKET_MSG_KEY, Message.class); + SendResult sendResult = context.getActionContext(ROCKET_SEND_RESULT_KEY, SendResult.class); + if (message == null || sendResult == null) { + LOGGER.error("TCCRocketMQ rollback but cannot find message and sendResult"); + } + this.producerImpl.endTransaction(message, sendResult, LocalTransactionState.ROLLBACK_MESSAGE, null); + LOGGER.info("RocketMQ message send rollback, xid = {}, branchId = {}", context.getXid(), context.getBranchId()); + return true; + } +} \ No newline at end of file diff --git a/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactoryTest.java b/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactoryTest.java new file mode 100644 index 0000000000..975bb885d8 --- /dev/null +++ b/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerFactoryTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.rocketmq; + +import org.apache.seata.common.exception.NotSupportYetException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * seata mq producer factory test + **/ +public class SeataMQProducerFactoryTest { + + @Test + public void testCreateSingle() throws Exception { + SeataMQProducerFactory.createSingle("127.0.0.1:9876", "test"); + Assertions.assertThrows(NotSupportYetException.class, () -> SeataMQProducerFactory.createSingle("127.0.0.1:9876", "test")); + + SeataMQProducer producer = SeataMQProducerFactory.getProducer(); + Assertions.assertNotNull(producer); + } +} diff --git a/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerTest.java b/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerTest.java new file mode 100644 index 0000000000..7b8ab979d5 --- /dev/null +++ b/rocketmq/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.rocketmq; + +import org.junit.jupiter.api.Test; + +/** + * seata mq producer test + **/ +public class SeataMQProducerTest { + + @Test + public void testCreate(){ + new SeataMQProducer("testProducerGroup"); + new SeataMQProducer("testNamespace", "testProducerGroup",null); + } +} diff --git a/rocketmq/src/test/resources/file.conf b/rocketmq/src/test/resources/file.conf new file mode 100644 index 0000000000..46c3e0401c --- /dev/null +++ b/rocketmq/src/test/resources/file.conf @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +service { + #transaction service group mapping + vgroupMapping.default_tx_group = "default" + #only support when registry.type=file, please don't set multiple addresses + default.grouplist = "127.0.0.1:8091" + #disable seata + disableGlobalTransaction = false +} \ No newline at end of file diff --git a/rocketmq/src/test/resources/registry.conf b/rocketmq/src/test/resources/registry.conf new file mode 100644 index 0000000000..5ad014bf55 --- /dev/null +++ b/rocketmq/src/test/resources/registry.conf @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +registry { + # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa + type = "file" + + file { + name = "file.conf" + } +} + +config { + # file、nacos 、apollo、zk、consul、etcd3 + type = "file" + + file { + name = "file.conf" + } +} \ No newline at end of file diff --git a/seata-spring-boot-starter/pom.xml b/seata-spring-boot-starter/pom.xml index 517923778e..24f449166b 100644 --- a/seata-spring-boot-starter/pom.xml +++ b/seata-spring-boot-starter/pom.xml @@ -66,6 +66,11 @@ <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-client</artifactId> + <optional>true</optional> + </dependency> </dependencies> </project> diff --git a/tcc/src/test/java/org/apache/seata/rm/tcc/interceptor/ProxyUtilsTccTest.java b/tcc/src/test/java/org/apache/seata/rm/tcc/interceptor/ProxyUtilsTccTest.java index 48416d90e9..9a243733e9 100644 --- a/tcc/src/test/java/org/apache/seata/rm/tcc/interceptor/ProxyUtilsTccTest.java +++ b/tcc/src/test/java/org/apache/seata/rm/tcc/interceptor/ProxyUtilsTccTest.java @@ -25,6 +25,7 @@ import org.apache.seata.core.context.RootContext; import org.apache.seata.core.exception.TransactionException; import org.apache.seata.core.model.BranchStatus; import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.model.GlobalStatus; import org.apache.seata.core.model.Resource; import org.apache.seata.core.model.ResourceManager; import org.apache.seata.integration.tx.api.util.ProxyUtil; @@ -90,6 +91,13 @@ public class ProxyUtilsTccTest { public BranchType getBranchType() { return null; } + + @Override + public GlobalStatus getGlobalStatus(BranchType branchType, String xid) { + return null; + } + + }; diff --git a/test-mock-server/src/main/java/org/apache/seata/mockserver/MockCoordinator.java b/test-mock-server/src/main/java/org/apache/seata/mockserver/MockCoordinator.java index 727d32e824..eba654fa48 100644 --- a/test-mock-server/src/main/java/org/apache/seata/mockserver/MockCoordinator.java +++ b/test-mock-server/src/main/java/org/apache/seata/mockserver/MockCoordinator.java @@ -254,7 +254,7 @@ public class MockCoordinator extends AbstractTCInboundHandler implements Transac } private void checkMockActionFail(String xid) throws TransactionException { - if (expectedResultMap.get(xid) == ResultCode.Failed) { + if (ResultCode.Failed == expectedResultMap.get(xid)) { throw new TransactionException(TransactionExceptionCode.Broken, "mock action expect fail"); } } diff --git a/test/pom.xml b/test/pom.xml index e9d638fe38..91d4c9f60f 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -141,6 +141,17 @@ <artifactId>seata-sqlparser-druid</artifactId> <version>${project.version}</version> </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>seata-rocketmq</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-client</artifactId> + <scope>test</scope> + </dependency> </dependencies> diff --git a/test/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerSendTest.java b/test/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerSendTest.java new file mode 100644 index 0000000000..e1e778f7c3 --- /dev/null +++ b/test/src/test/java/org/apache/seata/integration/rocketmq/SeataMQProducerSendTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.integration.rocketmq; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.ConfigurationTestHelper; +import org.apache.seata.core.context.RootContext; +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.TransactionManager; +import org.apache.seata.core.rpc.netty.mockserver.ProtocolTestConstants; +import org.apache.seata.core.rpc.netty.mockserver.TmClientTest; +import org.apache.seata.mockserver.MockServer; +import org.apache.seata.rm.RMClient; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * seata mq producer test + **/ +public class SeataMQProducerSendTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(SeataMQProducerSendTest.class); + + + private static final String TOPIC = "seata-test"; + private static final String NAME_SERVER = "127.0.0.1:9876"; + + private static SeataMQProducer producer; + + @BeforeAll + public static void before() throws MQClientException { + ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, String.valueOf(ProtocolTestConstants.MOCK_SERVER_PORT)); + MockServer.start(ProtocolTestConstants.MOCK_SERVER_PORT); + producer = SeataMQProducerFactory.createSingle(NAME_SERVER, "test"); + // should start mq server here + } + + @AfterAll + public static void after() { + MockServer.close(); + ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL); + producer.shutdown(); + } + + @Test + @Disabled + public void testSendCommit() throws MQBrokerException, RemotingException, InterruptedException, MQClientException, TransactionException { + TransactionManager tm = getTmAndBegin(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + MQPushConsumer consumer = startConsume(countDownLatch); + producer.send(new Message(TOPIC, "testMessage".getBytes(StandardCharsets.UTF_8))); + + tm.commit(RootContext.getXID()); + LOGGER.info("global commit"); + boolean await = countDownLatch.await(2, TimeUnit.SECONDS); + LOGGER.info("await:{}", await); + consumer.shutdown(); + } + + @Test + @Disabled + public void testSendRollback() + throws MQBrokerException, RemotingException, InterruptedException, MQClientException, TransactionException { + TransactionManager tm = getTmAndBegin(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + MQPushConsumer consumer = startConsume(countDownLatch); + + producer.send(new Message(TOPIC, "testMessage".getBytes(StandardCharsets.UTF_8))); + + tm.rollback(RootContext.getXID()); + LOGGER.info("global rollback"); + try { + boolean await = countDownLatch.await(2, TimeUnit.SECONDS); + LOGGER.info("await:{}", await); + } catch (Exception e) { + Assertions.assertEquals(e.getClass(), InterruptedException.class); + } finally { + consumer.shutdown(); + } + } + + + private static MQPushConsumer startConsume(CountDownLatch countDownLatch) throws MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("yourGroup"); + consumer.setNamesrvAddr(NAME_SERVER); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + consumer.subscribe(TOPIC,"*"); + consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> { + LOGGER.info("%s Receive New Messages: {} {}", Thread.currentThread().getName(), msg); + countDownLatch.countDown(); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + consumer.start(); + return consumer; + } + + + private static TransactionManager getTmAndBegin() throws TransactionException { + TransactionManager tm = TmClientTest.getTm(); + RMClient.init(ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP); + String xid = tm.begin(ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP, "testRocket", 60000); + RootContext.bind(xid); + return tm; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org