This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a2d9dad8b7b Automatically start a distributed transaction if needed(#20234) (#22029)
a2d9dad8b7b is described below
commit a2d9dad8b7bd718361d67b507fe292980dd09983
Author: ZhangCheng <[email protected]>
AuthorDate: Sat Nov 12 14:09:32 2022 +0800
Automatically start a distributed transaction if needed(#20234) (#22029)
* Automatically start an XA transaction if needed
* Fix
* Move logic to ProxySQLExecutor
* Fix
* Add base support
* Rename method
---
.../backend/communication/ProxySQLExecutor.java | 38 +++++++++++++++++++---
1 file changed, 33 insertions(+), 5 deletions(-)
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 84698306bac..29995964113 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.proxy.backend.communication;
+import org.apache.shardingsphere.dialect.SQLExceptionTransformEngine;
import
org.apache.shardingsphere.dialect.exception.transaction.TableModifyInTransactionException;
import org.apache.shardingsphere.infra.binder.type.TableAvailable;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
@@ -43,6 +44,7 @@ import
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCo
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.ProxyJDBCExecutor;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.JDBCBackendTransactionManager;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
@@ -52,6 +54,8 @@ import
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatemen
import
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.FetchStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.MoveStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.TruncateStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.OpenGaussStatement;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussCursorStatement;
@@ -144,16 +148,40 @@ public final class ProxySQLExecutor {
* @throws SQLException SQL exception
*/
public List<ExecuteResult> execute(final ExecutionContext
executionContext) throws SQLException {
+ return isNeedImplicitCommitTransaction(executionContext) ?
doExecuteWithImplicitCommitTransaction(executionContext) :
doExecute(executionContext);
+ }
+
+ private boolean isNeedImplicitCommitTransaction(final ExecutionContext
executionContext) {
+ TransactionStatus transactionStatus =
backendConnection.getConnectionSession().getTransactionStatus();
+ SQLStatement sqlStatement =
executionContext.getSqlStatementContext().getSqlStatement();
+ return
TransactionType.isDistributedTransaction(transactionStatus.getTransactionType())
&& !transactionStatus.isInTransaction() && sqlStatement instanceof DMLStatement
+ && !(sqlStatement instanceof SelectStatement) &&
executionContext.getExecutionUnits().size() > 1;
+ }
+
+ private List<ExecuteResult> doExecuteWithImplicitCommitTransaction(final
ExecutionContext executionContext) throws SQLException {
+ List<ExecuteResult> result;
+ JDBCBackendTransactionManager transactionManager = new
JDBCBackendTransactionManager(backendConnection);
+ try {
+ transactionManager.begin();
+ result = doExecute(executionContext);
+ transactionManager.commit();
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ transactionManager.rollback();
+ String databaseName =
backendConnection.getConnectionSession().getDatabaseName();
+ throw SQLExceptionTransformEngine.toSQLException(ex,
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData()
+ .getDatabase(databaseName).getProtocolType().getType());
+ }
+ return result;
+ }
+
+ private List<ExecuteResult> doExecute(final ExecutionContext
executionContext) throws SQLException {
String databaseName =
backendConnection.getConnectionSession().getDatabaseName();
Collection<ShardingSphereRule> rules =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName).getRuleMetaData().getRules();
int maxConnectionsSizePerQuery = ProxyContext.getInstance()
.getContextManager().getMetaDataContexts().getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
boolean isReturnGeneratedKeys =
executionContext.getSqlStatementContext().getSqlStatement() instanceof
MySQLInsertStatement;
- return execute(executionContext, rules, maxConnectionsSizePerQuery,
isReturnGeneratedKeys);
- }
-
- private List<ExecuteResult> execute(final ExecutionContext
executionContext, final Collection<ShardingSphereRule> rules,
- final int maxConnectionsSizePerQuery,
final boolean isReturnGeneratedKeys) throws SQLException {
return hasRawExecutionRule(rules) ? rawExecute(executionContext,
rules, maxConnectionsSizePerQuery)
: useDriverToExecute(executionContext, rules,
maxConnectionsSizePerQuery, isReturnGeneratedKeys,
SQLExecutorExceptionHandler.isExceptionThrown());
}