This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6190a7036f2 Refactor DriverPushDownExecutor (#31664)
6190a7036f2 is described below

commit 6190a7036f2a74d93dc837a9a91b3e5df928ac5f
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Jun 12 16:09:46 2024 +0800

    Refactor DriverPushDownExecutor (#31664)
    
    * Refactor DriverPushDownExecuteExecutor
    
    * Refactor DriverPushDownExecuteQueryExecutor
    
    * Refactor DriverPushDownExecuteUpdateExecutor
    
    * Refactor DriverPushDownExecuteUpdateExecutor
---
 .../pushdown/DriverPushDownExecuteExecutor.java    | 134 ++------------------
 .../DriverPushDownExecuteQueryExecutor.java        | 116 ++---------------
 .../DriverPushDownExecuteUpdateExecutor.java       | 137 ++-------------------
 .../DriverJDBCPushDownExecuteExecutor.java}        |  54 ++------
 .../DriverJDBCPushDownExecuteQueryExecutor.java}   |  42 +------
 .../DriverJDBCPushDownExecuteUpdateExecutor.java}  |  56 ++-------
 .../raw/DriverRawPushDownExecuteExecutor.java      |  65 ++++++++++
 .../raw/DriverRawPushDownExecuteQueryExecutor.java | 105 ++++++++++++++++
 .../DriverRawPushDownExecuteUpdateExecutor.java    |  84 +++++++++++++
 9 files changed, 307 insertions(+), 486 deletions(-)

diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
index 68ef83d8561..9e4466338ce 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
@@ -18,47 +18,26 @@
 package org.apache.shardingsphere.driver.executor.engine.pushdown;
 
 import 
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
-import 
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteCallbackFactory;
 import 
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecuteCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
-import 
org.apache.shardingsphere.driver.executor.engine.transaction.DriverTransactionalExecutor;
+import 
org.apache.shardingsphere.driver.executor.engine.pushdown.jdbc.DriverJDBCPushDownExecuteExecutor;
+import 
org.apache.shardingsphere.driver.executor.engine.pushdown.raw.DriverRawPushDownExecuteExecutor;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
-import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
-import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
-import org.apache.shardingsphere.infra.merge.MergeEngine;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
-import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
-import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
 
