This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6867d040715 Optimize ShardingSpherePreparedStatement for multi 
executionContext (#28796)
6867d040715 is described below

commit 6867d040715d1850bad7ee5a2c2e375b46f5c79d
Author: Chuxin Chen <[email protected]>
AuthorDate: Thu Oct 19 11:18:53 2023 +0800

    Optimize ShardingSpherePreparedStatement for multi executionContext (#28796)
    
    * Refactor ShardingSpherePreparedStatement to support multiple 
execution contexts.
    
    * Refactor ShardingSpherePreparedStatement to support multiple 
execution contexts.
---
 .../jdbc/adapter/AbstractStatementAdapter.java     |  16 ++-
 .../statement/ShardingSpherePreparedStatement.java |   8 +-
 .../core/statement/ShardingSphereStatement.java    | 138 +++++++++++++--------
 .../driver/jdbc/adapter/StatementAdapterTest.java  |   2 +-
 .../proxy/backend/connector/DatabaseConnector.java |  11 +-
 5 files changed, 113 insertions(+), 62 deletions(-)

diff --git 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index dabd42d7d2a..3ccbc6d1404 100644
--- 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++ 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -57,7 +57,7 @@ public abstract class AbstractStatementAdapter extends 
AbstractUnsupportedOperat
     
     private boolean closed;
     
-    protected final boolean isNeedImplicitCommitTransaction(final 
ShardingSphereConnection connection, final ExecutionContext executionContext) {
+    protected final boolean isNeedImplicitCommitTransaction(final 
ShardingSphereConnection connection, final Collection<ExecutionContext> 
executionContexts) {
         if (connection.getAutoCommit()) {
             return false;
         }
@@ -66,11 +66,19 @@ public abstract class AbstractStatementAdapter extends 
AbstractUnsupportedOperat
         if 
(!TransactionType.isDistributedTransaction(connectionTransaction.getTransactionType())
 || isInTransaction) {
             return false;
         }
-        return isModifiedSQL(executionContext) && 
executionContext.getExecutionUnits().size() > 1;
+        if (1 == executionContexts.size()) {
+            SQLStatement sqlStatement = 
executionContexts.iterator().next().getSqlStatementContext().getSqlStatement();
+            return isWriteDMLStatement(sqlStatement) && 
executionContexts.iterator().next().getExecutionUnits().size() > 1;
+        }
+        for (ExecutionContext each : executionContexts) {
+            if 
(isWriteDMLStatement(each.getSqlStatementContext().getSqlStatement())) {
+                return true;
+            }
+        }
+        return false;
     }
     
-    private boolean isModifiedSQL(final ExecutionContext executionContext) {
-        SQLStatement sqlStatement = 
executionContext.getSqlStatementContext().getSqlStatement();
+    private boolean isWriteDMLStatement(final SQLStatement sqlStatement) {
         return sqlStatement instanceof DMLStatement && !(sqlStatement 
instanceof SelectStatement);
     }
     
diff --git 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index beae1457f78..228e07a9bce 100644
--- 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.driver.jdbc.core.statement;
 import com.google.common.base.Strings;
 import lombok.AccessLevel;
 import lombok.Getter;
-import 
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
 import org.apache.shardingsphere.driver.executor.DriverExecutor;
 import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit;
 import 
org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
@@ -33,15 +32,16 @@ import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResult
 import 
org.apache.shardingsphere.driver.jdbc.core.statement.metadata.ShardingSphereParameterMetaData;
 import 
org.apache.shardingsphere.driver.jdbc.exception.syntax.EmptySQLException;
 import org.apache.shardingsphere.infra.binder.context.aware.ParameterAware;
-import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
 import 
org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.dml.InsertStatementContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
+import 
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
 import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
@@ -356,7 +356,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
                 Collection<ExecuteResult> executeResults = 
executor.getRawExecutor().execute(createRawExecutionGroupContext(), 
executionContext.getQueryContext(), new RawSQLExecutorCallback());
                 return accumulate(executeResults);
             }
-            return isNeedImplicitCommitTransaction(connection, 
executionContext) ? executeUpdateWithImplicitCommitTransaction() : 
useDriverToExecuteUpdate();
+            return isNeedImplicitCommitTransaction(connection, 
Collections.singleton(executionContext)) ? 
executeUpdateWithImplicitCommitTransaction() : useDriverToExecuteUpdate();
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
@@ -426,7 +426,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
                 Collection<ExecuteResult> executeResults = 
executor.getRawExecutor().execute(createRawExecutionGroupContext(), 
executionContext.getQueryContext(), new RawSQLExecutorCallback());
                 return executeResults.iterator().next() instanceof QueryResult;
             }
-            return isNeedImplicitCommitTransaction(connection, 
executionContext) ? executeWithImplicitCommitTransaction() : 
useDriverToExecute();
+            return isNeedImplicitCommitTransaction(connection, 
Collections.singleton(executionContext)) ? 
executeWithImplicitCommitTransaction() : useDriverToExecute();
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
diff --git 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 1533f0a93db..d787b19f559 100644
--- 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.driver.jdbc.core.statement;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -123,7 +124,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     
     private boolean returnGeneratedKeys;
     
-    private ExecutionContext executionContext;
+    private Collection<ExecutionContext> executionContexts;
     
     private ResultSet currentResultSet;
     
@@ -174,12 +175,8 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
             if (useFederation) {
                 return executeFederationQuery(queryContext);
             }
-            executionContext = createExecutionContext(queryContext);
-            List<QueryResult> queryResults = executeQuery0();
-            MergedResult mergedResult = mergeQuery(queryResults);
-            boolean selectContainsEnhancedTable =
-                    executionContext.getSqlStatementContext() instanceof 
SelectStatementContext && ((SelectStatementContext) 
executionContext.getSqlStatementContext()).isContainsEnhancedTable();
-            result = new ShardingSphereResultSet(getResultSets(), 
mergedResult, this, selectContainsEnhancedTable, executionContext);
+            executionContexts = createExecutionContext(queryContext);
+            result = doExecuteQuery(executionContexts);
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
@@ -192,6 +189,20 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         return result;
     }
     
+    private ShardingSphereResultSet doExecuteQuery(final 
Collection<ExecutionContext> executionContexts) throws SQLException {
+        ShardingSphereResultSet result = null;
+        for (ExecutionContext each : executionContexts) {
+            List<QueryResult> queryResults = executeQuery0(each);
+            MergedResult mergedResult = mergeQuery(queryResults, 
each.getSqlStatementContext());
+            boolean selectContainsEnhancedTable =
+                    each.getSqlStatementContext() instanceof 
SelectStatementContext && ((SelectStatementContext) 
each.getSqlStatementContext()).isContainsEnhancedTable();
+            if (null == result) {
+                result = new ShardingSphereResultSet(getResultSets(), 
mergedResult, this, selectContainsEnhancedTable, each);
+            }
+        }
+        return result;
+    }
+    
     private boolean decide(final QueryContext queryContext, final 
ShardingSphereDatabase database, final RuleMetaData globalRuleMetaData) {
         return 
executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), database, globalRuleMetaData);
     }
