This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a1e1758ecc Refactor DriverExecutorFacade (#31572)
9a1e1758ecc is described below

commit 9a1e1758ecc43b1f807f08a24a4c3f179e4fe431
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Jun 4 01:02:40 2024 +0800

    Refactor DriverExecutorFacade (#31572)
    
    * Refactor DriverExecutorFacade
    
    * Refactor DriverExecutorFacade
    
    * Refactor DriverExecutorFacade
    
    * Refactor DriverExecutorFacade
---
 ...verExecutor.java => DriverExecuteExecutor.java} | 283 +++------------------
 .../executor/DriverExecuteQueryExecutor.java       | 211 +++++++++++++++
 .../executor/DriverExecuteUpdateExecutor.java      | 232 +++++++++++++++++
 .../driver/executor/DriverExecutorFacade.java      |  67 +++++
 .../jdbc/adapter/AbstractStatementAdapter.java     |   9 +-
 .../statement/ShardingSpherePreparedStatement.java |  51 ++--
 .../core/statement/ShardingSphereStatement.java    |  35 ++-
 .../StatementAddCallback.java}                     |  13 +-
 .../{ => callback}/StatementReplayCallback.java    |  14 +-
 .../statement/CircuitBreakerPreparedStatement.java |  10 +-
 10 files changed, 608 insertions(+), 317 deletions(-)

diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteExecutor.java
similarity index 50%
rename from 
jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
rename to 
jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteExecutor.java
index 44109f16d41..e19437efbc3 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteExecutor.java
@@ -17,25 +17,19 @@
 
 package org.apache.shardingsphere.driver.executor;
 
-import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback;
 import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
-import 
org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
-import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
-import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
-import 
org.apache.shardingsphere.driver.jdbc.core.statement.StatementReplayCallback;
-import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
-import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
+import 
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementAddCallback;
+import 
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementReplayCallback;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import 
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
 import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
-import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
@@ -43,19 +37,15 @@ import 
org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.merge.MergeEngine;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
@@ -68,7 +58,6 @@ import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectState
 import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
 import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
 import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
 import org.apache.shardingsphere.traffic.rule.TrafficRule;
 import org.apache.shardingsphere.transaction.api.TransactionType;
 import 
org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
@@ -79,18 +68,17 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /**
- * Driver executor.
+ * Driver execute executor.
  */
-public final class DriverExecutor implements AutoCloseable {
+@RequiredArgsConstructor
+public final class DriverExecuteExecutor {
     
     private final ShardingSphereConnection connection;
     
@@ -106,65 +94,41 @@ public final class DriverExecutor implements AutoCloseable 
{
     
     private ExecuteType executeType = ExecuteType.REGULAR;
     
-    private final KernelProcessor kernelProcessor;
-    
-    @Getter
-    private final List<Statement> statements = new ArrayList<>();
-    
-    @Getter
-    private final List<List<Object>> parameterSets = new ArrayList<>();
-    
-    public DriverExecutor(final ShardingSphereConnection connection) {
-        this.connection = connection;
-        metaData = 
connection.getContextManager().getMetaDataContexts().getMetaData();
-        ExecutorEngine executorEngine = 
connection.getContextManager().getExecutorEngine();
-        JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, 
connection.getDatabaseConnectionManager().getConnectionContext());
-        regularExecutor = new DriverJDBCExecutor(connection.getDatabaseName(), 
connection.getContextManager(), jdbcExecutor);
-        rawExecutor = new RawExecutor(executorEngine, 
connection.getDatabaseConnectionManager().getConnectionContext());
-        String schemaName = new 
DatabaseTypeRegistry(metaData.getDatabase(connection.getDatabaseName()).getProtocolType()).getDefaultSchemaName(connection.getDatabaseName());
-        trafficExecutor = new TrafficExecutor();
-        sqlFederationEngine = new 
SQLFederationEngine(connection.getDatabaseName(), schemaName, metaData, 
connection.getContextManager().getMetaDataContexts().getStatistics(), 
jdbcExecutor);
-        kernelProcessor = new KernelProcessor();
-    }
-    
     /**
-     * Execute query.
+     * Execute.
      *
      * @param database database
      * @param queryContext query context
      * @param prepareEngine prepare engine
-     * @param statement statement
-     * @param columnLabelAndIndexMap column label and index map
+     * @param executeCallback execute callback
      * @param statementReplayCallback statement replay callback
-     * @return result set
+     * @param statementAddCallback statement add callback
+     * @return execute result
      * @throws SQLException SQL exception
      */
     @SuppressWarnings("rawtypes")
-    public ResultSet executeQuery(final ShardingSphereDatabase database, final 
QueryContext queryContext,
-                                  final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final Statement statement,
-                                  final Map<String, Integer> 
columnLabelAndIndexMap, final StatementReplayCallback statementReplayCallback) 
throws SQLException {
+    public boolean execute(final ShardingSphereDatabase database, final 
QueryContext queryContext,
+                           final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+                           final ExecuteCallback executeCallback, final 
StatementReplayCallback statementReplayCallback, final StatementAddCallback 
statementAddCallback) throws SQLException {
         Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
 queryContext);
         if (trafficInstanceId.isPresent()) {
-            return trafficExecutor.execute(
-                    connection.getProcessId(), database.getName(), 
trafficInstanceId.get(), queryContext, prepareEngine, 
getTrafficExecuteQueryCallback(prepareEngine.getType()));
+            executeType = ExecuteType.TRAFFIC;
+            return trafficExecutor.execute(connection.getProcessId(), 
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, 
executeCallback::execute);
         }
         if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
-            return sqlFederationEngine.executeQuery(
+            executeType = ExecuteType.FEDERATION;
+            ResultSet resultSet = sqlFederationEngine.executeQuery(
                     prepareEngine, getExecuteQueryCallback(database, 
queryContext, prepareEngine.getType()), new SQLFederationContext(false, 
queryContext, metaData, connection.getProcessId()));
+            return null != resultSet;
         }
-        List<QueryResult> queryResults = executePushDownQuery(database, 
queryContext, prepareEngine, statementReplayCallback);
-        MergedResult mergedResult = mergeQuery(database, queryResults, 
queryContext.getSqlStatementContext());
-        boolean selectContainsEnhancedTable = 
queryContext.getSqlStatementContext() instanceof SelectStatementContext
-                && ((SelectStatementContext) 
queryContext.getSqlStatementContext()).isContainsEnhancedTable();
-        List<ResultSet> resultSets = getResultSets();
-        return new ShardingSphereResultSet(resultSets, mergedResult, 
statement, selectContainsEnhancedTable, queryContext.getSqlStatementContext(),
-                null == columnLabelAndIndexMap
-                        ? 
ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(queryContext.getSqlStatementContext(),
 selectContainsEnhancedTable, resultSets.get(0).getMetaData())
-                        : columnLabelAndIndexMap);
-    }
-    
-    private TrafficExecutorCallback<ResultSet> 
getTrafficExecuteQueryCallback(final String jdbcDriverType) {
-        return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? ((sql, 
statement) -> statement.executeQuery(sql)) : ((sql, statement) -> 
((PreparedStatement) statement).executeQuery());
+        ExecutionContext executionContext = createExecutionContext(database, 
queryContext);
+        if (hasRawExecutionRule(database)) {
+            Collection<ExecuteResult> results = 
rawExecutor.execute(createRawExecutionGroupContext(database, executionContext), 
queryContext, new RawSQLExecutorCallback());
+            return results.iterator().next() instanceof QueryResult;
+        }
+        boolean isNeedImplicitCommitTransaction = 
isNeedImplicitCommitTransaction(
+                connection, 
queryContext.getSqlStatementContext().getSqlStatement(), 
executionContext.getExecutionUnits().size() > 1);
+        return executeWithExecutionContext(database, executeCallback, 
executionContext, prepareEngine, isNeedImplicitCommitTransaction, 
statementReplayCallback, statementAddCallback);
     }
     
     private ExecuteQueryCallback getExecuteQueryCallback(final 
ShardingSphereDatabase database, final QueryContext queryContext, final String 
jdbcDriverType) {
@@ -175,27 +139,6 @@ public final class DriverExecutor implements AutoCloseable 
{
                         
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown());
     }
     
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private List<QueryResult> executePushDownQuery(final 
ShardingSphereDatabase database, final QueryContext queryContext,
-                                                   final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                                   final 
StatementReplayCallback statementReplayCallback) throws SQLException {
-        ExecutionContext executionContext = createExecutionContext(database, 
queryContext);
-        if (hasRawExecutionRule(database)) {
-            return 
rawExecutor.execute(createRawExecutionGroupContext(database, executionContext),
-                    queryContext, new 
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
-        }
-        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
-                new ExecutionGroupReportContext(connection.getProcessId(), 
database.getName(), new Grantee("", "")));
-        for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
-            statements.addAll(getStatements(each));
-            if 
(JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType())) {
-                parameterSets.addAll(getParameterSets(each));
-            }
-        }
-        statementReplayCallback.replay(statements, parameterSets);
-        return regularExecutor.executeQuery(executionGroupContext, 
queryContext, getExecuteQueryCallback(database, queryContext, 
prepareEngine.getType()));
-    }
-    
     private boolean hasRawExecutionRule(final ShardingSphereDatabase database) 
{
         return 
!database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
     }