@@ -67,23 +46,14 @@ import java.util.Optional;
  */
 public final class DriverPushDownExecuteExecutor {
     
-    private final ShardingSphereConnection connection;
+    private final DriverJDBCPushDownExecuteExecutor jdbcPushDownExecutor;
     
-    private final ShardingSphereMetaData metaData;
-    
-    private final Grantee grantee;
-    
-    private final JDBCExecutor jdbcExecutor;
-    
-    private final RawExecutor rawExecutor;
+    private final DriverRawPushDownExecuteExecutor rawPushDownExecutor;
     
     public DriverPushDownExecuteExecutor(final ShardingSphereConnection 
connection, final ShardingSphereMetaData metaData, final Grantee grantee,
                                          final JDBCExecutor jdbcExecutor, 
final RawExecutor rawExecutor) {
-        this.connection = connection;
-        this.metaData = metaData;
-        this.grantee = grantee;
-        this.jdbcExecutor = jdbcExecutor;
-        this.rawExecutor = rawExecutor;
+        jdbcPushDownExecutor = new 
DriverJDBCPushDownExecuteExecutor(connection, metaData, grantee, jdbcExecutor);
+        rawPushDownExecutor = new DriverRawPushDownExecuteExecutor(connection, 
metaData, grantee, rawExecutor);
     }
     
     /**
@@ -102,60 +72,8 @@ public final class DriverPushDownExecuteExecutor {
     public boolean execute(final ShardingSphereDatabase database, final 
ExecutionContext executionContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
                            final StatementExecuteCallback executeCallback, 
final StatementAddCallback addCallback, final StatementReplayCallback 
replayCallback) throws SQLException {
         return 
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
-                ? executeJDBC(database, executionContext, prepareEngine, 
executeCallback, addCallback, replayCallback)
-                : executeRaw(database, executionContext);
-    }
-    
-    @SuppressWarnings("rawtypes")
-    private boolean executeJDBC(final ShardingSphereDatabase database, final 
ExecutionContext executionContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                final StatementExecuteCallback 
executeCallback, final StatementAddCallback addCallback, final 
StatementReplayCallback replayCallback) throws SQLException {
-        return new DriverTransactionalExecutor(connection).execute(
-                database, executionContext, () -> doExecuteJDBC(database, 
executionContext, prepareEngine, executeCallback, addCallback, replayCallback));
-    }
-    
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private boolean doExecuteJDBC(final ShardingSphereDatabase database, final 
ExecutionContext executionContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                  final StatementExecuteCallback 
executeCallback, final StatementAddCallback addCallback, final 
StatementReplayCallback replayCallback) throws SQLException {
-        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
prepareEngine.prepare(
-                executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), 
grantee));
-        for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
-            addCallback.add(getStatements(each), 
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ? 
getParameterSets(each) : Collections.emptyList());
-        }
-        replayCallback.replay();
-        ProcessEngine processEngine = new ProcessEngine();
-        try {
-            processEngine.executeSQL(executionGroupContext, 
executionContext.getQueryContext());
-            List<Boolean> results = jdbcExecutor.execute(executionGroupContext,
-                    new 
ExecuteCallbackFactory(prepareEngine.getType()).newInstance(database, 
executeCallback, executionContext.getSqlStatementContext().getSqlStatement()));
-            new 
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
 database, metaData.getProps())
-                    
.refresh(executionContext.getQueryContext().getSqlStatementContext(), 
executionContext.getRouteContext().getRouteUnits());
-            return null != results && !results.isEmpty() && null != 
results.get(0) && results.get(0);
-        } finally {
-            
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
-        }
-    }
-    
-    private Collection<Statement> getStatements(final 
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
-        Collection<Statement> result = new LinkedList<>();
-        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
-            result.add(each.getStorageResource());
-        }
-        return result;
-    }
-    
-    private Collection<List<Object>> getParameterSets(final 
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
-        Collection<List<Object>> result = new LinkedList<>();
-        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
-            result.add(each.getExecutionUnit().getSqlUnit().getParameters());
-        }
-        return result;
-    }
-    
-    private boolean executeRaw(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
-        int maxConnectionsSizePerQuery = 
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext = new 
RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
database.getRuleMetaData().getRules())
-                .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), 
grantee));
-        return rawExecutor.execute(executionGroupContext, 
executionContext.getQueryContext(), new 
RawSQLExecutorCallback()).iterator().next() instanceof QueryResult;
+                ? jdbcPushDownExecutor.execute(database, executionContext, 
prepareEngine, executeCallback, addCallback, replayCallback)
+                : rawPushDownExecutor.execute(database, executionContext);
     }
     
     /**
@@ -170,38 +88,8 @@ public final class DriverPushDownExecuteExecutor {
      */
     public Optional<ResultSet> getResultSet(final ShardingSphereDatabase 
database, final SQLStatementContext sqlStatementContext,
                                             final Statement statement, final 
List<? extends Statement> statements) throws SQLException {
-        if (sqlStatementContext instanceof SelectStatementContext || 
sqlStatementContext.getSqlStatement() instanceof DALStatement) {
-            List<ResultSet> resultSets = getResultSets(statements);
-            if (resultSets.isEmpty()) {
-                return Optional.empty();
-            }
-            List<QueryResult> queryResults = getQueryResults(resultSets);
-            MergedResult mergedResult = new 
MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext())
-                    .merge(queryResults, sqlStatementContext);
-            boolean selectContainsEnhancedTable = sqlStatementContext 
instanceof SelectStatementContext && ((SelectStatementContext) 
sqlStatementContext).isContainsEnhancedTable();
-            return Optional.of(new ShardingSphereResultSet(resultSets, 
mergedResult, statement, selectContainsEnhancedTable, sqlStatementContext));
-        }
-        return Optional.empty();
-    }
-    
-    @SuppressWarnings("JDBCResourceOpenedButNotSafelyClosed")
-    private List<ResultSet> getResultSets(final List<? extends Statement> 
statements) throws SQLException {
-        List<ResultSet> result = new ArrayList<>(statements.size());
-        for (Statement each : statements) {
-            if (null != each.getResultSet()) {
-                result.add(each.getResultSet());
-            }
-        }
-        return result;
-    }
-    
-    private List<QueryResult> getQueryResults(final List<ResultSet> 
resultSets) throws SQLException {
-        List<QueryResult> result = new ArrayList<>(resultSets.size());
-        for (ResultSet each : resultSets) {
-            if (null != each) {
-                result.add(new JDBCStreamQueryResult(each));
-            }
-        }
-        return result;
+        return 
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
+                ? jdbcPushDownExecutor.getResultSet(database, 
sqlStatementContext, statement, statements)
+                : Optional.empty();
     }
 }
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
index 820d0c2bccb..ec6c7305dd5 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
@@ -18,78 +18,39 @@
 package org.apache.shardingsphere.driver.executor.engine.pushdown;
 
 import 
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
-import 
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallbackFactory;
 import 
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
+import 
org.apache.shardingsphere.driver.executor.engine.pushdown.jdbc.DriverJDBCPushDownExecuteQueryExecutor;
+import 
org.apache.shardingsphere.driver.executor.engine.pushdown.raw.DriverRawPushDownExecuteQueryExecutor;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
-import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetFactory;
-import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
-import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
-import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
 
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
  * Driver push down execute query executor.
  */
 public final class DriverPushDownExecuteQueryExecutor {
     
-    private final ConnectionContext connectionContext;
+    private final DriverJDBCPushDownExecuteQueryExecutor 
driverJDBCPushDownExecutor;
     
-    private final String processId;
-    
-    private final RuleMetaData globalRuleMetaData;
-    
-    private final ConfigurationProperties props;
-    
-    private final Grantee grantee;
-    
-    private final JDBCExecutor jdbcExecutor;
-    
-    private final RawExecutor rawExecutor;
-    
-    private final Collection<Statement> statements;
+    private final DriverRawPushDownExecuteQueryExecutor 
driverRawPushDownExecutor;
     
     public DriverPushDownExecuteQueryExecutor(final ShardingSphereConnection 
connection, final ShardingSphereMetaData metaData, final Grantee grantee,
                                               final JDBCExecutor jdbcExecutor, 
final RawExecutor rawExecutor) {
-        connectionContext = 
connection.getDatabaseConnectionManager().getConnectionContext();
-        processId = connection.getProcessId();
-        globalRuleMetaData = metaData.getGlobalRuleMetaData();
-        props = metaData.getProps();
-        this.grantee = grantee;
-        this.jdbcExecutor = jdbcExecutor;
-        this.rawExecutor = rawExecutor;
-        statements = new LinkedList<>();
+        driverJDBCPushDownExecutor = new 
DriverJDBCPushDownExecuteQueryExecutor(connection, metaData, grantee, 
jdbcExecutor);
+        driverRawPushDownExecutor = new 
DriverRawPushDownExecuteQueryExecutor(connection, metaData, grantee, 
rawExecutor);
     }
     
     /**
@@ -110,69 +71,8 @@ public final class DriverPushDownExecuteQueryExecutor {
                                                 final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final Statement statement,
                                                 final Map<String, Integer> 
columnLabelAndIndexMap,
                                                 final StatementAddCallback 
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
-        List<QueryResult> queryResults = getQueryResults(database, 
queryContext, prepareEngine, addCallback, replayCallback);
-        boolean isContainsEnhancedTable = 
queryContext.getSqlStatementContext() instanceof SelectStatementContext
-                && ((SelectStatementContext) 
queryContext.getSqlStatementContext()).isContainsEnhancedTable();
-        return new ShardingSphereResultSetFactory(connectionContext, 
globalRuleMetaData, props, statements)
-                .newInstance(database, queryContext, queryResults, statement, 
columnLabelAndIndexMap, isContainsEnhancedTable);
-    }
-    
-    @SuppressWarnings("rawtypes")
-    private List<QueryResult> getQueryResults(final ShardingSphereDatabase 
database, final QueryContext queryContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                              final StatementAddCallback 
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
-        statements.clear();
-        ExecutionContext executionContext = new 
KernelProcessor().generateExecutionContext(queryContext, database, 
globalRuleMetaData, props, connectionContext);
         return 
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
-                ? getJDBCQueryResults(database, queryContext, prepareEngine, 
addCallback, replayCallback, executionContext)
-                : getRawQueryResults(database, queryContext, executionContext);
-    }
-    
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private List<QueryResult> getJDBCQueryResults(final ShardingSphereDatabase 
database, final QueryContext queryContext,
-                                                  final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                                  final StatementAddCallback 
addCallback, final StatementReplayCallback replayCallback,
-                                                  final ExecutionContext 
executionContext) throws SQLException {
-        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
-                new ExecutionGroupReportContext(processId, database.getName(), 
grantee));
-        for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
-            Collection<Statement> statements = getStatements(each);
-            this.statements.addAll(statements);
-            addCallback.add(statements, 
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ? 
getParameterSets(each) : Collections.emptyList());
-        }
-        replayCallback.replay();
-        ProcessEngine processEngine = new ProcessEngine();
-        try {
-            processEngine.executeSQL(executionGroupContext, queryContext);
-            return jdbcExecutor.execute(executionGroupContext, new 
ExecuteQueryCallbackFactory(prepareEngine.getType()).newInstance(database, 
queryContext));
-        } finally {
-            
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
-        }
-    }
-    
-    private Collection<Statement> getStatements(final 
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
-        Collection<Statement> result = new LinkedList<>();
-        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
-            result.add(each.getStorageResource());
-        }
-        return result;
-    }
-    
-    private Collection<List<Object>> getParameterSets(final 
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
-        Collection<List<Object>> result = new LinkedList<>();
-        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
-            result.add(each.getExecutionUnit().getSqlUnit().getParameters());
-        }
-        return result;
-    }
-    
-    private List<QueryResult> getRawQueryResults(final ShardingSphereDatabase 
database, final QueryContext queryContext, final ExecutionContext 
executionContext) throws SQLException {
-        return rawExecutor.execute(
-                createRawExecutionGroupContext(database, executionContext), 
queryContext, new 
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
-    }
-    
-    private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
-        int maxConnectionsSizePerQuery = 
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
database.getRuleMetaData().getRules()).prepare(
-                executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(processId, database.getName(), grantee));
+                ? driverJDBCPushDownExecutor.executeQuery(database, 
queryContext, prepareEngine, statement, columnLabelAndIndexMap, addCallback, 
replayCallback)
+                : driverRawPushDownExecutor.executeQuery(database, 
queryContext, statement, columnLabelAndIndexMap);
     }
 }
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteUpdateExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteUpdateExecutor.java
index aac6d76848e..8ca3f327d59 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteUpdateExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteUpdateExecutor.java
@@ -18,73 +18,37 @@
 package org.apache.shardingsphere.driver.executor.engine.pushdown;
 
 import 
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
-import 
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteUpdateCallbackFactory;
 import 
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecuteUpdateCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
-import 
org.apache.shardingsphere.driver.executor.engine.transaction.DriverTransactionalExecutor;
+import 
org.apache.shardingsphere.driver.executor.engine.pushdown.jdbc.DriverJDBCPushDownExecuteUpdateExecutor;
+import 
org.apache.shardingsphere.driver.executor.engine.pushdown.raw.DriverRawPushDownExecuteUpdateExecutor;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
-import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
-import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
-import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import 
org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
 import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
-import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
 
 /**
  * Driver push down execute update executor.
  */
 public final class DriverPushDownExecuteUpdateExecutor {
     
-    private final ShardingSphereConnection connection;
+    private final DriverJDBCPushDownExecuteUpdateExecutor jdbcPushDownExecutor;
     
-    private final String processId;
-    
-    private final ConfigurationProperties props;
-    
-    private final Grantee grantee;
-    
-    private final JDBCExecutor jdbcExecutor;
-    
-    private final RawExecutor rawExecutor;
+    private final DriverRawPushDownExecuteUpdateExecutor rawPushDownExecutor;
     
     public DriverPushDownExecuteUpdateExecutor(final ShardingSphereConnection 
connection, final ShardingSphereMetaData metaData, final Grantee grantee,
                                                final JDBCExecutor 
jdbcExecutor, final RawExecutor rawExecutor) {
-        this.connection = connection;
-        processId = connection.getProcessId();
-        props = metaData.getProps();
-        this.grantee = grantee;
-        this.jdbcExecutor = jdbcExecutor;
-        this.rawExecutor = rawExecutor;
+        jdbcPushDownExecutor = new 
DriverJDBCPushDownExecuteUpdateExecutor(connection, metaData, grantee, 
jdbcExecutor);
+        rawPushDownExecutor = new 
DriverRawPushDownExecuteUpdateExecutor(connection, metaData, grantee, 
rawExecutor);
     }
     
     /**
@@ -103,92 +67,7 @@ public final class DriverPushDownExecuteUpdateExecutor {
     public int executeUpdate(final ShardingSphereDatabase database, final 
ExecutionContext executionContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
                              final StatementExecuteUpdateCallback 
updateCallback, final StatementAddCallback addCallback, final 
StatementReplayCallback replayCallback) throws SQLException {
         return 
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
-                ? executeJDBCUpdate(database, executionContext, prepareEngine, 
updateCallback, addCallback, replayCallback)
-                : executeRawUpdate(database, executionContext);
-    }
-    
-    @SuppressWarnings("rawtypes")
-    private int executeJDBCUpdate(final ShardingSphereDatabase database, final 
ExecutionContext executionContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                  final StatementExecuteUpdateCallback 
updateCallback, final StatementAddCallback addCallback, final 
StatementReplayCallback replayCallback) throws SQLException {
-        return new DriverTransactionalExecutor(connection).execute(
-                database, executionContext, () -> 
doExecuteJDBCUpdate(database, executionContext, prepareEngine, updateCallback, 
addCallback, replayCallback));
-    }
-    
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private int doExecuteJDBCUpdate(final ShardingSphereDatabase database, 
final ExecutionContext executionContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                    final StatementExecuteUpdateCallback 
updateCallback, final StatementAddCallback addCallback, final 
StatementReplayCallback replayCallback) throws SQLException {
-        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
prepareEngine.prepare(
-                executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(processId, database.getName(), grantee));
-        for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
-            addCallback.add(getStatements(each), 
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ? 
getParameterSets(each) : Collections.emptyList());
-        }
-        replayCallback.replay();
-        ProcessEngine processEngine = new ProcessEngine();
-        try {
-            processEngine.executeSQL(executionGroupContext, 
executionContext.getQueryContext());
-            JDBCExecutorCallback<Integer> callback = new 
ExecuteUpdateCallbackFactory(prepareEngine.getType())
-                    .newInstance(database, 
executionContext.getQueryContext().getSqlStatementContext().getSqlStatement(), 
updateCallback);
-            List<Integer> updateCounts = 
jdbcExecutor.execute(executionGroupContext, callback);
-            new 
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
 database, props)
-                    
.refresh(executionContext.getQueryContext().getSqlStatementContext(), 
executionContext.getRouteContext().getRouteUnits());
-            return isNeedAccumulate(database.getRuleMetaData().getRules(), 
executionContext.getQueryContext().getSqlStatementContext()) ? 
accumulate(updateCounts) : updateCounts.get(0);
-        } finally {
-            
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
-        }
-    }
-    
-    private Collection<Statement> getStatements(final 
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
-        Collection<Statement> result = new LinkedList<>();
-        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
-            result.add(each.getStorageResource());
-        }
-        return result;
-    }
-    
-    private Collection<List<Object>> getParameterSets(final 
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
-        Collection<List<Object>> result = new LinkedList<>();
-        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
-            result.add(each.getExecutionUnit().getSqlUnit().getParameters());
-        }
-        return result;
-    }
-    
-    private boolean isNeedAccumulate(final Collection<ShardingSphereRule> 
rules, final SQLStatementContext sqlStatementContext) {
-        if (!(sqlStatementContext instanceof TableAvailable)) {
-            return false;
-        }
-        for (ShardingSphereRule each : rules) {
-            Optional<DataNodeRuleAttribute> ruleAttribute = 
each.getAttributes().findAttribute(DataNodeRuleAttribute.class);
-            if (ruleAttribute.isPresent() && 
ruleAttribute.get().isNeedAccumulate(((TableAvailable) 
sqlStatementContext).getTablesContext().getTableNames())) {
-                return true;
-            }
-        }
-        return false;
-    }
-    
-    private int accumulate(final List<Integer> updateResults) {
-        int result = 0;
-        for (Integer each : updateResults) {
-            result += null == each ? 0 : each;
-        }
-        return result;
-    }
-    
-    private int accumulate(final Collection<ExecuteResult> results) {
-        int result = 0;
-        for (ExecuteResult each : results) {
-            result += ((UpdateResult) each).getUpdateCount();
-        }
-        return result;
-    }
-    
-    private int executeRawUpdate(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
-        return 
accumulate(rawExecutor.execute(createRawExecutionGroupContext(database, 
executionContext), executionContext.getQueryContext(), new 
RawSQLExecutorCallback()));
-    }
-    
-    private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
-        int maxConnectionsSizePerQuery = 
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
database.getRuleMetaData().getRules()).prepare(
-                executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(processId, database.getName(), grantee));
+                ? jdbcPushDownExecutor.executeUpdate(database, 
executionContext, prepareEngine, updateCallback, addCallback, replayCallback)
+                : rawPushDownExecutor.executeUpdate(database, 
executionContext);
     }
 }
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java
similarity index 72%
copy from 
jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
copy to 
jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java
index 68ef83d8561..64f93f00ad5 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.driver.executor.engine.pushdown;
+package org.apache.shardingsphere.driver.executor.engine.pushdown.jdbc;
 
+import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteCallbackFactory;
 import 
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecuteCallback;
@@ -26,28 +27,22 @@ import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConne
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
-import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
 import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
 
@@ -63,9 +58,10 @@ import java.util.List;
 import java.util.Optional;
 
 /**
- * Driver push down execute executor.
+ * Driver JDBC push down execute executor.
  */
-public final class DriverPushDownExecuteExecutor {
+@RequiredArgsConstructor
+public final class DriverJDBCPushDownExecuteExecutor {
     
     private final ShardingSphereConnection connection;
     
@@ -75,47 +71,28 @@ public final class DriverPushDownExecuteExecutor {
     
     private final JDBCExecutor jdbcExecutor;
     
-    private final RawExecutor rawExecutor;
-    
-    public DriverPushDownExecuteExecutor(final ShardingSphereConnection 
connection, final ShardingSphereMetaData metaData, final Grantee grantee,
-                                         final JDBCExecutor jdbcExecutor, 
final RawExecutor rawExecutor) {
-        this.connection = connection;
-        this.metaData = metaData;
-        this.grantee = grantee;
-        this.jdbcExecutor = jdbcExecutor;
-        this.rawExecutor = rawExecutor;
-    }
-    
     /**
      * Execute.
      * 
      * @param database database
      * @param executionContext execution context
      * @param prepareEngine prepare engine
-     * @param executeCallback statement execute callback
-     * @param addCallback statement add callback
-     * @param replayCallback statement replay callback
-     * @return is query
+     * @param executeCallback execute callback
+     * @param addCallback add callback
+     * @param replayCallback replay callback
+     * @return execute result
      * @throws SQLException SQL exception
      */
     @SuppressWarnings("rawtypes")
     public boolean execute(final ShardingSphereDatabase database, final 
ExecutionContext executionContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
                            final StatementExecuteCallback executeCallback, 
final StatementAddCallback addCallback, final StatementReplayCallback 
replayCallback) throws SQLException {
-        return 
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
-                ? executeJDBC(database, executionContext, prepareEngine, 
executeCallback, addCallback, replayCallback)
-                : executeRaw(database, executionContext);
-    }
-    
-    @SuppressWarnings("rawtypes")
-    private boolean executeJDBC(final ShardingSphereDatabase database, final 
ExecutionContext executionContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                final StatementExecuteCallback 
executeCallback, final StatementAddCallback addCallback, final 
StatementReplayCallback replayCallback) throws SQLException {
         return new DriverTransactionalExecutor(connection).execute(
-                database, executionContext, () -> doExecuteJDBC(database, 
executionContext, prepareEngine, executeCallback, addCallback, replayCallback));
+                database, executionContext, () -> doExecute(database, 
executionContext, prepareEngine, executeCallback, addCallback, replayCallback));
     }
     
     @SuppressWarnings({"rawtypes", "unchecked"})
-    private boolean doExecuteJDBC(final ShardingSphereDatabase database, final 
ExecutionContext executionContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                  final StatementExecuteCallback 
executeCallback, final StatementAddCallback addCallback, final 
StatementReplayCallback replayCallback) throws SQLException {
+    private boolean doExecute(final ShardingSphereDatabase database, final 
ExecutionContext executionContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+                              final StatementExecuteCallback executeCallback, 
final StatementAddCallback addCallback, final StatementReplayCallback 
replayCallback) throws SQLException {
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
prepareEngine.prepare(
                 executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), 
grantee));
         for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
@@ -151,13 +128,6 @@ public final class DriverPushDownExecuteExecutor {
         return result;
     }
     
-    private boolean executeRaw(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
-        int maxConnectionsSizePerQuery = 
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext = new 
RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
database.getRuleMetaData().getRules())
-                .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), 
grantee));
-        return rawExecutor.execute(executionGroupContext, 
executionContext.getQueryContext(), new 
RawSQLExecutorCallback()).iterator().next() instanceof QueryResult;
-    }
-    
     /**
      * Get result set.
      *
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteQueryExecutor.java
similarity index 73%
copy from 
jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
copy to 
jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteQueryExecutor.java
index 820d0c2bccb..91253deb40e 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteQueryExecutor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.driver.executor.engine.pushdown;
+package org.apache.shardingsphere.driver.executor.engine.pushdown.jdbc;
 
 import 
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallbackFactory;
@@ -25,7 +25,6 @@ import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResult
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetFactory;
 import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
@@ -33,19 +32,14 @@ import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupRepor
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
-import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
 import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
 
@@ -57,12 +51,11 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
- * Driver push down execute query executor.
+ * Driver JDBC push down execute query executor.
  */