@@ -214,12 +225,12 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
                 : Optional.empty();
     }
     
-    private List<QueryResult> executeQuery0() throws SQLException {
+    private List<QueryResult> executeQuery0(final ExecutionContext 
executionContext) throws SQLException {
         if 
(metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules().stream().anyMatch(RawExecutionRule.class::isInstance))
 {
             return executor.getRawExecutor().execute(
-                    createRawExecutionContext(), 
executionContext.getQueryContext(), new 
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
+                    createRawExecutionContext(executionContext), 
executionContext.getQueryContext(), new 
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
         }
-        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext();
+        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext(executionContext);
         cacheStatements(executionGroupContext.getInputGroups());
         StatementExecuteQueryCallback callback = new 
StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
                 
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), 
executionContext.getSqlStatementContext().getSqlStatement(),
@@ -304,9 +315,9 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         }
     }
     
-    private int executeUpdate(final ExecuteUpdateCallback updateCallback, 
final SQLStatementContext sqlStatementContext) throws SQLException {
-        return isNeedImplicitCommitTransaction(connection, executionContext) ? 
executeUpdateWithImplicitCommitTransaction(updateCallback, sqlStatementContext)
-                : useDriverToExecuteUpdate(updateCallback, 
sqlStatementContext);
+    private int executeUpdate(final ExecuteUpdateCallback updateCallback, 
final SQLStatementContext sqlStatementContext, final 
Collection<ExecutionContext> executionContexts) throws SQLException {
+        return isNeedImplicitCommitTransaction(connection, executionContexts) 
? executeUpdateWithImplicitCommitTransaction(updateCallback, 
sqlStatementContext, executionContexts)
+                : useDriverToExecuteUpdate(updateCallback, 
sqlStatementContext, executionContexts);
     }
     
     private int executeUpdate0(final String sql, final ExecuteUpdateCallback 
updateCallback, final TrafficExecutorCallback<Integer> trafficCallback) throws 
SQLException {
@@ -319,18 +330,23 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
             JDBCExecutionUnit executionUnit = 
createTrafficExecutionUnit(trafficInstanceId, queryContext);
             return executor.getTrafficExecutor().execute(executionUnit, 
trafficCallback);
         }
-        executionContext = createExecutionContext(queryContext);
+        executionContexts = createExecutionContext(queryContext);
         if 
(metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules().stream().anyMatch(RawExecutionRule.class::isInstance))
 {
-            return 
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), 
executionContext.getQueryContext(), new RawSQLExecutorCallback()));
+            Collection<ExecuteResult> results = new LinkedList<>();
+            for (ExecutionContext each : executionContexts) {
+                
results.addAll(executor.getRawExecutor().execute(createRawExecutionContext(each),
 each.getQueryContext(), new RawSQLExecutorCallback()));
+            }
+            return accumulate(results);
         }