@@ -216,18 +159,10 @@ public final class DriverExecutor implements 
AutoCloseable {
         return result;
     }
     
-    private ExecutionContext createExecutionContext(final 
ShardingSphereDatabase database, final QueryContext queryContext) throws 
SQLException {
-        clearStatements();
+    private ExecutionContext createExecutionContext(final 
ShardingSphereDatabase database, final QueryContext queryContext) {
         RuleMetaData globalRuleMetaData = metaData.getGlobalRuleMetaData();
         SQLAuditEngine.audit(queryContext, globalRuleMetaData, database);
-        return kernelProcessor.generateExecutionContext(queryContext, 
database, globalRuleMetaData, metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
-    }
-    
-    private void clearStatements() throws SQLException {
-        for (Statement each : statements) {
-            each.close();
-        }
-        statements.clear();
+        return new KernelProcessor().generateExecutionContext(queryContext, 
database, globalRuleMetaData, metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
     }
     
     private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
@@ -236,57 +171,6 @@ public final class DriverExecutor implements AutoCloseable 
{
                 executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new 
Grantee("", "")));
     }
     
-    private MergedResult mergeQuery(final ShardingSphereDatabase database, 
final List<QueryResult> queryResults, final SQLStatementContext 
sqlStatementContext) throws SQLException {
-        MergeEngine mergeEngine = new 
MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
-        return mergeEngine.merge(queryResults, sqlStatementContext);
-    }
-    
-    private List<ResultSet> getResultSets() throws SQLException {
-        List<ResultSet> result = new ArrayList<>(statements.size());
-        for (Statement each : statements) {
-            if (null != each.getResultSet()) {
-                result.add(each.getResultSet());
-            }
-        }
-        return result;
-    }
-    
-    /**
-     * Execute update.
-     *
-     * @param database database
-     * @param queryContext query context
-     * @param prepareEngine prepare engine
-     * @param updateCallback update callback
-     * @param statementReplayCallback statement replay callback
-     * @return updated row count
-     * @throws SQLException SQL exception
-     */
-    @SuppressWarnings("rawtypes")
-    public int executeUpdate(final ShardingSphereDatabase database, final 
QueryContext queryContext,
-                             final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                             final ExecuteUpdateCallback updateCallback, final 
StatementReplayCallback statementReplayCallback) throws SQLException {
-        Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
 queryContext);
-        if (trafficInstanceId.isPresent()) {
-            return trafficExecutor.execute(connection.getProcessId(), 
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, 
updateCallback::executeUpdate);
-        }
-        ExecutionContext executionContext = createExecutionContext(database, 
queryContext);
-        return hasRawExecutionRule(database)
-                ? 
accumulate(rawExecutor.execute(createRawExecutionGroupContext(database, 
executionContext), queryContext, new RawSQLExecutorCallback()))
-                : executeUpdate(database, updateCallback, 
queryContext.getSqlStatementContext(), executionContext, prepareEngine, 
isNeedImplicitCommitTransaction(
-                        connection, 
queryContext.getSqlStatementContext().getSqlStatement(), 
executionContext.getExecutionUnits().size() > 1), statementReplayCallback);
-    }
-    
-    @SuppressWarnings("rawtypes")
-    private int executeUpdate(final ShardingSphereDatabase database, final 
ExecuteUpdateCallback updateCallback, final SQLStatementContext 
sqlStatementContext, final ExecutionContext executionContext,
-                              final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final boolean isNeedImplicitCommitTransaction,
-                              final StatementReplayCallback 
statementReplayCallback) throws SQLException {
-        return isNeedImplicitCommitTransaction
-                ? executeWithImplicitCommitTransaction(() -> 
useDriverToExecuteUpdate(
-                        database, updateCallback, sqlStatementContext, 
executionContext, prepareEngine, statementReplayCallback), connection, 
database.getProtocolType())
-                : useDriverToExecuteUpdate(database, updateCallback, 
sqlStatementContext, executionContext, prepareEngine, statementReplayCallback);
-    }
-    
     private <T> T executeWithImplicitCommitTransaction(final 
ImplicitTransactionCallback<T> callback, final Connection connection, final 
DatabaseType databaseType) throws SQLException {
         T result;
         try {
@@ -304,110 +188,31 @@ public final class DriverExecutor implements 
AutoCloseable {
         return result;
     }
     
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private int useDriverToExecuteUpdate(final ShardingSphereDatabase 
database, final ExecuteUpdateCallback updateCallback, final SQLStatementContext 
sqlStatementContext,
-                                         final ExecutionContext 
executionContext, final DriverExecutionPrepareEngine<JDBCExecutionUnit, 
Connection> prepareEngine,
-                                         final StatementReplayCallback 
statementReplayCallback) throws SQLException {
-        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext(database, executionContext, prepareEngine);
-        for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
-            statements.addAll(getStatements(each));
-            if 
(JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType())) {
-                parameterSets.addAll(getParameterSets(each));
-            }
-        }
-        statementReplayCallback.replay(statements, parameterSets);
-        JDBCExecutorCallback<Integer> callback = 
createExecuteUpdateCallback(database, updateCallback, sqlStatementContext, 
prepareEngine.getType());
-        return regularExecutor.executeUpdate(executionGroupContext, 
executionContext.getQueryContext(), 
executionContext.getRouteContext().getRouteUnits(), callback);
-    }
-    
     private ExecutionGroupContext<JDBCExecutionUnit> 
createExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext,
                                                                                
  final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine) throws SQLException {
         return prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
                 new ExecutionGroupReportContext(connection.getProcessId(), 
database.getName(), new Grantee("", "")));
     }
     
-    private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final 
ShardingSphereDatabase database,
-                                                                      final 
ExecuteUpdateCallback updateCallback, final SQLStatementContext 
sqlStatementContext, final String jdbcDriverType) {
-        boolean isExceptionThrown = 
SQLExecutorExceptionHandler.isExceptionThrown();
-        return new JDBCExecutorCallback<Integer>(database.getProtocolType(), 
database.getResourceMetaData(), sqlStatementContext.getSqlStatement(), 
isExceptionThrown) {
-            
-            @Override
-            protected Integer executeSQL(final String sql, final Statement 
statement, final ConnectionMode connectionMode, final DatabaseType storageType) 
throws SQLException {
-                return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? 
updateCallback.executeUpdate(sql, statement) : ((PreparedStatement) 
statement).executeUpdate();
-            }
-            
-            @Override
-            protected Optional<Integer> getSaneResult(final SQLStatement 
sqlStatement, final SQLException ex) {
-                return Optional.empty();
-            }
-        };
-    }
-    
-    private int accumulate(final Collection<ExecuteResult> results) {
-        int result = 0;
-        for (ExecuteResult each : results) {
-            result += ((UpdateResult) each).getUpdateCount();
-        }
-        return result;
-    }
-    
-    /**
-     * Execute.
-     *
-     * @param database database
-     * @param queryContext query context
-     * @param prepareEngine prepare engine
-     * @param executeCallback execute callback
-     * @param statementReplayCallback statement replay callback
-     * @return execute result
-     * @throws SQLException SQL exception
-     */
-    @SuppressWarnings("rawtypes")
-    public boolean execute(final ShardingSphereDatabase database, final 
QueryContext queryContext,
-                           final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                           final ExecuteCallback executeCallback, final 
StatementReplayCallback statementReplayCallback) throws SQLException {
-        Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
 queryContext);
-        if (trafficInstanceId.isPresent()) {
-            executeType = ExecuteType.TRAFFIC;
-            return trafficExecutor.execute(connection.getProcessId(), 
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, 
executeCallback::execute);
-        }
-        if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
-            executeType = ExecuteType.FEDERATION;
-            ResultSet resultSet = sqlFederationEngine.executeQuery(
-                    prepareEngine, getExecuteQueryCallback(database, 
queryContext, prepareEngine.getType()), new SQLFederationContext(false, 
queryContext, metaData, connection.getProcessId()));
-            return null != resultSet;
-        }
-        ExecutionContext executionContext = createExecutionContext(database, 
queryContext);
-        if (hasRawExecutionRule(database)) {
-            Collection<ExecuteResult> results = 
rawExecutor.execute(createRawExecutionGroupContext(database, executionContext), 
queryContext, new RawSQLExecutorCallback());
-            return results.iterator().next() instanceof QueryResult;
-        }
-        boolean isNeedImplicitCommitTransaction = 
isNeedImplicitCommitTransaction(
-                connection, 
queryContext.getSqlStatementContext().getSqlStatement(), 
executionContext.getExecutionUnits().size() > 1);
-        return executeWithExecutionContext(database, executeCallback, 
executionContext, prepareEngine, isNeedImplicitCommitTransaction, 
statementReplayCallback);
-    }
-    
     @SuppressWarnings("rawtypes")
     private boolean executeWithExecutionContext(final ShardingSphereDatabase 
database, final ExecuteCallback executeCallback, final ExecutionContext 
executionContext,
-                                                final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                                final boolean 
isNeedImplicitCommitTransaction, final StatementReplayCallback 
statementReplayCallback) throws SQLException {
+                                                final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final boolean isNeedImplicitCommitTransaction,
+                                                final StatementReplayCallback 
statementReplayCallback, final StatementAddCallback statementAddCallback) 
throws SQLException {
         return isNeedImplicitCommitTransaction
-                ? executeWithImplicitCommitTransaction(() -> 
useDriverToExecute(database, executeCallback, executionContext, prepareEngine, 
statementReplayCallback), connection,
+                ? executeWithImplicitCommitTransaction(() -> 
useDriverToExecute(database, executeCallback, executionContext, prepareEngine, 
statementReplayCallback, statementAddCallback), connection,
                         database.getProtocolType())
-                : useDriverToExecute(database, executeCallback, 
executionContext, prepareEngine, statementReplayCallback);
+                : useDriverToExecute(database, executeCallback, 
executionContext, prepareEngine, statementReplayCallback, statementAddCallback);
     }
     
     @SuppressWarnings({"rawtypes", "unchecked"})
     private boolean useDriverToExecute(final ShardingSphereDatabase database, 
final ExecuteCallback callback, final ExecutionContext executionContext,
-                                       final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final StatementReplayCallback statementReplayCallback) throws SQLException {
+                                       final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+                                       final StatementReplayCallback 
statementReplayCallback, final StatementAddCallback statementAddCallback) 
throws SQLException {
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext(database, executionContext, prepareEngine);
         for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
-            statements.addAll(getStatements(each));
-            if 
(JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType())) {
-                parameterSets.addAll(getParameterSets(each));
-            }
+            statementAddCallback.add(getStatements(each), 
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ? 
getParameterSets(each) : Collections.emptyList());
         }
-        statementReplayCallback.replay(statements, parameterSets);
+        statementReplayCallback.replay();
         JDBCExecutorCallback<Boolean> jdbcExecutorCallback = 
createExecuteCallback(database, callback, 
executionContext.getSqlStatementContext().getSqlStatement(), 
prepareEngine.getType());
         return regularExecutor.execute(executionGroupContext, 
executionContext.getQueryContext(), 
executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
     }
@@ -461,20 +266,6 @@ public final class DriverExecutor implements AutoCloseable 
{
         }
     }
     