-public final class DriverPushDownExecuteQueryExecutor {
+public final class DriverJDBCPushDownExecuteQueryExecutor {
     
     private final ConnectionContext connectionContext;
     
@@ -76,19 +69,15 @@ public final class DriverPushDownExecuteQueryExecutor {
     
     private final JDBCExecutor jdbcExecutor;
     
-    private final RawExecutor rawExecutor;
-    
     private final Collection<Statement> statements;
     
-    public DriverPushDownExecuteQueryExecutor(final ShardingSphereConnection 
connection, final ShardingSphereMetaData metaData, final Grantee grantee,
-                                              final JDBCExecutor jdbcExecutor, 
final RawExecutor rawExecutor) {
+    public DriverJDBCPushDownExecuteQueryExecutor(final 
ShardingSphereConnection connection, final ShardingSphereMetaData metaData, 
final Grantee grantee, final JDBCExecutor jdbcExecutor) {
         connectionContext = 
connection.getDatabaseConnectionManager().getConnectionContext();
         processId = connection.getProcessId();
         globalRuleMetaData = metaData.getGlobalRuleMetaData();
         props = metaData.getProps();
         this.grantee = grantee;
         this.jdbcExecutor = jdbcExecutor;
-        this.rawExecutor = rawExecutor;
         statements = new LinkedList<>();
     }
     
@@ -117,21 +106,11 @@ public final class DriverPushDownExecuteQueryExecutor {
                 .newInstance(database, queryContext, queryResults, statement, 
columnLabelAndIndexMap, isContainsEnhancedTable);
     }
     
-    @SuppressWarnings("rawtypes")
+    @SuppressWarnings({"rawtypes", "unchecked"})
     private List<QueryResult> getQueryResults(final ShardingSphereDatabase 
database, final QueryContext queryContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
                                               final StatementAddCallback 
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
         statements.clear();
         ExecutionContext executionContext = new 
KernelProcessor().generateExecutionContext(queryContext, database, 
globalRuleMetaData, props, connectionContext);
-        return 
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
-                ? getJDBCQueryResults(database, queryContext, prepareEngine, 
addCallback, replayCallback, executionContext)
-                : getRawQueryResults(database, queryContext, executionContext);
-    }
-    
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private List<QueryResult> getJDBCQueryResults(final ShardingSphereDatabase 
database, final QueryContext queryContext,
-                                                  final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                                  final StatementAddCallback 
addCallback, final StatementReplayCallback replayCallback,
-                                                  final ExecutionContext 
executionContext) throws SQLException {
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
                 new ExecutionGroupReportContext(processId, database.getName(), 
grantee));
         for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
@@ -164,15 +143,4 @@ public final class DriverPushDownExecuteQueryExecutor {
         }
         return result;
     }
-    
-    private List<QueryResult> getRawQueryResults(final ShardingSphereDatabase 
database, final QueryContext queryContext, final ExecutionContext 
executionContext) throws SQLException {
-        return rawExecutor.execute(
-                createRawExecutionGroupContext(database, executionContext), 
queryContext, new 
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
-    }
-    
-    private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
-        int maxConnectionsSizePerQuery = 
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
database.getRuleMetaData().getRules()).prepare(
-                executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(processId, database.getName(), grantee));
-    }
 }
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteUpdateExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java
similarity index 68%
copy from 
jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteUpdateExecutor.java
copy to 
jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java
index aac6d76848e..baf4600aa6d 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteUpdateExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.driver.executor.engine.pushdown;
+package org.apache.shardingsphere.driver.executor.engine.pushdown.jdbc;
 
 import 
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteUpdateCallbackFactory;
@@ -26,7 +26,6 @@ import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConne
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
@@ -34,21 +33,14 @@ import 
org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import 
org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
-import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
 import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
 
 import java.sql.Connection;
@@ -61,9 +53,9 @@ import java.util.List;
 import java.util.Optional;
 
 /**
- * Driver push down execute update executor.
+ * Driver JDBC push down execute update executor.
  */
-public final class DriverPushDownExecuteUpdateExecutor {
+public final class DriverJDBCPushDownExecuteUpdateExecutor {
     
     private final ShardingSphereConnection connection;
     
@@ -75,16 +67,12 @@ public final class DriverPushDownExecuteUpdateExecutor {
     
     private final JDBCExecutor jdbcExecutor;
     
-    private final RawExecutor rawExecutor;
-    
-    public DriverPushDownExecuteUpdateExecutor(final ShardingSphereConnection 
connection, final ShardingSphereMetaData metaData, final Grantee grantee,
-                                               final JDBCExecutor 
jdbcExecutor, final RawExecutor rawExecutor) {
+    public DriverJDBCPushDownExecuteUpdateExecutor(final 
ShardingSphereConnection connection, final ShardingSphereMetaData metaData, 
final Grantee grantee, final JDBCExecutor jdbcExecutor) {
         this.connection = connection;
         processId = connection.getProcessId();
         props = metaData.getProps();
         this.grantee = grantee;
         this.jdbcExecutor = jdbcExecutor;
-        this.rawExecutor = rawExecutor;
     }
     
     /**
@@ -94,29 +82,21 @@ public final class DriverPushDownExecuteUpdateExecutor {
      * @param executionContext execution context
      * @param prepareEngine prepare engine
      * @param updateCallback statement execute update callback
-     * @param replayCallback statement replay callback
      * @param addCallback statement add callback
-     * @return updated row count
+     * @param replayCallback statement replay callback
+     * @return affected rows
      * @throws SQLException SQL exception
      */
     @SuppressWarnings("rawtypes")
     public int executeUpdate(final ShardingSphereDatabase database, final 
ExecutionContext executionContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
                              final StatementExecuteUpdateCallback 
updateCallback, final StatementAddCallback addCallback, final 
StatementReplayCallback replayCallback) throws SQLException {
-        return 
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
-                ? executeJDBCUpdate(database, executionContext, prepareEngine, 
updateCallback, addCallback, replayCallback)
-                : executeRawUpdate(database, executionContext);
-    }
-    
-    @SuppressWarnings("rawtypes")
-    private int executeJDBCUpdate(final ShardingSphereDatabase database, final 
ExecutionContext executionContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                  final StatementExecuteUpdateCallback 
updateCallback, final StatementAddCallback addCallback, final 
StatementReplayCallback replayCallback) throws SQLException {
         return new DriverTransactionalExecutor(connection).execute(
-                database, executionContext, () -> 
doExecuteJDBCUpdate(database, executionContext, prepareEngine, updateCallback, 
addCallback, replayCallback));
+                database, executionContext, () -> doExecuteUpdate(database, 
executionContext, prepareEngine, updateCallback, addCallback, replayCallback));
     }
     
     @SuppressWarnings({"rawtypes", "unchecked"})
-    private int doExecuteJDBCUpdate(final ShardingSphereDatabase database, 
final ExecutionContext executionContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                    final StatementExecuteUpdateCallback 
updateCallback, final StatementAddCallback addCallback, final 
StatementReplayCallback replayCallback) throws SQLException {
+    private int doExecuteUpdate(final ShardingSphereDatabase database, final 
ExecutionContext executionContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+                                final StatementExecuteUpdateCallback 
updateCallback, final StatementAddCallback addCallback, final 
StatementReplayCallback replayCallback) throws SQLException {
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
prepareEngine.prepare(
                 executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(processId, database.getName(), grantee));
         for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
@@ -173,22 +153,4 @@ public final class DriverPushDownExecuteUpdateExecutor {
         }
         return result;
     }
-    
-    private int accumulate(final Collection<ExecuteResult> results) {
-        int result = 0;
-        for (ExecuteResult each : results) {
-            result += ((UpdateResult) each).getUpdateCount();
-        }
-        return result;
-    }
-    
-    private int executeRawUpdate(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
-        return 
accumulate(rawExecutor.execute(createRawExecutionGroupContext(database, 
executionContext), executionContext.getQueryContext(), new 
RawSQLExecutorCallback()));
-    }
-    
-    private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
-        int maxConnectionsSizePerQuery = 
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
database.getRuleMetaData().getRules()).prepare(
-                executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(processId, database.getName(), grantee));
-    }
 }
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteExecutor.java
new file mode 100644
index 00000000000..8f707b7e438
--- /dev/null
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteExecutor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor.engine.pushdown.raw;
+
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+
+import java.sql.SQLException;
+
+/**
+ * Driver raw push down execute executor.
+ */
+@RequiredArgsConstructor
+public final class DriverRawPushDownExecuteExecutor {
+    
+    private final ShardingSphereConnection connection;
+    
+    private final ShardingSphereMetaData metaData;
+    
+    private final Grantee grantee;
+    
+    private final RawExecutor rawExecutor;
+    
+    /**
+     * Execute.
+     * 
+     * @param database database
+     * @param executionContext execution context
+     * @return is query result
+     * @throws SQLException SQL exception
+     */
+    public boolean execute(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
+        int maxConnectionsSizePerQuery = 
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext = new 
RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
database.getRuleMetaData().getRules())
+                .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), 
grantee));
+        return rawExecutor.execute(executionGroupContext, 
executionContext.getQueryContext(), new 
RawSQLExecutorCallback()).iterator().next() instanceof QueryResult;
+    }
+}
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteQueryExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteQueryExecutor.java
new file mode 100644
index 00000000000..7f4f776f111
--- /dev/null
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteQueryExecutor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor.engine.pushdown.raw;
+
+import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
+import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetFactory;
+import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
+import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Driver raw push down execute query executor.
+ */
+public final class DriverRawPushDownExecuteQueryExecutor {
+    
+    private final ConnectionContext connectionContext;
+    
+    private final String processId;
+    
+    private final RuleMetaData globalRuleMetaData;
+    
+    private final ConfigurationProperties props;
+    
+    private final Grantee grantee;
+    
+    private final RawExecutor rawExecutor;
+    
+    public DriverRawPushDownExecuteQueryExecutor(final 
ShardingSphereConnection connection, final ShardingSphereMetaData metaData, 
final Grantee grantee, final RawExecutor rawExecutor) {
+        connectionContext = 
connection.getDatabaseConnectionManager().getConnectionContext();
+        processId = connection.getProcessId();
+        globalRuleMetaData = metaData.getGlobalRuleMetaData();
+        props = metaData.getProps();
+        this.grantee = grantee;
+        this.rawExecutor = rawExecutor;
+    }
+    
+    /**
+     * Execute query.
+     * 
+     * @param database database
+     * @param queryContext query context
+     * @param statement statement
+     * @param columnLabelAndIndexMap column label and index map
+     * @return result set
+     * @throws SQLException SQL exception
+     */
+    public ShardingSphereResultSet executeQuery(final ShardingSphereDatabase 
database, final QueryContext queryContext, final Statement statement,
+                                                final Map<String, Integer> 
columnLabelAndIndexMap) throws SQLException {
+        List<QueryResult> queryResults = getQueryResults(database, 
queryContext);
+        boolean isContainsEnhancedTable = 
queryContext.getSqlStatementContext() instanceof SelectStatementContext
+                && ((SelectStatementContext) 
queryContext.getSqlStatementContext()).isContainsEnhancedTable();
+        return new ShardingSphereResultSetFactory(connectionContext, 
globalRuleMetaData, props, Collections.emptyList())
+                .newInstance(database, queryContext, queryResults, statement, 
columnLabelAndIndexMap, isContainsEnhancedTable);
+    }
+    
+    private List<QueryResult> getQueryResults(final ShardingSphereDatabase 
database, final QueryContext queryContext) throws SQLException {
+        ExecutionContext executionContext = new 
KernelProcessor().generateExecutionContext(queryContext, database, 
globalRuleMetaData, props, connectionContext);
+        return rawExecutor.execute(
+                createRawExecutionGroupContext(database, executionContext), 
queryContext, new 
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
+    }
+    
+    private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
+        int maxConnectionsSizePerQuery = 
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
database.getRuleMetaData().getRules()).prepare(
+                executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(processId, database.getName(), grantee));
+    }
+}
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteUpdateExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteUpdateExecutor.java
new file mode 100644
index 00000000000..df84ae3f51a
--- /dev/null
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteUpdateExecutor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor.engine.pushdown.raw;
+
+import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+
+import java.sql.SQLException;
+import java.util.Collection;
+
+/**
+ * Driver raw push down execute update executor.
+ */
+public final class DriverRawPushDownExecuteUpdateExecutor {
+    
+    private final String processId;
+    
+    private final ConfigurationProperties props;
+    
+    private final Grantee grantee;
+    
+    private final RawExecutor rawExecutor;
+    
+    public DriverRawPushDownExecuteUpdateExecutor(final 
ShardingSphereConnection connection, final ShardingSphereMetaData metaData, 
final Grantee grantee, final RawExecutor rawExecutor) {
+        processId = connection.getProcessId();
+        props = metaData.getProps();
+        this.grantee = grantee;
+        this.rawExecutor = rawExecutor;
+    }
+    
+    /**
+     * Execute update.
+     * 
+     * @param database database
+     * @param executionContext execution context
+     * @return updated row count
+     * @throws SQLException SQL exception
+     */
+    public int executeUpdate(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
+        return 
accumulate(rawExecutor.execute(createRawExecutionGroupContext(database, 
executionContext), executionContext.getQueryContext(), new 
RawSQLExecutorCallback()));
+    }
+    
+    private int accumulate(final Collection<ExecuteResult> results) {
+        int result = 0;
+        for (ExecuteResult each : results) {
+            result += ((UpdateResult) each).getUpdateCount();
+        }
+        return result;
+    }
+    
+    private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
+        int maxConnectionsSizePerQuery = 
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
database.getRuleMetaData().getRules()).prepare(
+                executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(processId, database.getName(), grantee));
+    }
+}

Reply via email to