-        return executeUpdate(updateCallback, 
executionContext.getSqlStatementContext());
+        return executeUpdate(updateCallback, 
queryContext.getSqlStatementContext(), executionContexts);
     }
     
-    private int executeUpdateWithImplicitCommitTransaction(final 
ExecuteUpdateCallback updateCallback, final SQLStatementContext 
sqlStatementContext) throws SQLException {
+    private int executeUpdateWithImplicitCommitTransaction(final 
ExecuteUpdateCallback updateCallback, final SQLStatementContext 
sqlStatementContext,
+                                                           final 
Collection<ExecutionContext> executionContexts) throws SQLException {
         int result;
         try {
             connection.setAutoCommit(false);
-            result = useDriverToExecuteUpdate(updateCallback, 
sqlStatementContext);
+            result = useDriverToExecuteUpdate(updateCallback, 
sqlStatementContext, executionContexts);
             connection.commit();
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
@@ -343,12 +359,21 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         return result;
     }
     
-    private int useDriverToExecuteUpdate(final ExecuteUpdateCallback 
updateCallback, final SQLStatementContext sqlStatementContext) throws 
SQLException {
-        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext();
-        cacheStatements(executionGroupContext.getInputGroups());
-        JDBCExecutorCallback<Integer> callback = 
createExecuteUpdateCallback(updateCallback, sqlStatementContext);
-        return 
executor.getRegularExecutor().executeUpdate(executionGroupContext,
-                executionContext.getQueryContext(), 
executionContext.getRouteContext().getRouteUnits(), callback);
+    private int useDriverToExecuteUpdate(final ExecuteUpdateCallback 
updateCallback, final SQLStatementContext sqlStatementContext,
+                                         final Collection<ExecutionContext> 
executionContexts) throws SQLException {
+        Integer result = null;
+        Preconditions.checkArgument(!executionContexts.isEmpty());
+        for (ExecutionContext each : executionContexts) {
+            ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext(each);
+            cacheStatements(executionGroupContext.getInputGroups());
+            JDBCExecutorCallback<Integer> callback = 
createExecuteUpdateCallback(updateCallback, sqlStatementContext);
+            int effectedCount = 
executor.getRegularExecutor().executeUpdate(executionGroupContext,
+                    each.getQueryContext(), 
each.getRouteContext().getRouteUnits(), callback);
+            if (null == result) {
+                result = effectedCount;
+            }
+        }
+        return result;
     }
     
     private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final 
ExecuteUpdateCallback updateCallback, final SQLStatementContext 
sqlStatementContext) {
@@ -446,12 +471,16 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
                 ResultSet resultSet = executeFederationQuery(queryContext);
                 return null != resultSet;
             }
-            executionContext = createExecutionContext(queryContext);
+            executionContexts = createExecutionContext(queryContext);
             if 
(metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules().stream().anyMatch(RawExecutionRule.class::isInstance))
 {
-                Collection<ExecuteResult> results = 
executor.getRawExecutor().execute(createRawExecutionContext(), 
executionContext.getQueryContext(), new RawSQLExecutorCallback());
+                Collection<ExecuteResult> results = new LinkedList<>();
+                for (ExecutionContext each : executionContexts) {
+                    
results.addAll(executor.getRawExecutor().execute(createRawExecutionContext(each),
 each.getQueryContext(), new RawSQLExecutorCallback()));
+                }
                 return results.iterator().next() instanceof QueryResult;
             }
-            return isNeedImplicitCommitTransaction(connection, 
executionContext) ? executeWithImplicitCommitTransaction(executeCallback) : 
useDriverToExecute(executeCallback);
+            return isNeedImplicitCommitTransaction(connection, 
executionContexts) ? executeWithImplicitCommitTransaction(executeCallback, 
executionContexts)
+                    : useDriverToExecute(executeCallback, executionContexts);
         } finally {
             currentResultSet = null;
         }
@@ -507,31 +536,31 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         return protocolType.getTrunkDatabaseType().orElse(protocolType);
     }
     