-    /**
-     * Clear.
-     */
-    public void clear() {
-        statements.clear();
-        parameterSets.clear();
-    }
-    
-    @Override
-    public void close() throws SQLException {
-        sqlFederationEngine.close();
-        trafficExecutor.close();
-    }
-    
     public enum ExecuteType {
         
         TRAFFIC, FEDERATION, REGULAR
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteQueryExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteQueryExecutor.java
new file mode 100644
index 00000000000..4897c612f10
--- /dev/null
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteQueryExecutor.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
+import 
org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
+import 
org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
+import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
+import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
+import 
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementAddCallback;
+import 
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementReplayCallback;
+import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
+import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
+import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.merge.MergeEngine;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
+import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
+import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
+import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
+import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
+import org.apache.shardingsphere.traffic.rule.TrafficRule;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Driver execute query executor.
+ */
+@RequiredArgsConstructor
+public final class DriverExecuteQueryExecutor {
+    
+    private final ShardingSphereConnection connection;
+    
+    private final ShardingSphereMetaData metaData;
+    
+    private final DriverJDBCExecutor regularExecutor;
+    
+    private final RawExecutor rawExecutor;
+    
+    private final TrafficExecutor trafficExecutor;
+    
+    private final SQLFederationEngine sqlFederationEngine;
+    
+    private final Collection<Statement> statements = new LinkedList<>();
+    
+    /**
+     * Execute query.
+     *
+     * @param database database
+     * @param queryContext query context
+     * @param prepareEngine prepare engine
+     * @param statement statement
+     * @param columnLabelAndIndexMap column label and index map
+     * @param statementReplayCallback statement replay callback
+     * @param statementAddCallback statement add callback
+     * @return result set
+     * @throws SQLException SQL exception
+     */
+    @SuppressWarnings("rawtypes")
+    public ResultSet executeQuery(final ShardingSphereDatabase database, final 
QueryContext queryContext,
+                                  final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final Statement statement, final Map<String, Integer> columnLabelAndIndexMap,
+                                  final StatementReplayCallback 
statementReplayCallback, final StatementAddCallback statementAddCallback) 
throws SQLException {
+        statements.clear();
+        Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
 queryContext);
+        if (trafficInstanceId.isPresent()) {
+            return trafficExecutor.execute(
+                    connection.getProcessId(), database.getName(), 
trafficInstanceId.get(), queryContext, prepareEngine, 
getTrafficExecuteQueryCallback(prepareEngine.getType()));
+        }
+        if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
+            return sqlFederationEngine.executeQuery(
+                    prepareEngine, getExecuteQueryCallback(database, 
queryContext, prepareEngine.getType()), new SQLFederationContext(false, 
queryContext, metaData, connection.getProcessId()));
+        }
+        List<QueryResult> queryResults = executePushDownQuery(database, 
queryContext, prepareEngine, statementReplayCallback, statementAddCallback);
+        MergedResult mergedResult = mergeQuery(database, queryResults, 
queryContext.getSqlStatementContext());
+        boolean selectContainsEnhancedTable = 
queryContext.getSqlStatementContext() instanceof SelectStatementContext
+                && ((SelectStatementContext) 
queryContext.getSqlStatementContext()).isContainsEnhancedTable();
+        List<ResultSet> resultSets = getResultSets();
+        return new ShardingSphereResultSet(resultSets, mergedResult, 
statement, selectContainsEnhancedTable, queryContext.getSqlStatementContext(),
+                null == columnLabelAndIndexMap
+                        ? 
ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(queryContext.getSqlStatementContext(),
 selectContainsEnhancedTable, resultSets.get(0).getMetaData())
+                        : columnLabelAndIndexMap);
+    }
+    
+    private TrafficExecutorCallback<ResultSet> 
getTrafficExecuteQueryCallback(final String jdbcDriverType) {
+        return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? ((sql, 
statement) -> statement.executeQuery(sql)) : ((sql, statement) -> 
((PreparedStatement) statement).executeQuery());
+    }
+    
+    private ExecuteQueryCallback getExecuteQueryCallback(final 
ShardingSphereDatabase database, final QueryContext queryContext, final String 
jdbcDriverType) {
+        return JDBCDriverType.STATEMENT.equals(jdbcDriverType)
+                ? new 
StatementExecuteQueryCallback(database.getProtocolType(), 
database.getResourceMetaData(),
+                        
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown())
+                : new 
PreparedStatementExecuteQueryCallback(database.getProtocolType(), 
database.getResourceMetaData(),
+                        
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown());
+    }
+    
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private List<QueryResult> executePushDownQuery(final 
ShardingSphereDatabase database, final QueryContext queryContext,
+                                                   final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+                                                   final 
StatementReplayCallback statementReplayCallback, final StatementAddCallback 
statementAddCallback) throws SQLException {
+        ExecutionContext executionContext = createExecutionContext(database, 
queryContext);
+        if (hasRawExecutionRule(database)) {
+            return rawExecutor.execute(
+                    createRawExecutionGroupContext(database, 
executionContext), queryContext, new 
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
+        }
+        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
+                new ExecutionGroupReportContext(connection.getProcessId(), 
database.getName(), new Grantee("", "")));
+        for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
+            Collection<Statement> statements = getStatements(each);
+            this.statements.addAll(statements);
+            statementAddCallback.add(statements, 
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ? 
getParameterSets(each) : Collections.emptyList());
+        }
+        statementReplayCallback.replay();
+        return regularExecutor.executeQuery(executionGroupContext, 
queryContext, getExecuteQueryCallback(database, queryContext, 
prepareEngine.getType()));
+    }
+    
+    private boolean hasRawExecutionRule(final ShardingSphereDatabase database) 
{
+        return 
!database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
+    }
+    
+    private Collection<Statement> getStatements(final 
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
+        Collection<Statement> result = new LinkedList<>();
+        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
+            result.add(each.getStorageResource());
+        }
+        return result;
+    }
+    
+    private Collection<List<Object>> getParameterSets(final 
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
+        Collection<List<Object>> result = new LinkedList<>();
+        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
+            result.add(each.getExecutionUnit().getSqlUnit().getParameters());
+        }
+        return result;
+    }
+    
+    private ExecutionContext createExecutionContext(final 
ShardingSphereDatabase database, final QueryContext queryContext) {
+        RuleMetaData globalRuleMetaData = metaData.getGlobalRuleMetaData();
+        SQLAuditEngine.audit(queryContext, globalRuleMetaData, database);
+        return new KernelProcessor().generateExecutionContext(queryContext, 
database, globalRuleMetaData, metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
+    }
+    
+    private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
+        int maxConnectionsSizePerQuery = 
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
database.getRuleMetaData().getRules()).prepare(
+                executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new 
Grantee("", "")));
+    }
+    
+    private MergedResult mergeQuery(final ShardingSphereDatabase database, 
final List<QueryResult> queryResults, final SQLStatementContext 
sqlStatementContext) throws SQLException {
+        MergeEngine mergeEngine = new 
MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
+        return mergeEngine.merge(queryResults, sqlStatementContext);
+    }
+    
+    private List<ResultSet> getResultSets() throws SQLException {
+        List<ResultSet> result = new ArrayList<>(statements.size());
+        for (Statement each : statements) {
+            if (null != each.getResultSet()) {
+                result.add(each.getResultSet());
+            }
+        }
+        return result;
+    }
+}
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteUpdateExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteUpdateExecutor.java
new file mode 100644
index 00000000000..7592dcab94d
--- /dev/null
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteUpdateExecutor.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor;
+
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
+import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import 
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementAddCallback;
+import 
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementReplayCallback;
+import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import 
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
+import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
+import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
+import org.apache.shardingsphere.traffic.rule.TrafficRule;
+import org.apache.shardingsphere.transaction.api.TransactionType;
+import 
org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Driver execute update executor.
+ */
+@RequiredArgsConstructor
+public final class DriverExecuteUpdateExecutor {
+    
+    private final ShardingSphereConnection connection;
+    
+    private final ShardingSphereMetaData metaData;
+    
+    private final DriverJDBCExecutor regularExecutor;
+    
+    private final RawExecutor rawExecutor;
+    
+    private final TrafficExecutor trafficExecutor;
+    
+    /**
+     * Execute update.
+     *
+     * @param database database
+     * @param queryContext query context
+     * @param prepareEngine prepare engine
+     * @param updateCallback update callback
+     * @param statementReplayCallback statement replay callback
+     * @param statementAddCallback statement add callback
+     * @return updated row count
+     * @throws SQLException SQL exception
+     */
+    @SuppressWarnings("rawtypes")
+    public int executeUpdate(final ShardingSphereDatabase database, final 
QueryContext queryContext,
+                             final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+                             final ExecuteUpdateCallback updateCallback, final 
StatementReplayCallback statementReplayCallback, final StatementAddCallback 
statementAddCallback) throws SQLException {
+        Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
 queryContext);
+        if (trafficInstanceId.isPresent()) {
+            return trafficExecutor.execute(connection.getProcessId(), 
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, 
updateCallback::executeUpdate);
+        }
+        ExecutionContext executionContext = createExecutionContext(database, 
queryContext);
+        return hasRawExecutionRule(database)
+                ? 
accumulate(rawExecutor.execute(createRawExecutionGroupContext(database, 
executionContext), queryContext, new RawSQLExecutorCallback()))
+                : executeUpdate(database, updateCallback, 
queryContext.getSqlStatementContext(), executionContext, prepareEngine, 
isNeedImplicitCommitTransaction(
+                        connection, 
queryContext.getSqlStatementContext().getSqlStatement(), 
executionContext.getExecutionUnits().size() > 1), statementReplayCallback, 
statementAddCallback);
+    }
+    
+    @SuppressWarnings("rawtypes")
+    private int executeUpdate(final ShardingSphereDatabase database, final 
ExecuteUpdateCallback updateCallback, final SQLStatementContext 
sqlStatementContext, final ExecutionContext executionContext,
+                              final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final boolean isNeedImplicitCommitTransaction,
+                              final StatementReplayCallback 
statementReplayCallback, final StatementAddCallback statementAddCallback) 
throws SQLException {
+        return isNeedImplicitCommitTransaction
+                ? executeWithImplicitCommitTransaction(() -> 
useDriverToExecuteUpdate(
+                        database, updateCallback, sqlStatementContext, 
executionContext, prepareEngine, statementReplayCallback, 
statementAddCallback), connection, database.getProtocolType())
+                : useDriverToExecuteUpdate(database, updateCallback, 
sqlStatementContext, executionContext, prepareEngine, statementReplayCallback, 
statementAddCallback);
+    }
+    
+    private boolean hasRawExecutionRule(final ShardingSphereDatabase database) 
{
+        return 
!database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
+    }
+    
+    private Collection<Statement> getStatements(final 
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
+        Collection<Statement> result = new LinkedList<>();
+        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
+            result.add(each.getStorageResource());
+        }
+        return result;
+    }
+    
+    private Collection<List<Object>> getParameterSets(final 
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
+        Collection<List<Object>> result = new LinkedList<>();
+        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
+            result.add(each.getExecutionUnit().getSqlUnit().getParameters());
+        }
+        return result;
+    }
+    
+    private ExecutionContext createExecutionContext(final 
ShardingSphereDatabase database, final QueryContext queryContext) {
+        SQLAuditEngine.audit(queryContext, metaData.getGlobalRuleMetaData(), 
database);
+        return new KernelProcessor().generateExecutionContext(
+                queryContext, database, metaData.getGlobalRuleMetaData(), 
metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
+    }
+    
+    private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
+        int maxConnectionsSizePerQuery = 
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
database.getRuleMetaData().getRules()).prepare(
+                executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new 
Grantee("", "")));
+    }
+    
+    private <T> T executeWithImplicitCommitTransaction(final 
ImplicitTransactionCallback<T> callback, final Connection connection, final 
DatabaseType databaseType) throws SQLException {
+        T result;
+        try {
+            connection.setAutoCommit(false);
+            result = callback.execute();
+            connection.commit();
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            connection.rollback();
+            throw SQLExceptionTransformEngine.toSQLException(ex, databaseType);
+        } finally {
+            connection.setAutoCommit(true);
+        }
+        return result;
+    }
+    
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private int useDriverToExecuteUpdate(final ShardingSphereDatabase 
database, final ExecuteUpdateCallback updateCallback, final SQLStatementContext 
sqlStatementContext,
+                                         final ExecutionContext 
executionContext, final DriverExecutionPrepareEngine<JDBCExecutionUnit, 
Connection> prepareEngine,
+                                         final StatementReplayCallback 
statementReplayCallback, final StatementAddCallback statementAddCallback) 
throws SQLException {
+        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext(database, executionContext, prepareEngine);
+        for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
+            statementAddCallback.add(getStatements(each), 
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ? 
getParameterSets(each) : Collections.emptyList());
+        }
+        statementReplayCallback.replay();
+        JDBCExecutorCallback<Integer> callback = 
createExecuteUpdateCallback(database, updateCallback, sqlStatementContext, 
prepareEngine.getType());
+        return regularExecutor.executeUpdate(executionGroupContext, 
executionContext.getQueryContext(), 
executionContext.getRouteContext().getRouteUnits(), callback);
+    }
+    
+    private ExecutionGroupContext<JDBCExecutionUnit> 
createExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext,
+                                                                               
  final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine) throws SQLException {
+        return prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
+                new ExecutionGroupReportContext(connection.getProcessId(), 
database.getName(), new Grantee("", "")));
+    }
+    
+    private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final 
ShardingSphereDatabase database,
+                                                                      final 
ExecuteUpdateCallback updateCallback, final SQLStatementContext 
sqlStatementContext, final String jdbcDriverType) {
+        boolean isExceptionThrown = 
SQLExecutorExceptionHandler.isExceptionThrown();
+        return new JDBCExecutorCallback<Integer>(database.getProtocolType(), 
database.getResourceMetaData(), sqlStatementContext.getSqlStatement(), 
isExceptionThrown) {
+            
+            @Override
+            protected Integer executeSQL(final String sql, final Statement 
statement, final ConnectionMode connectionMode, final DatabaseType storageType) 
throws SQLException {
+                return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? 
updateCallback.executeUpdate(sql, statement) : ((PreparedStatement) 
statement).executeUpdate();
+            }
+            
+            @Override
+            protected Optional<Integer> getSaneResult(final SQLStatement 
sqlStatement, final SQLException ex) {
+                return Optional.empty();
+            }
+        };
+    }
+    
+    private int accumulate(final Collection<ExecuteResult> results) {
+        int result = 0;
+        for (ExecuteResult each : results) {
+            result += ((UpdateResult) each).getUpdateCount();
+        }
+        return result;
+    }
+    
+    private boolean isNeedImplicitCommitTransaction(final 
ShardingSphereConnection connection, final SQLStatement sqlStatement, final 
boolean multiExecutionUnits) {
+        if (!connection.getAutoCommit()) {
+            return false;
+        }
+        TransactionType transactionType = 
connection.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType();
+        boolean isInTransaction = 
connection.getDatabaseConnectionManager().getConnectionTransaction().isInTransaction();
+        if (!TransactionType.isDistributedTransaction(transactionType) || 
isInTransaction) {
+            return false;
+        }
+        return isWriteDMLStatement(sqlStatement) && multiExecutionUnits;
+    }
+    
+    private boolean isWriteDMLStatement(final SQLStatement sqlStatement) {
+        return sqlStatement instanceof DMLStatement && !(sqlStatement 
instanceof SelectStatement);
+    }
+}
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutorFacade.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutorFacade.java
new file mode 100644
index 00000000000..376fa5de3e2
--- /dev/null
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutorFacade.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor;
+
+import lombok.Getter;
+import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
+import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
+
+import java.sql.SQLException;
+
+/**
+ * Driver executor facade.
+ */
+public final class DriverExecutorFacade implements AutoCloseable {
+    
+    private final TrafficExecutor trafficExecutor;
+    
+    private final SQLFederationEngine sqlFederationEngine;
+    
+    @Getter
+    private final DriverExecuteQueryExecutor queryExecutor;
+    
+    @Getter
+    private final DriverExecuteUpdateExecutor updateExecutor;
+    
+    @Getter
+    private final DriverExecuteExecutor executeExecutor;
+    
+    public DriverExecutorFacade(final ShardingSphereConnection connection) {
+        JDBCExecutor jdbcExecutor = new 
JDBCExecutor(connection.getContextManager().getExecutorEngine(), 
connection.getDatabaseConnectionManager().getConnectionContext());
+        DriverJDBCExecutor regularExecutor = new 
DriverJDBCExecutor(connection.getDatabaseName(), 
connection.getContextManager(), jdbcExecutor);
+        RawExecutor rawExecutor = new 
RawExecutor(connection.getContextManager().getExecutorEngine(), 
connection.getDatabaseConnectionManager().getConnectionContext());
+        trafficExecutor = new TrafficExecutor();
+        ShardingSphereMetaData metaData = 
connection.getContextManager().getMetaDataContexts().getMetaData();
+        String schemaName = new 
DatabaseTypeRegistry(metaData.getDatabase(connection.getDatabaseName()).getProtocolType()).getDefaultSchemaName(connection.getDatabaseName());
+        sqlFederationEngine = new 
SQLFederationEngine(connection.getDatabaseName(), schemaName, metaData, 
connection.getContextManager().getMetaDataContexts().getStatistics(), 
jdbcExecutor);
+        queryExecutor = new DriverExecuteQueryExecutor(connection, metaData, 
regularExecutor, rawExecutor, trafficExecutor, sqlFederationEngine);
+        updateExecutor = new DriverExecuteUpdateExecutor(connection, metaData, 
regularExecutor, rawExecutor, trafficExecutor);
+        executeExecutor = new DriverExecuteExecutor(connection, metaData, 
regularExecutor, rawExecutor, trafficExecutor, sqlFederationEngine);
+    }
+    
+    @Override
+    public void close() throws SQLException {
+        trafficExecutor.close();
+        sqlFederationEngine.close();
+    }
+}
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index 7f52c2fbf47..c3e0185a3fc 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.driver.jdbc.adapter;
 
 import lombok.AccessLevel;
 import lombok.Getter;
