This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a2d9dad8b7b Automatically start a distributed transaction if needed(#20234) (#22029)
a2d9dad8b7b is described below

commit a2d9dad8b7bd718361d67b507fe292980dd09983
Author: ZhangCheng <[email protected]>
AuthorDate: Sat Nov 12 14:09:32 2022 +0800

    Automatically start a distributed transaction if needed(#20234) (#22029)
    
    * Automatically start an XA transaction if needed
    
    * Fix
    
    * Move logic to ProxySQLExecutor
    
    * Fix
    
    * Add base support
    
    * Rename method name
---
 .../backend/communication/ProxySQLExecutor.java    | 38 +++++++++++++++++++---
 1 file changed, 33 insertions(+), 5 deletions(-)

diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 84698306bac..29995964113 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.proxy.backend.communication;
 
+import org.apache.shardingsphere.dialect.SQLExceptionTransformEngine;
 import 
org.apache.shardingsphere.dialect.exception.transaction.TableModifyInTransactionException;
 import org.apache.shardingsphere.infra.binder.type.TableAvailable;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
@@ -43,6 +44,7 @@ import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCo
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.ProxyJDBCExecutor;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.JDBCBackendTransactionManager;
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import 
org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
@@ -52,6 +54,8 @@ import 
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatemen
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.FetchStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.MoveStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.TruncateStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.OpenGaussStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussCursorStatement;
@@ -144,16 +148,40 @@ public final class ProxySQLExecutor {
      * @throws SQLException SQL exception
      */
     public List<ExecuteResult> execute(final ExecutionContext 
executionContext) throws SQLException {
+        return isNeedImplicitCommitTransaction(executionContext) ? 
doExecuteWithImplicitCommitTransaction(executionContext) : 
doExecute(executionContext);
+    }
+    
+    private boolean isNeedImplicitCommitTransaction(final ExecutionContext 
executionContext) {
+        TransactionStatus transactionStatus = 
backendConnection.getConnectionSession().getTransactionStatus();
+        SQLStatement sqlStatement = 
executionContext.getSqlStatementContext().getSqlStatement();
+        return 
TransactionType.isDistributedTransaction(transactionStatus.getTransactionType())
 && !transactionStatus.isInTransaction() && sqlStatement instanceof DMLStatement
+                && !(sqlStatement instanceof SelectStatement) && 
executionContext.getExecutionUnits().size() > 1;
+    }
+    
+    private List<ExecuteResult> doExecuteWithImplicitCommitTransaction(final 
ExecutionContext executionContext) throws SQLException {
+        List<ExecuteResult> result;
+        JDBCBackendTransactionManager transactionManager = new 
JDBCBackendTransactionManager(backendConnection);
+        try {
+            transactionManager.begin();
+            result = doExecute(executionContext);
+            transactionManager.commit();
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            transactionManager.rollback();
+            String databaseName = 
backendConnection.getConnectionSession().getDatabaseName();
+            throw SQLExceptionTransformEngine.toSQLException(ex, 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData()
+                    .getDatabase(databaseName).getProtocolType().getType());
+        }
+        return result;
+    }
+    
+    private List<ExecuteResult> doExecute(final ExecutionContext 
executionContext) throws SQLException {
         String databaseName = 
backendConnection.getConnectionSession().getDatabaseName();
         Collection<ShardingSphereRule> rules = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName).getRuleMetaData().getRules();
         int maxConnectionsSizePerQuery = ProxyContext.getInstance()
                 
.getContextManager().getMetaDataContexts().getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         boolean isReturnGeneratedKeys = 
executionContext.getSqlStatementContext().getSqlStatement() instanceof 
MySQLInsertStatement;
-        return execute(executionContext, rules, maxConnectionsSizePerQuery, 
isReturnGeneratedKeys);
-    }
-    
-    private List<ExecuteResult> execute(final ExecutionContext 
executionContext, final Collection<ShardingSphereRule> rules,
-                                        final int maxConnectionsSizePerQuery, 
final boolean isReturnGeneratedKeys) throws SQLException {
         return hasRawExecutionRule(rules) ? rawExecute(executionContext, 
rules, maxConnectionsSizePerQuery)
                 : useDriverToExecute(executionContext, rules, 
maxConnectionsSizePerQuery, isReturnGeneratedKeys, 
SQLExecutorExceptionHandler.isExceptionThrown());
     }

Reply via email to