-    private ExecutionContext createExecutionContext(final QueryContext 
queryContext) throws SQLException {
+    private Collection<ExecutionContext> createExecutionContext(final 
QueryContext queryContext) throws SQLException {
         clearStatements();
         RuleMetaData globalRuleMetaData = 
metaDataContexts.getMetaData().getGlobalRuleMetaData();
         ShardingSphereDatabase currentDatabase = 
metaDataContexts.getMetaData().getDatabase(databaseName);
         SQLAuditEngine.audit(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), globalRuleMetaData, currentDatabase, null, 
queryContext.getHintValueContext());
-        return kernelProcessor.generateExecutionContext(queryContext, 
currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(),
-                
connection.getDatabaseConnectionManager().getConnectionContext());
+        return 
Collections.singleton(kernelProcessor.generateExecutionContext(queryContext, 
currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(),
+                
connection.getDatabaseConnectionManager().getConnectionContext()));
     }
     
-    private ExecutionGroupContext<JDBCExecutionUnit> 
createExecutionGroupContext() throws SQLException {
+    private ExecutionGroupContext<JDBCExecutionUnit> 
createExecutionGroupContext(final ExecutionContext executionContext) throws 
SQLException {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine();
         return prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(databaseName));
     }
     
-    private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionContext() throws SQLException {
+    private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionContext(final ExecutionContext executionContext) throws 
SQLException {
         int maxConnectionsSizePerQuery = 
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
                 .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(databaseName));
     }
     