-import org.apache.shardingsphere.driver.executor.DriverExecutor;
 import 
org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
@@ -67,8 +66,6 @@ public abstract class AbstractStatementAdapter extends 
WrapperAdapter implements
     
     protected abstract Collection<? extends Statement> getRoutedStatements();
     
-    protected abstract DriverExecutor getExecutor();
-    
     protected abstract StatementManager getStatementManager();
     
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -225,9 +222,7 @@ public abstract class AbstractStatementAdapter extends 
WrapperAdapter implements
         closed = true;
         try {
             forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
Statement::close);
-            if (null != getExecutor()) {
-                getExecutor().close();
-            }
+            closeExecutor();
             if (null != getStatementManager()) {
                 getStatementManager().close();
             }
@@ -235,4 +230,6 @@ public abstract class AbstractStatementAdapter extends 
WrapperAdapter implements
             getRoutedStatements().clear();
         }
     }
+    
+    protected abstract void closeExecutor() throws SQLException;
 }
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index aeb8ccbd569..49cf8046d88 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.driver.jdbc.core.statement;
 
 import lombok.AccessLevel;
 import lombok.Getter;
-import org.apache.shardingsphere.driver.executor.DriverExecutor;
+import org.apache.shardingsphere.driver.executor.DriverExecutorFacade;
 import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit;
 import 
