This is an automated email from the ASF dual-hosted git repository.
lujingshang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d724f54 add xa handler (#14130)
d724f54 is described below
commit d724f54c9f73ee6048453b85a730fa4a946d4d39
Author: roodkcab <[email protected]>
AuthorDate: Tue Dec 21 11:42:28 2021 +0800
add xa handler (#14130)
---
.../features/transaction/use-norms/xa.cn.md | 97 +++++++++++++++++++++
.../features/transaction/use-norms/xa.en.md | 98 ++++++++++++++++++++++
.../transaction/core/TransactionType.java | 4 +-
.../session/transaction/TransactionStatus.java | 23 ++++-
.../TransactionBackendHandlerFactory.java | 4 +
.../text/transaction/TransactionXAHandler.java | 95 +++++++++++++++++++++
6 files changed, 318 insertions(+), 3 deletions(-)
diff --git a/docs/document/content/features/transaction/use-norms/xa.cn.md
b/docs/document/content/features/transaction/use-norms/xa.cn.md
index e07ac8c..a3ab055 100644
--- a/docs/document/content/features/transaction/use-norms/xa.cn.md
+++ b/docs/document/content/features/transaction/use-norms/xa.cn.md
@@ -13,3 +13,100 @@ weight = 2
## 不支持项
* 服务宕机后,在其它机器上恢复提交/回滚中的数据。
+
+## 通过 XA 语句控制的分布式事务
+* 通过 XA START 可以手动开启 XA 事务,注意该事务完全由用户管理,ShardingSphere 只负责将语句转发至后端数据库;
+* 服务宕机后,需要通过 XA RECOVER 获取未提交或回滚的事务,也可以在 COMMIT 时使用 ONE PHASE 跳过 PREPARE。
+
+MySQL [(none)]> use test1
│MySQL [(none)]> use test2
+Reading table information for completion of table and column names
│Reading table information for completion of table and
column names
+You can turn off this feature to get a quicker startup with -A
│You can turn off this feature to get a quicker
startup with -A
+
│
+Database changed
│Database changed
+MySQL [test1]> XA START '61c052438d3eb';
│MySQL [test2]> XA START '61c0524390927';
+Query OK, 0 rows affected (0.030 sec)
│Query OK, 0 rows affected (0.009 sec)
+
│
+MySQL [test1]> update test set val = 'xatest1' where id = 1;
│MySQL [test2]> update test set val = 'xatest2' where
id = 1;
+Query OK, 1 row affected (0.077 sec)
│Query OK, 1 row affected (0.010 sec)
+
│
+MySQL [test1]> XA END '61c052438d3eb';
│MySQL [test2]> XA END '61c0524390927';
+Query OK, 0 rows affected (0.006 sec)
│Query OK, 0 rows affected (0.008 sec)
+
│
+MySQL [test1]> XA PREPARE '61c052438d3eb';
│MySQL [test2]> XA PREPARE '61c0524390927';
+Query OK, 0 rows affected (0.018 sec)
│Query OK, 0 rows affected (0.011 sec)
+
│
+MySQL [test1]> XA COMMIT '61c052438d3eb';
│MySQL [test2]> XA COMMIT '61c0524390927';
+Query OK, 0 rows affected (0.011 sec)
│Query OK, 0 rows affected (0.018 sec)
+
│
+MySQL [test1]> select * from test where id = 1;
│MySQL [test2]> select * from test where id = 1;
++----+---------+
│+----+---------+
+| id | val |
│| id | val |
++----+---------+
│+----+---------+
+| 1 | xatest1 |
│| 1 | xatest2 |
++----+---------+
│+----+---------+
+1 row in set (0.016 sec)
│1 row in set (0.129 sec)
+
+MySQL [test1]> XA START '61c05243994c3';
│MySQL [test2]> XA START '61c052439bd7b';
+Query OK, 0 rows affected (0.047 sec)
│Query OK, 0 rows affected (0.006 sec)
+
│
+MySQL [test1]> update test set val = 'xarollback' where id = 1;
│MySQL [test2]> update test set val = 'xarollback'
where id = 1;
+Query OK, 1 row affected (0.175 sec)
│Query OK, 1 row affected (0.008 sec)
+
│
+MySQL [test1]> XA END '61c05243994c3';
│MySQL [test2]> XA END '61c052439bd7b';
+Query OK, 0 rows affected (0.007 sec)
│Query OK, 0 rows affected (0.014 sec)
+
│
+MySQL [test1]> XA PREPARE '61c05243994c3';
│MySQL [test2]> XA PREPARE '61c052439bd7b';
+Query OK, 0 rows affected (0.013 sec)
│Query OK, 0 rows affected (0.019 sec)
+
│
+MySQL [test1]> XA ROLLBACK '61c05243994c3';
│MySQL [test2]> XA ROLLBACK '61c052439bd7b';
+Query OK, 0 rows affected (0.010 sec)
│Query OK, 0 rows affected (0.010 sec)
+
│
+MySQL [test1]> select * from test where id = 1;
│MySQL [test2]> select * from test where id = 1;
++----+---------+
│+----+---------+
+| id | val |
│| id | val |
++----+---------+
│+----+---------+
+| 1 | xatest1 |
│| 1 | xatest2 |
++----+---------+
│+----+---------+
+1 row in set (0.009 sec)
│1 row in set (0.083 sec)
+
+MySQL [test1]> XA START '61c052438d3eb';
+Query OK, 0 rows affected (0.030 sec)
+
+MySQL [test1]> update test set val = 'recover' where id = 1;
+Query OK, 1 row affected (0.072 sec)
+
+MySQL [test1]> select * from test where id = 1;
++----+---------+
+| id | val |
++----+---------+
+| 1 | recover |
++----+---------+
+1 row in set (0.039 sec)
+
+MySQL [test1]> XA END '61c052438d3eb';
+Query OK, 0 rows affected (0.005 sec)
+
+MySQL [test1]> XA PREPARE '61c052438d3eb';
+Query OK, 0 rows affected (0.020 sec)
+
+MySQL [test1]> XA RECOVER;
++----------+--------------+--------------+---------------+
+| formatID | gtrid_length | bqual_length | data |
++----------+--------------+--------------+---------------+
+| 1 | 13 | 0 | 61c052438d3eb |
++----------+--------------+--------------+---------------+
+1 row in set (0.010 sec)
+
+MySQL [test1]> XA RECOVER CONVERT XID;
++----------+--------------+--------------+------------------------------+
+| formatID | gtrid_length | bqual_length | data |
++----------+--------------+--------------+------------------------------+
+| 1 | 13 | 0 | 0x36316330353234333864336562 |
++----------+--------------+--------------+------------------------------+
+1 row in set (0.011 sec)
+
+MySQL [test1]> XA COMMIT 0x36316330353234333864336562;
+Query OK, 0 rows affected (0.029 sec)
+
+MySQL [test1]> XA RECOVER;
+Empty set (0.011 sec)
diff --git a/docs/document/content/features/transaction/use-norms/xa.en.md
b/docs/document/content/features/transaction/use-norms/xa.en.md
index 30384c5..17a0a20 100644
--- a/docs/document/content/features/transaction/use-norms/xa.en.md
+++ b/docs/document/content/features/transaction/use-norms/xa.en.md
@@ -13,3 +13,101 @@ weight = 2
## Unsupported
* Recover committing and rolling back in other machines after the service is
down.
+
+## XA Transaction managed by XA Statement
+
+* When XA START is used to open an XA transaction, ShardingSphere passes the statement directly to the backend database; you have to manage this transaction yourself;
+* When recovering from a crash, you have to call XA RECOVER to check for unfinished transactions and choose to commit or roll back each of them using its xid. Alternatively, you can use a ONE PHASE commit to skip PREPARE.
+
+MySQL [(none)]> use test1
│MySQL [(none)]> use test2
+Reading table information for completion of table and column names
│Reading table information for completion of table and
column names
+You can turn off this feature to get a quicker startup with -A
│You can turn off this feature to get a quicker
startup with -A
+
│
+Database changed
│Database changed
+MySQL [test1]> XA START '61c052438d3eb';
│MySQL [test2]> XA START '61c0524390927';
+Query OK, 0 rows affected (0.030 sec)
│Query OK, 0 rows affected (0.009 sec)
+
│
+MySQL [test1]> update test set val = 'xatest1' where id = 1;
│MySQL [test2]> update test set val = 'xatest2' where
id = 1;
+Query OK, 1 row affected (0.077 sec)
│Query OK, 1 row affected (0.010 sec)
+
│
+MySQL [test1]> XA END '61c052438d3eb';
│MySQL [test2]> XA END '61c0524390927';
+Query OK, 0 rows affected (0.006 sec)
│Query OK, 0 rows affected (0.008 sec)
+
│
+MySQL [test1]> XA PREPARE '61c052438d3eb';
│MySQL [test2]> XA PREPARE '61c0524390927';
+Query OK, 0 rows affected (0.018 sec)
│Query OK, 0 rows affected (0.011 sec)
+
│
+MySQL [test1]> XA COMMIT '61c052438d3eb';
│MySQL [test2]> XA COMMIT '61c0524390927';
+Query OK, 0 rows affected (0.011 sec)
│Query OK, 0 rows affected (0.018 sec)
+
│
+MySQL [test1]> select * from test where id = 1;
│MySQL [test2]> select * from test where id = 1;
++----+---------+
│+----+---------+
+| id | val |
│| id | val |
++----+---------+
│+----+---------+
+| 1 | xatest1 |
│| 1 | xatest2 |
++----+---------+
│+----+---------+
+1 row in set (0.016 sec)
│1 row in set (0.129 sec)
+
+MySQL [test1]> XA START '61c05243994c3';
│MySQL [test2]> XA START '61c052439bd7b';
+Query OK, 0 rows affected (0.047 sec)
│Query OK, 0 rows affected (0.006 sec)
+
│
+MySQL [test1]> update test set val = 'xarollback' where id = 1;
│MySQL [test2]> update test set val = 'xarollback'
where id = 1;
+Query OK, 1 row affected (0.175 sec)
│Query OK, 1 row affected (0.008 sec)
+
│
+MySQL [test1]> XA END '61c05243994c3';
│MySQL [test2]> XA END '61c052439bd7b';
+Query OK, 0 rows affected (0.007 sec)
│Query OK, 0 rows affected (0.014 sec)
+
│
+MySQL [test1]> XA PREPARE '61c05243994c3';
│MySQL [test2]> XA PREPARE '61c052439bd7b';
+Query OK, 0 rows affected (0.013 sec)
│Query OK, 0 rows affected (0.019 sec)
+
│
+MySQL [test1]> XA ROLLBACK '61c05243994c3';
│MySQL [test2]> XA ROLLBACK '61c052439bd7b';
+Query OK, 0 rows affected (0.010 sec)
│Query OK, 0 rows affected (0.010 sec)
+
│
+MySQL [test1]> select * from test where id = 1;
│MySQL [test2]> select * from test where id = 1;
++----+---------+
│+----+---------+
+| id | val |
│| id | val |
++----+---------+
│+----+---------+
+| 1 | xatest1 |
│| 1 | xatest2 |
++----+---------+
│+----+---------+
+1 row in set (0.009 sec)
│1 row in set (0.083 sec)
+
+MySQL [test1]> XA START '61c052438d3eb';
+Query OK, 0 rows affected (0.030 sec)
+
+MySQL [test1]> update test set val = 'recover' where id = 1;
+Query OK, 1 row affected (0.072 sec)
+
+MySQL [test1]> select * from test where id = 1;
++----+---------+
+| id | val |
++----+---------+
+| 1 | recover |
++----+---------+
+1 row in set (0.039 sec)
+
+MySQL [test1]> XA END '61c052438d3eb';
+Query OK, 0 rows affected (0.005 sec)
+
+MySQL [test1]> XA PREPARE '61c052438d3eb';
+Query OK, 0 rows affected (0.020 sec)
+
+MySQL [test1]> XA RECOVER;
++----------+--------------+--------------+---------------+
+| formatID | gtrid_length | bqual_length | data |
++----------+--------------+--------------+---------------+
+| 1 | 13 | 0 | 61c052438d3eb |
++----------+--------------+--------------+---------------+
+1 row in set (0.010 sec)
+
+MySQL [test1]> XA RECOVER CONVERT XID;
++----------+--------------+--------------+------------------------------+
+| formatID | gtrid_length | bqual_length | data |
++----------+--------------+--------------+------------------------------+
+| 1 | 13 | 0 | 0x36316330353234333864336562 |
++----------+--------------+--------------+------------------------------+
+1 row in set (0.011 sec)
+
+MySQL [test1]> XA COMMIT 0x36316330353234333864336562;
+Query OK, 0 rows affected (0.029 sec)
+
+MySQL [test1]> XA RECOVER;
+Empty set (0.011 sec)
diff --git
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/core/TransactionType.java
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/core/TransactionType.java
index c309097..29f8c6d 100644
---
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/core/TransactionType.java
+++
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/core/TransactionType.java
@@ -22,7 +22,7 @@ package org.apache.shardingsphere.transaction.core;
*/
public enum TransactionType {
- LOCAL, XA, BASE;
+ LOCAL, XA, BASE, MANUALXA;
/**
* Judge whether distributed transaction.
@@ -31,6 +31,6 @@ public enum TransactionType {
* @return is distributed transaction or not
*/
public static boolean isDistributedTransaction(final TransactionType
transactionType) {
- return XA == transactionType || BASE == transactionType;
+ return XA == transactionType || BASE == transactionType || MANUALXA ==
transactionType;
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/transaction/TransactionStatus.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/transaction/TransactionStatus.java
index 73d36b4..4659be3 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/transaction/TransactionStatus.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/transaction/TransactionStatus.java
@@ -36,12 +36,33 @@ public final class TransactionStatus {
private volatile boolean inTransaction;
private volatile TransactionType transactionType;
+
+ @Setter
+ private volatile boolean manualXA;
public TransactionStatus(final TransactionType initialTransactionType) {
transactionType = initialTransactionType;
}
/**
+ * Get current transaction type of this session.
+ *
+ * @return MANUALXA when in manual xa transaction or predefined
transaction type if not
+ */
+ public TransactionType getTransactionType() {
+ return manualXA ? TransactionType.MANUALXA : transactionType;
+ }
+
+ /**
+ * Check there's any transaction on this session.
+ *
+ * @return is in transaction or in manual xa transaction
+ */
+ public boolean isInTransaction() {
+ return inTransaction || manualXA;
+ }
+
+ /**
* Change transaction type of current channel.
*
* @param transactionType transaction type
@@ -59,6 +80,6 @@ public final class TransactionStatus {
* @return is in connection held transaction or not
*/
public boolean isInConnectionHeldTransaction() {
- return inTransaction && TransactionType.BASE != transactionType;
+ return isInTransaction() && TransactionType.BASE !=
getTransactionType();
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
index 7fa8da6..5fd9b74 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
@@ -31,6 +31,7 @@ import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.RollbackToS
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.SavepointStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.SetAutoCommitStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.XAStatement;
import org.apache.shardingsphere.transaction.core.TransactionOperationType;
/**
@@ -70,6 +71,9 @@ public final class TransactionBackendHandlerFactory {
if (tclStatement instanceof RollbackStatement) {
return new TransactionBackendHandler(tclStatement,
TransactionOperationType.ROLLBACK, connectionSession);
}
+ if (tclStatement instanceof XAStatement) {
+ return new TransactionXAHandler(sqlStatementContext, sql,
connectionSession);
+ }
return new BroadcastDatabaseBackendHandler(sqlStatementContext, sql,
connectionSession);
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
new file mode 100644
index 0000000..a6063c9
--- /dev/null
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.text.transaction;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
+import
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.XAStatement;
+import org.apache.shardingsphere.transaction.TransactionHolder;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * XA transaction handler.
+ *
+ * <p>Forwards XA statements to the backend through a {@link SchemaAssignedDatabaseBackendHandler}
+ * and keeps the manual XA flag of the session's transaction status in sync with the statements
+ * that open ({@code START}/{@code BEGIN}) and finish ({@code COMMIT}/{@code ROLLBACK}) a transaction.
+ * Only {@code RECOVER} exposes result rows to the client.</p>
+ *
+ * <p>TODO Currently an XA transaction started with `XA START` is not supported for a database with multiple data sources;
+ * a flag should be added for this both in the init progress and when a data source is added from distSQL.</p>
+ */
+@RequiredArgsConstructor
+public final class TransactionXAHandler implements TextProtocolBackendHandler {
+
+    private static final String RECOVER = "RECOVER";
+
+    private final XAStatement tclStatement;
+
+    private final ConnectionSession connectionSession;
+
+    private final SchemaAssignedDatabaseBackendHandler backendHandler;
+
+    /**
+     * Create a handler for the XA statement carried by the given statement context.
+     *
+     * @param sqlStatementContext statement context whose SQL statement must be an {@link XAStatement}
+     * @param sql original SQL text, handed to the delegate backend handler
+     * @param connectionSession session on which the statement is executed
+     */
+    public TransactionXAHandler(final SQLStatementContext<? extends TCLStatement> sqlStatementContext, final String sql, final ConnectionSession connectionSession) {
+        this.tclStatement = (XAStatement) sqlStatementContext.getSqlStatement();
+        this.connectionSession = connectionSession;
+        this.backendHandler = new SchemaAssignedDatabaseBackendHandler(sqlStatementContext, sql, connectionSession);
+    }
+
+    /**
+     * Move to the next result row.
+     *
+     * @return result of the delegate for {@code RECOVER}, {@code false} for every other operation
+     * @throws SQLException when the delegate fails
+     */
+    @Override
+    public boolean next() throws SQLException {
+        return isRecover() && backendHandler.next();
+    }
+
+    /**
+     * Get the current result row.
+     *
+     * @return row data of the delegate for {@code RECOVER}, an empty list for every other operation
+     * @throws SQLException when the delegate fails
+     */
+    @Override
+    public Collection<Object> getRowData() throws SQLException {
+        return isRecover() ? backendHandler.getRowData() : Collections.emptyList();
+    }
+
+    /**
+     * Execute the XA statement on the backend and update the session's manual XA state.
+     *
+     * @return response header produced by the delegate backend handler
+     * @throws SQLException when the operation is missing or unrecognized, when a transaction is
+     *     already active on {@code START}/{@code BEGIN}, or when the delegate fails
+     */
+    @Override
+    public ResponseHeader execute() throws SQLException {
+        final String op = tclStatement.getOp();
+        // A missing operation is reported like any other unknown one instead of failing with a NullPointerException.
+        if (null == op) {
+            throw new SQLException("unrecognized XA statement " + op);
+        }
+        switch (op) {
+            case "START":
+            case "BEGIN":
+                return startTransaction();
+            case "END":
+            case "PREPARE":
+            case RECOVER:
+                return backendHandler.execute();
+            case "COMMIT":
+            case "ROLLBACK":
+                return finishTransaction();
+            default:
+                throw new SQLException("unrecognized XA statement " + op);
+        }
+    }
+
+    // Null-safe check used to decide whether result rows are exposed.
+    private boolean isRecover() {
+        return RECOVER.equals(tclStatement.getOp());
+    }
+
+    // Opens a manual XA transaction; rejected when the session already reports a transaction.
+    private ResponseHeader startTransaction() throws SQLException {
+        /*
+         * We have to let the session occupy the thread when doing an XA transaction.
+         * According to https://dev.mysql.com/doc/refman/5.7/en/xa-states.html XA and local transactions are mutually exclusive.
+         */
+        if (connectionSession.getTransactionStatus().isInTransaction()) {
+            throw new SQLException("can not start in a Active transaction");
+        }
+        final ResponseHeader result = backendHandler.execute();
+        // The state is flagged only after the backend accepted the statement.
+        TransactionHolder.setInTransaction();
+        connectionSession.getTransactionStatus().setManualXA(true);
+        return result;
+    }
+
+    // Finishes a manual XA transaction; the session state is reset even when the backend call fails.
+    private ResponseHeader finishTransaction() throws SQLException {
+        try {
+            return backendHandler.execute();
+        } finally {
+            connectionSession.getTransactionStatus().setManualXA(false);
+            TransactionHolder.clear();
+        }
+    }
+}