-    private boolean executeWithImplicitCommitTransaction(final ExecuteCallback 
callback) throws SQLException {
+    private boolean executeWithImplicitCommitTransaction(final ExecuteCallback 
callback, final Collection<ExecutionContext> executionContexts) throws 
SQLException {
         boolean result;
         try {
             connection.setAutoCommit(false);
-            result = useDriverToExecute(callback);
+            result = useDriverToExecute(callback, executionContexts);
             connection.commit();
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
@@ -544,12 +573,20 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         return result;
     }
     
-    private boolean useDriverToExecute(final ExecuteCallback callback) throws 
SQLException {
-        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext();
-        cacheStatements(executionGroupContext.getInputGroups());
-        JDBCExecutorCallback<Boolean> jdbcExecutorCallback = 
createExecuteCallback(callback, 
executionContext.getSqlStatementContext().getSqlStatement());
-        return executor.getRegularExecutor().execute(executionGroupContext,
-                executionContext.getQueryContext(), 
executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
+    private boolean useDriverToExecute(final ExecuteCallback callback, final 
Collection<ExecutionContext> executionContexts) throws SQLException {
+        Boolean result = null;
+        Preconditions.checkArgument(!executionContexts.isEmpty());
+        for (ExecutionContext each : executionContexts) {
+            ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext(each);
+            cacheStatements(executionGroupContext.getInputGroups());
+            JDBCExecutorCallback<Boolean> jdbcExecutorCallback = 
createExecuteCallback(callback, 
each.getSqlStatementContext().getSqlStatement());
+            boolean isWrite = 
executor.getRegularExecutor().execute(executionGroupContext,
+                    each.getQueryContext(), 
each.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
+            if (null == result) {
+                result = isWrite;
+            }
+        }
+        return result;
     }
     
     private void cacheStatements(final 
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) throws 
SQLException {
@@ -593,15 +630,16 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         if (useFederation) {
             return executor.getSqlFederationEngine().getResultSet();
         }
-        if (executionContext.getSqlStatementContext() instanceof 
SelectStatementContext || 
executionContext.getSqlStatementContext().getSqlStatement() instanceof 
DALStatement) {
+        if (executionContexts.iterator().next().getSqlStatementContext() 
instanceof SelectStatementContext
+                || 
executionContexts.iterator().next().getSqlStatementContext().getSqlStatement() 
instanceof DALStatement) {
             List<ResultSet> resultSets = getResultSets();
             if (resultSets.isEmpty()) {
                 return currentResultSet;
             }
-            MergedResult mergedResult = 
mergeQuery(getQueryResults(resultSets));
-            boolean selectContainsEnhancedTable =
-                    executionContext.getSqlStatementContext() instanceof 
SelectStatementContext && ((SelectStatementContext) 
executionContext.getSqlStatementContext()).isContainsEnhancedTable();
-            currentResultSet = new ShardingSphereResultSet(resultSets, 
mergedResult, this, selectContainsEnhancedTable, executionContext);
+            SQLStatementContext sqlStatementContext = 
executionContexts.iterator().next().getSqlStatementContext();
+            MergedResult mergedResult = 
mergeQuery(getQueryResults(resultSets), sqlStatementContext);
+            boolean selectContainsEnhancedTable = sqlStatementContext 
instanceof SelectStatementContext && ((SelectStatementContext) 
sqlStatementContext).isContainsEnhancedTable();
+            currentResultSet = new ShardingSphereResultSet(resultSets, 
mergedResult, this, selectContainsEnhancedTable, 
executionContexts.iterator().next());
         }
         return currentResultSet;
     }
@@ -626,10 +664,10 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         return result;
     }
     
-    private MergedResult mergeQuery(final List<QueryResult> queryResults) 
throws SQLException {
+    private MergedResult mergeQuery(final List<QueryResult> queryResults, 
final SQLStatementContext sqlStatementContext) throws SQLException {
         MergeEngine mergeEngine = new 
MergeEngine(metaDataContexts.getMetaData().getDatabase(databaseName),
                 metaDataContexts.getMetaData().getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
-        return mergeEngine.merge(queryResults, 
executionContext.getSqlStatementContext());
+        return mergeEngine.merge(queryResults, sqlStatementContext);
     }
     
     @SuppressWarnings("MagicConstant")
@@ -652,7 +690,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     @Override
     public boolean isAccumulate() {
         return 
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().findRules(DataNodeContainedRule.class).stream()
-                .anyMatch(each -> 
each.isNeedAccumulate(executionContext.getSqlStatementContext().getTablesContext().getTableNames()));
+                .anyMatch(each -> 
each.isNeedAccumulate(executionContexts.iterator().next().getSqlStatementContext().getTablesContext().getTableNames()));
     }
     
     @Override
@@ -678,8 +716,8 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     }
     
     private Optional<GeneratedKeyContext> findGeneratedKey() {
-        return executionContext.getSqlStatementContext() instanceof 
InsertStatementContext
-                ? ((InsertStatementContext) 
executionContext.getSqlStatementContext()).getGeneratedKeyContext()
+        return executionContexts.iterator().next().getSqlStatementContext() 
instanceof InsertStatementContext
+                ? ((InsertStatementContext) 
executionContexts.iterator().next().getSqlStatementContext()).getGeneratedKeyContext()
                 : Optional.empty();
     }
     
diff --git 
a/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/StatementAdapterTest.java
 
b/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/StatementAdapterTest.java
index e907a752eac..816625d8614 100644
--- 
a/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/StatementAdapterTest.java
+++ 
b/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/StatementAdapterTest.java
@@ -272,6 +272,6 @@ class StatementAdapterTest {
     
     @SneakyThrows(ReflectiveOperationException.class)
     private void setExecutionContext(final ShardingSphereStatement statement, 
final ExecutionContext executionContext) {
-        
Plugins.getMemberAccessor().set(statement.getClass().getDeclaredField("executionContext"),
 statement, executionContext);
+        
Plugins.getMemberAccessor().set(statement.getClass().getDeclaredField("executionContexts"),
 statement, Collections.singleton(executionContext));
     }
 }
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
index 6b45eff2eda..871853cec25 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.proxy.backend.connector;
 
 import com.google.common.base.Preconditions;
-import 
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
 import 
org.apache.shardingsphere.infra.binder.context.aware.CursorDefinitionAware;
 import 
org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
@@ -31,6 +30,8 @@ import 
org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
 import 
org.apache.shardingsphere.infra.connection.refresher.MetaDataRefreshEngine;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import 
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -46,7 +47,6 @@ import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.util.SystemSchemaUtils;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
-import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import 
org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.callback.ProxyJDBCExecutorCallback;
@@ -191,7 +191,12 @@ public final class DatabaseConnector implements 
DatabaseBackendHandler {
             SQLStatement sqlStatement = 
executionContexts.iterator().next().getSqlStatementContext().getSqlStatement();
             return isWriteDMLStatement(sqlStatement) && 
executionContexts.iterator().next().getExecutionUnits().size() > 1;
         }
-        return executionContexts.stream().anyMatch(each -> 
isWriteDMLStatement(each.getSqlStatementContext().getSqlStatement()));
+        for (ExecutionContext each : executionContexts) {
+            if 
(isWriteDMLStatement(each.getSqlStatementContext().getSqlStatement())) {
+                return true;
+            }
+        }
+        return false;
     }
     
     private boolean isWriteDMLStatement(final SQLStatement sqlStatement) {

Reply via email to