org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
 import 
org.apache.shardingsphere.driver.jdbc.adapter.AbstractPreparedStatementAdapter;
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConne
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
+import 
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementAddCallback;
 import 
org.apache.shardingsphere.driver.jdbc.core.statement.metadata.ShardingSphereParameterMetaData;
 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
 import org.apache.shardingsphere.infra.binder.context.aware.ParameterAware;
@@ -101,7 +102,9 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     
     private final String sql;
     
-    private final List<PreparedStatement> statements;
+    private final List<PreparedStatement> statements = new ArrayList<>();
+    
+    private final List<List<Object>> parameterSets = new ArrayList<>();
     
     private final SQLStatementContext sqlStatementContext;
     
@@ -112,8 +115,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     @Getter
     private final ParameterMetaData parameterMetaData;
     
-    @Getter(AccessLevel.PROTECTED)
-    private final DriverExecutor executor;
+    private final DriverExecutorFacade driverExecutorFacade;
     
     private final BatchPreparedStatementExecutor 
batchPreparedStatementExecutor;
     
@@ -169,7 +171,6 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         metaData = 
connection.getContextManager().getMetaDataContexts().getMetaData();
         hintValueContext = SQLHintUtils.extractHint(sql);
         this.sql = SQLHintUtils.removeHint(sql);
-        statements = new ArrayList<>();
         SQLParserRule sqlParserRule = 
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);
         SQLParserEngine sqlParserEngine = 
sqlParserRule.getSQLParserEngine(metaData.getDatabase(connection.getDatabaseName()).getProtocolType());
         SQLStatement sqlStatement = sqlParserEngine.parse(this.sql, true);
@@ -178,7 +179,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
         parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
         statementOption = returnGeneratedKeys ? new StatementOption(true, 
columns) : new StatementOption(resultSetType, resultSetConcurrency, 
resultSetHoldability);
-        executor = new DriverExecutor(connection);
+        driverExecutorFacade = new DriverExecutorFacade(connection);
         ShardingSphereDatabase database = metaData.getDatabase(databaseName);
         JDBCExecutor jdbcExecutor = new 
JDBCExecutor(connection.getContextManager().getExecutorEngine(), 
connection.getDatabaseConnectionManager().getConnectionContext());
         batchPreparedStatementExecutor = new 
BatchPreparedStatementExecutor(database, jdbcExecutor, 
connection.getProcessId());
@@ -205,14 +206,11 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
             
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
             ShardingSphereDatabase database = 
metaData.getDatabase(databaseName);
             findGeneratedKey().ifPresent(optional -> 
generatedValues.addAll(optional.getGeneratedValues()));
-            currentResultSet = executor.executeQuery(database, queryContext, 
createDriverExecutionPrepareEngine(database), this, columnLabelAndIndexMap,
-                    (StatementReplayCallback<PreparedStatement>) this::replay);
+            currentResultSet = 
driverExecutorFacade.getQueryExecutor().executeQuery(database, queryContext, 
createDriverExecutionPrepareEngine(database), this, columnLabelAndIndexMap,
+                    this::replay, (StatementAddCallback<PreparedStatement>) 
this::addStatements);
             if (currentResultSet instanceof ShardingSphereResultSet) {
                 columnLabelAndIndexMap = ((ShardingSphereResultSet) 
currentResultSet).getColumnLabelAndIndexMap();
             }
-            for (Statement each : executor.getStatements()) {
-                statements.add((PreparedStatement) each);
-            }
             return currentResultSet;
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
@@ -231,6 +229,11 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         }
     }
     
+    private void addStatements(final Collection<PreparedStatement> statements, 
final Collection<List<Object>> parameterSets) {
+        this.statements.addAll(statements);
+        this.parameterSets.addAll(parameterSets);
+    }
+    
     private void resetParameters() throws SQLException {
         replaySetParameter(statements, 
Collections.singletonList(getParameters()));
     }
@@ -252,11 +255,8 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
             QueryContext queryContext = createQueryContext();
             
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
             ShardingSphereDatabase database = 
metaData.getDatabase(databaseName);
-            final int result = executor.executeUpdate(database, queryContext, 
createDriverExecutionPrepareEngine(database),
-                    (sql, statement) -> ((PreparedStatement) 
statement).executeUpdate(), (StatementReplayCallback<PreparedStatement>) 
this::replay);
-            for (Statement each : executor.getStatements()) {
-                statements.add((PreparedStatement) each);
-            }
+            int result = 
driverExecutorFacade.getUpdateExecutor().executeUpdate(database, queryContext, 
createDriverExecutionPrepareEngine(database),
+                    (sql, statement) -> ((PreparedStatement) 
statement).executeUpdate(), this::replay, 
(StatementAddCallback<PreparedStatement>) this::addStatements);
             findGeneratedKey().ifPresent(optional -> 
generatedValues.addAll(optional.getGeneratedValues()));
             return result;
             // CHECKSTYLE:OFF
@@ -280,11 +280,9 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
             QueryContext queryContext = createQueryContext();
             
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
             ShardingSphereDatabase database = 
metaData.getDatabase(databaseName);
-            final boolean result = executor.execute(database, queryContext, 
createDriverExecutionPrepareEngine(database), (sql, statement) -> 
((PreparedStatement) statement).execute(),
-                    (StatementReplayCallback<PreparedStatement>) this::replay);
-            for (Statement each : executor.getStatements()) {
-                statements.add((PreparedStatement) each);
-            }
+            boolean result = driverExecutorFacade.getExecuteExecutor().execute(
+                    database, queryContext, 
createDriverExecutionPrepareEngine(database), (sql, statement) -> 
((PreparedStatement) statement).execute(),
+                    this::replay, (StatementAddCallback<PreparedStatement>) 
this::addStatements);
             findGeneratedKey().ifPresent(optional -> 
generatedValues.addAll(optional.getGeneratedValues()));
             return result;
             // CHECKSTYLE:OFF
@@ -302,7 +300,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         if (null != currentResultSet) {
             return currentResultSet;
         }
-        Optional<ResultSet> resultSet = executor.getResultSet();
+        Optional<ResultSet> resultSet = 
driverExecutorFacade.getExecuteExecutor().getResultSet();
         if (resultSet.isPresent()) {
             return resultSet.get();
         }
@@ -354,7 +352,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         return mergeEngine.merge(queryResults, sqlStatementContext);
     }
     
-    private void replay(final List<PreparedStatement> statements, final 
List<List<Object>> parameterSets) throws SQLException {
+    private void replay() throws SQLException {
         replaySetParameter(statements, parameterSets);
         for (Statement each : statements) {
             getMethodInvocationRecorder().replay(each);
@@ -370,8 +368,8 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     private void clearPrevious() {
         currentResultSet = null;
         statements.clear();
+        parameterSets.clear();
         generatedValues.clear();
-        executor.clear();
     }
     
     private Optional<GeneratedKeyContext> findGeneratedKey() {
@@ -520,4 +518,9 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     public Collection<PreparedStatement> getRoutedStatements() {
         return statements;
     }
+    
+    @Override
+    protected void closeExecutor() throws SQLException {
+        driverExecutorFacade.close();
+    }
 }
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index b31ff18bcad..a0b4c90b698 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.driver.jdbc.core.statement;
 
 import lombok.AccessLevel;
 import lombok.Getter;
-import org.apache.shardingsphere.driver.executor.DriverExecutor;
+import org.apache.shardingsphere.driver.executor.DriverExecutorFacade;
 import org.apache.shardingsphere.driver.executor.batch.BatchStatementExecutor;
 import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.driver.jdbc.adapter.AbstractStatementAdapter;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
+import 
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementAddCallback;
 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
 import 
org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
@@ -84,8 +85,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     
     private final StatementOption statementOption;
     
-    @Getter(AccessLevel.PROTECTED)
-    private final DriverExecutor executor;
+    private final DriverExecutorFacade driverExecutorFacade;
     
     @Getter(AccessLevel.PROTECTED)
     private final StatementManager statementManager;
@@ -113,7 +113,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         metaData = 
connection.getContextManager().getMetaDataContexts().getMetaData();
         statements = new LinkedList<>();
         statementOption = new StatementOption(resultSetType, 
resultSetConcurrency, resultSetHoldability);
-        executor = new DriverExecutor(connection);
+        driverExecutorFacade = new DriverExecutorFacade(connection);
         statementManager = new StatementManager();
         batchStatementExecutor = new BatchStatementExecutor(this);
         databaseName = connection.getDatabaseName();
@@ -125,9 +125,8 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         try {
             prepareExecute(queryContext);
             ShardingSphereDatabase database = 
metaData.getDatabase(databaseName);
-            currentResultSet = executor.executeQuery(database, queryContext, 
createDriverExecutionPrepareEngine(database), this, null,
-                    (StatementReplayCallback<Statement>) (statements, 
parameterSets) -> replay(statements));
-            statements.addAll(executor.getStatements());
+            currentResultSet = 
driverExecutorFacade.getQueryExecutor().executeQuery(database, queryContext, 
createDriverExecutionPrepareEngine(database), this, null,
+                    this::replay, (StatementAddCallback<Statement>) 
(statements, parameterSets) -> this.statements.addAll(statements));
             return currentResultSet;
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
@@ -197,10 +196,8 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         QueryContext queryContext = createQueryContext(sql);
         prepareExecute(queryContext);
         ShardingSphereDatabase database = metaData.getDatabase(databaseName);
-        int result = executor.executeUpdate(database, queryContext, 
createDriverExecutionPrepareEngine(database), updateCallback,
-                (StatementReplayCallback<Statement>) (statements, 
parameterSets) -> replay(statements));
-        statements.addAll(executor.getStatements());
-        return result;
+        return 
driverExecutorFacade.getUpdateExecutor().executeUpdate(database, queryContext, 
createDriverExecutionPrepareEngine(database),
+                updateCallback, this::replay, 
(StatementAddCallback<Statement>) (statements, parameterSets) -> 
this.statements.addAll(statements));
     }
     
     @Override
@@ -261,10 +258,8 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         QueryContext queryContext = createQueryContext(sql);
         prepareExecute(queryContext);
         ShardingSphereDatabase database = metaData.getDatabase(databaseName);
-        boolean result = executor.execute(database, queryContext, 
createDriverExecutionPrepareEngine(database),
-                executeCallback, (StatementReplayCallback<Statement>) 
(statements, parameterSets) -> replay(statements));
-        statements.addAll(executor.getStatements());
-        return result;
+        return driverExecutorFacade.getExecuteExecutor().execute(database, 
queryContext, createDriverExecutionPrepareEngine(database), executeCallback,
+                this::replay, (StatementAddCallback<Statement>) (statements, 
parameterSets) -> this.statements.addAll(statements));
     }
     
     private QueryContext createQueryContext(final String originSQL) throws 
SQLException {
@@ -296,7 +291,6 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
             each.close();
         }
         statements.clear();
-        executor.clear();
     }
     
     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) {
@@ -305,7 +299,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
                 database.getRuleMetaData().getRules(), 
database.getResourceMetaData().getStorageUnits());
     }
     
-    private void replay(final List<Statement> statements) throws SQLException {
+    private void replay() throws SQLException {
         for (Statement each : statements) {
             getMethodInvocationRecorder().replay(each);
         }
@@ -331,7 +325,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         if (null != currentResultSet) {
             return currentResultSet;
         }
-        Optional<ResultSet> resultSet = executor.getResultSet();
+        Optional<ResultSet> resultSet = 
driverExecutorFacade.getExecuteExecutor().getResultSet();
         if (resultSet.isPresent()) {
             return resultSet.get();
         }
@@ -430,4 +424,9 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         return 
DatabaseTypedSPILoader.findService(GeneratedKeyColumnProvider.class, 
metaData.getDatabase(databaseName).getProtocolType())
                 
.map(GeneratedKeyColumnProvider::getColumnName).orElse(columnName);
     }
+    
+    @Override
+    protected void closeExecutor() throws SQLException {
+        driverExecutorFacade.close();
+    }
 }
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/callback/StatementAddCallback.java
similarity index 74%
copy from 
jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
copy to 
jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/callback/StatementAddCallback.java
index 0ad5b4456c0..70482d733cc 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/callback/StatementAddCallback.java
@@ -15,25 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.driver.jdbc.core.statement;
+package org.apache.shardingsphere.driver.jdbc.core.statement.callback;
 
-import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collection;
 import java.util.List;
 
 /**
- * Statement replay callback.
+ * Statement add callback.
  *
  * @param <T> type of statement
  */
-public interface StatementReplayCallback<T extends Statement> {
+public interface StatementAddCallback<T extends Statement> {
     
     /**
-     * Replay statement.
+     * Add statements and parameter sets.
      *
      * @param statements statements
      * @param parameterSets parameter sets
-     * @throws SQLException SQL exception
      */
-    void replay(List<T> statements, List<List<Object>> parameterSets) throws 
SQLException;
+    void add(Collection<T> statements, Collection<List<Object>> parameterSets);
 }
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/callback/StatementReplayCallback.java
similarity index 69%
rename from 
jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
rename to 
jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/callback/StatementReplayCallback.java
index 0ad5b4456c0..3aefa19a73c 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/callback/StatementReplayCallback.java
@@ -15,25 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.driver.jdbc.core.statement;
+package org.apache.shardingsphere.driver.jdbc.core.statement.callback;
 
 import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.List;
 
 /**
  * Statement replay callback.
- *
- * @param <T> type of statement
  */
-public interface StatementReplayCallback<T extends Statement> {
+public interface StatementReplayCallback {
     
     /**
-     * Replay statement.
+     * Replay statements.
      *
-     * @param statements statements
-     * @param parameterSets parameter sets
      * @throws SQLException SQL exception
      */
-    void replay(List<T> statements, List<List<Object>> parameterSets) throws 
SQLException;
+    void replay() throws SQLException;
 }
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
index ac096d5e97a..d270d0a9ecd 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.driver.state.circuit.statement;
 
-import org.apache.shardingsphere.driver.executor.DriverExecutor;
 import org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
 import 
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement;
 import 
org.apache.shardingsphere.driver.state.circuit.connection.CircuitBreakerConnection;
@@ -274,11 +273,6 @@ public final class CircuitBreakerPreparedStatement extends 
AbstractUnsupportedOp
         return Collections.emptyList();
     }
     
-    @Override
-    protected DriverExecutor getExecutor() {
-        return null;
-    }
-    
     @Override
     protected StatementManager getStatementManager() {
         return null;
@@ -293,4 +287,8 @@ public final class CircuitBreakerPreparedStatement extends 
AbstractUnsupportedOp
     public int executeUpdate() {
         return -1;
     }
+    
+    @Override
+    protected void closeExecutor() {
+    }
 }

Reply via email to