This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 61360740df0 Add DriverPushDownExecuteQueryExecutor (#31639)
61360740df0 is described below

commit 61360740df02a29cafbc946929897295106d7d10
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Jun 9 13:58:24 2024 +0800

    Add DriverPushDownExecuteQueryExecutor (#31639)
---
 .../execute/ExecuteQueryCallbackFactory.java       |  49 +++++++
 .../engine/DriverExecuteQueryExecutor.java         | 152 ++-------------------
 .../DriverPushDownExecuteQueryExecutor.java}       |  97 +++++--------
 3 files changed, 96 insertions(+), 202 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/callback/execute/ExecuteQueryCallbackFactory.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/callback/execute/ExecuteQueryCallbackFactory.java
new file mode 100644
index 00000000000..72d0b1f46a1
--- /dev/null
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/callback/execute/ExecuteQueryCallbackFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor.callback.execute;
+
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.driver.executor.callback.execute.impl.PreparedStatementExecuteQueryCallback;
+import 
org.apache.shardingsphere.driver.executor.callback.execute.impl.StatementExecuteQueryCallback;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
+
+/**
+ * Execute query callback factory.
+ */
+@RequiredArgsConstructor
+public final class ExecuteQueryCallbackFactory {
+    
+    private final String jdbcDriverType;
+    
+    /**
+     * Create new instance of execute query callback.
+     * @param database database
+     * @param queryContext query context
+     * @return created instance
+     */
+    public ExecuteQueryCallback newInstance(final ShardingSphereDatabase 
database, final QueryContext queryContext) {
+        return JDBCDriverType.STATEMENT.equals(jdbcDriverType)
+                ? new 
StatementExecuteQueryCallback(database.getProtocolType(), 
database.getResourceMetaData(),
+                        
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown())
+                : new 
PreparedStatementExecuteQueryCallback(database.getProtocolType(), 
database.getResourceMetaData(),
+                        
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown());
+    }
+}
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
index bc287c93bbc..d806a77b27f 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
@@ -17,42 +17,20 @@
 
 package org.apache.shardingsphere.driver.executor.engine;
 
-import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
-import 
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallback;
-import 
org.apache.shardingsphere.driver.executor.callback.execute.impl.PreparedStatementExecuteQueryCallback;
-import 
org.apache.shardingsphere.driver.executor.callback.execute.impl.StatementExecuteQueryCallback;
+import 
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallbackFactory;
 import 
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
+import 
org.apache.shardingsphere.driver.executor.engine.pushdown.DriverPushDownExecuteQueryExecutor;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
-import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
-import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
-import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
-import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
 import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
-import org.apache.shardingsphere.infra.merge.MergeEngine;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
-import org.apache.shardingsphere.infra.metadata.user.Grantee;
-import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
 import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
 import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
@@ -65,34 +43,32 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /**
  * Driver execute query executor.
  */
-@RequiredArgsConstructor
 public final class DriverExecuteQueryExecutor {
     
     private final ShardingSphereConnection connection;
     
     private final ShardingSphereMetaData metaData;
     
-    private final JDBCExecutor jdbcExecutor;
-    
-    private final RawExecutor rawExecutor;
+    private final DriverPushDownExecuteQueryExecutor 
pushDownExecuteQueryExecutor;
     
     private final TrafficExecutor trafficExecutor;
     
     private final SQLFederationEngine sqlFederationEngine;
     
-    private final Collection<Statement> statements = new LinkedList<>();
+    public DriverExecuteQueryExecutor(final ShardingSphereConnection 
connection, final ShardingSphereMetaData metaData, final JDBCExecutor 
jdbcExecutor, final RawExecutor rawExecutor,
+                                      final TrafficExecutor trafficExecutor, 
final SQLFederationEngine sqlFederationEngine) {
+        this.connection = connection;
+        this.metaData = metaData;
+        pushDownExecuteQueryExecutor = new 
DriverPushDownExecuteQueryExecutor(connection, metaData, jdbcExecutor, 
rawExecutor);
+        this.trafficExecutor = trafficExecutor;
+        this.sqlFederationEngine = sqlFederationEngine;
+    }
     
     /**
      * Execute query.
@@ -111,7 +87,6 @@ public final class DriverExecuteQueryExecutor {
     public ResultSet executeQuery(final ShardingSphereDatabase database, final 
QueryContext queryContext,
                                   final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final Statement statement, final Map<String, Integer> columnLabelAndIndexMap,
                                   final StatementAddCallback addCallback, 
final StatementReplayCallback replayCallback) throws SQLException {
-        statements.clear();
         RuleMetaData globalRuleMetaData = metaData.getGlobalRuleMetaData();
         SQLAuditEngine.audit(queryContext, globalRuleMetaData, database);
         Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(globalRuleMetaData.getSingleRule(TrafficRule.class),
 queryContext);
@@ -120,112 +95,13 @@ public final class DriverExecuteQueryExecutor {
                     connection.getProcessId(), database.getName(), 
trafficInstanceId.get(), queryContext, prepareEngine, 
getTrafficExecuteQueryCallback(prepareEngine.getType()));
         }
         if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), database, globalRuleMetaData)) {
-            return sqlFederationEngine.executeQuery(
-                    prepareEngine, getExecuteQueryCallback(database, 
queryContext, prepareEngine.getType()), new SQLFederationContext(false, 
queryContext, metaData, connection.getProcessId()));
+            return sqlFederationEngine.executeQuery(prepareEngine,
+                    new 
ExecuteQueryCallbackFactory(prepareEngine.getType()).newInstance(database, 
queryContext), new SQLFederationContext(false, queryContext, metaData, 
connection.getProcessId()));
         }
-        return executePushDownQuery(database, queryContext, prepareEngine, 
statement, columnLabelAndIndexMap, addCallback, replayCallback);
+        return pushDownExecuteQueryExecutor.executeQuery(database, 
queryContext, prepareEngine, statement, columnLabelAndIndexMap, addCallback, 
replayCallback);
     }
     
     private TrafficExecutorCallback<ResultSet> 
getTrafficExecuteQueryCallback(final String jdbcDriverType) {
         return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? ((sql, 
statement) -> statement.executeQuery(sql)) : ((sql, statement) -> 
((PreparedStatement) statement).executeQuery());
     }
-    
-    private ExecuteQueryCallback getExecuteQueryCallback(final 
ShardingSphereDatabase database, final QueryContext queryContext, final String 
jdbcDriverType) {
-        return JDBCDriverType.STATEMENT.equals(jdbcDriverType)
-                ? new 
StatementExecuteQueryCallback(database.getProtocolType(), 
database.getResourceMetaData(),
-                        
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown())
-                : new 
PreparedStatementExecuteQueryCallback(database.getProtocolType(), 
database.getResourceMetaData(),
-                        
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown());
-    }
-    
-    @SuppressWarnings("rawtypes")
-    private ShardingSphereResultSet executePushDownQuery(final 
ShardingSphereDatabase database, final QueryContext queryContext,
-                                                         final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final Statement statement,
-                                                         final Map<String, 
Integer> columnLabelAndIndexMap,
-                                                         final 
StatementAddCallback addCallback, final StatementReplayCallback replayCallback) 
throws SQLException {
-        List<QueryResult> queryResults = getQueryResults(database, 
queryContext, prepareEngine, addCallback, replayCallback);
-        MergedResult mergedResult = mergeQuery(database, queryResults, 
queryContext.getSqlStatementContext());
-        boolean isContainsEnhancedTable = 
queryContext.getSqlStatementContext() instanceof SelectStatementContext
-                && ((SelectStatementContext) 
queryContext.getSqlStatementContext()).isContainsEnhancedTable();
-        List<ResultSet> resultSets = getResultSets();
-        return new ShardingSphereResultSet(resultSets, mergedResult, 
statement, isContainsEnhancedTable, queryContext.getSqlStatementContext(),
-                null == columnLabelAndIndexMap
-                        ? 
ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(queryContext.getSqlStatementContext(),
 isContainsEnhancedTable, resultSets.get(0).getMetaData())
-                        : columnLabelAndIndexMap);
-    }
-    
-    @SuppressWarnings("rawtypes")
-    private List<QueryResult> getQueryResults(final ShardingSphereDatabase 
database, final QueryContext queryContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                              final StatementAddCallback 
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
-        ExecutionContext executionContext = new 
KernelProcessor().generateExecutionContext(
-                queryContext, database, metaData.getGlobalRuleMetaData(), 
metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
-        return 
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
-                ? getJDBCQueryResults(database, queryContext, prepareEngine, 
addCallback, replayCallback, executionContext)
-                : getRawQueryResults(database, queryContext, executionContext);
-    }
-    
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private List<QueryResult> getJDBCQueryResults(final ShardingSphereDatabase 
database, final QueryContext queryContext,
-                                                  final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                                  final StatementAddCallback 
addCallback, final StatementReplayCallback replayCallback,
-                                                  final ExecutionContext 
executionContext) throws SQLException {
-        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
-                new ExecutionGroupReportContext(connection.getProcessId(), 
database.getName(), new Grantee("", "")));
-        for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
-            Collection<Statement> statements = getStatements(each);
-            this.statements.addAll(statements);
-            addCallback.add(statements, 
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ? 
getParameterSets(each) : Collections.emptyList());
-        }
-        replayCallback.replay();
-        ProcessEngine processEngine = new ProcessEngine();
-        try {
-            processEngine.executeSQL(executionGroupContext, queryContext);
-            return jdbcExecutor.execute(executionGroupContext, 
getExecuteQueryCallback(database, queryContext, prepareEngine.getType()));
-        } finally {
-            
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
-        }
-    }
-    
-    private Collection<Statement> getStatements(final 
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
-        Collection<Statement> result = new LinkedList<>();
-        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
-            result.add(each.getStorageResource());
-        }
-        return result;
-    }
-    
-    private Collection<List<Object>> getParameterSets(final 
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
-        Collection<List<Object>> result = new LinkedList<>();
-        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
-            result.add(each.getExecutionUnit().getSqlUnit().getParameters());
-        }
-        return result;
-    }
-    
-    private List<QueryResult> getRawQueryResults(final ShardingSphereDatabase 
database, final QueryContext queryContext, final ExecutionContext 
executionContext) throws SQLException {
-        return rawExecutor.execute(
-                createRawExecutionGroupContext(database, executionContext), 
queryContext, new 
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
-    }
-    
-    private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
-        int maxConnectionsSizePerQuery = 
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
database.getRuleMetaData().getRules()).prepare(
-                executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new 
Grantee("", "")));
-    }
-    
-    private MergedResult mergeQuery(final ShardingSphereDatabase database, 
final List<QueryResult> queryResults, final SQLStatementContext 
sqlStatementContext) throws SQLException {
-        MergeEngine mergeEngine = new 
MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
-        return mergeEngine.merge(queryResults, sqlStatementContext);
-    }
-    
-    @SuppressWarnings("JDBCResourceOpenedButNotSafelyClosed")
-    private List<ResultSet> getResultSets() throws SQLException {
-        List<ResultSet> result = new ArrayList<>(statements.size());
-        for (Statement each : statements) {
-            if (null != each.getResultSet()) {
-                result.add(each.getResultSet());
-            }
-        }
-        return result;
-    }
 }
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
similarity index 64%
copy from jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
copy to jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
index bc287c93bbc..e04785bc486 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
@@ -15,27 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.driver.executor.engine;
+package org.apache.shardingsphere.driver.executor.engine.pushdown;
 
-import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
-import 
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallback;
-import 
org.apache.shardingsphere.driver.executor.callback.execute.impl.PreparedStatementExecuteQueryCallback;
-import 
org.apache.shardingsphere.driver.executor.callback.execute.impl.StatementExecuteQueryCallback;
+import 
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallbackFactory;
 import 
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
-import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
@@ -53,15 +49,10 @@ import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
+import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
-import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
-import org.apache.shardingsphere.traffic.rule.TrafficRule;
 
 import java.sql.Connection;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -71,32 +62,40 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
- * Driver execute query executor.
+ * Driver push down execute query executor.
  */
-@RequiredArgsConstructor
-public final class DriverExecuteQueryExecutor {
+public final class DriverPushDownExecuteQueryExecutor {
     
-    private final ShardingSphereConnection connection;
+    private final ConnectionContext connectionContext;
     
-    private final ShardingSphereMetaData metaData;
+    private final String processId;
+    
+    private final RuleMetaData globalRuleMetaData;
+    
+    private final ConfigurationProperties props;
     
     private final JDBCExecutor jdbcExecutor;
     
     private final RawExecutor rawExecutor;
     
-    private final TrafficExecutor trafficExecutor;
+    private final Collection<Statement> statements;
     
-    private final SQLFederationEngine sqlFederationEngine;
-    
-    private final Collection<Statement> statements = new LinkedList<>();
+    public DriverPushDownExecuteQueryExecutor(final ShardingSphereConnection 
connection, final ShardingSphereMetaData metaData, final JDBCExecutor 
jdbcExecutor, final RawExecutor rawExecutor) {
+        connectionContext = 
connection.getDatabaseConnectionManager().getConnectionContext();
+        processId = connection.getProcessId();
+        globalRuleMetaData = metaData.getGlobalRuleMetaData();
+        props = metaData.getProps();
+        this.jdbcExecutor = jdbcExecutor;
+        this.rawExecutor = rawExecutor;
+        statements = new LinkedList<>();
+    }
     
     /**
      * Execute query.
-     *
+     * 
      * @param database database
      * @param queryContext query context
      * @param prepareEngine prepare engine
@@ -108,41 +107,11 @@ public final class DriverExecuteQueryExecutor {
      * @throws SQLException SQL exception
      */
     @SuppressWarnings("rawtypes")
-    public ResultSet executeQuery(final ShardingSphereDatabase database, final 
QueryContext queryContext,
-                                  final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final Statement statement, final Map<String, Integer> columnLabelAndIndexMap,
-                                  final StatementAddCallback addCallback, 
final StatementReplayCallback replayCallback) throws SQLException {
+    public ShardingSphereResultSet executeQuery(final ShardingSphereDatabase 
database, final QueryContext queryContext,
+                                                final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final Statement statement,
+                                                final Map<String, Integer> 
columnLabelAndIndexMap,
+                                                final StatementAddCallback 
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
         statements.clear();
-        RuleMetaData globalRuleMetaData = metaData.getGlobalRuleMetaData();
-        SQLAuditEngine.audit(queryContext, globalRuleMetaData, database);
-        Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(globalRuleMetaData.getSingleRule(TrafficRule.class),
 queryContext);
-        if (trafficInstanceId.isPresent()) {
-            return trafficExecutor.execute(
-                    connection.getProcessId(), database.getName(), 
trafficInstanceId.get(), queryContext, prepareEngine, 
getTrafficExecuteQueryCallback(prepareEngine.getType()));
-        }
-        if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), database, globalRuleMetaData)) {
-            return sqlFederationEngine.executeQuery(
-                    prepareEngine, getExecuteQueryCallback(database, 
queryContext, prepareEngine.getType()), new SQLFederationContext(false, 
queryContext, metaData, connection.getProcessId()));
-        }
-        return executePushDownQuery(database, queryContext, prepareEngine, 
statement, columnLabelAndIndexMap, addCallback, replayCallback);
-    }
-    
-    private TrafficExecutorCallback<ResultSet> 
getTrafficExecuteQueryCallback(final String jdbcDriverType) {
-        return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? ((sql, 
statement) -> statement.executeQuery(sql)) : ((sql, statement) -> 
((PreparedStatement) statement).executeQuery());
-    }
-    
-    private ExecuteQueryCallback getExecuteQueryCallback(final 
ShardingSphereDatabase database, final QueryContext queryContext, final String 
jdbcDriverType) {
-        return JDBCDriverType.STATEMENT.equals(jdbcDriverType)
-                ? new 
StatementExecuteQueryCallback(database.getProtocolType(), 
database.getResourceMetaData(),
-                        
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown())
-                : new 
PreparedStatementExecuteQueryCallback(database.getProtocolType(), 
database.getResourceMetaData(),
-                        
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown());
-    }
-    
-    @SuppressWarnings("rawtypes")
-    private ShardingSphereResultSet executePushDownQuery(final 
ShardingSphereDatabase database, final QueryContext queryContext,
-                                                         final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final Statement statement,
-                                                         final Map<String, 
Integer> columnLabelAndIndexMap,
-                                                         final 
StatementAddCallback addCallback, final StatementReplayCallback replayCallback) 
throws SQLException {
         List<QueryResult> queryResults = getQueryResults(database, 
queryContext, prepareEngine, addCallback, replayCallback);
         MergedResult mergedResult = mergeQuery(database, queryResults, 
queryContext.getSqlStatementContext());
         boolean isContainsEnhancedTable = 
queryContext.getSqlStatementContext() instanceof SelectStatementContext
@@ -158,7 +127,7 @@ public final class DriverExecuteQueryExecutor {
     private List<QueryResult> getQueryResults(final ShardingSphereDatabase 
database, final QueryContext queryContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
                                               final StatementAddCallback 
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
         ExecutionContext executionContext = new 
KernelProcessor().generateExecutionContext(
-                queryContext, database, metaData.getGlobalRuleMetaData(), 
metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
+                queryContext, database, globalRuleMetaData, props, 
connectionContext);
         return 
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
                 ? getJDBCQueryResults(database, queryContext, prepareEngine, 
addCallback, replayCallback, executionContext)
                 : getRawQueryResults(database, queryContext, executionContext);
@@ -170,7 +139,7 @@ public final class DriverExecuteQueryExecutor {
                                                   final StatementAddCallback 
addCallback, final StatementReplayCallback replayCallback,
                                                   final ExecutionContext 
executionContext) throws SQLException {
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
-                new ExecutionGroupReportContext(connection.getProcessId(), 
database.getName(), new Grantee("", "")));
+                new ExecutionGroupReportContext(processId, database.getName(), 
new Grantee("", "")));
         for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
             Collection<Statement> statements = getStatements(each);
             this.statements.addAll(statements);
@@ -180,7 +149,7 @@ public final class DriverExecuteQueryExecutor {
         ProcessEngine processEngine = new ProcessEngine();
         try {
             processEngine.executeSQL(executionGroupContext, queryContext);
-            return jdbcExecutor.execute(executionGroupContext, 
getExecuteQueryCallback(database, queryContext, prepareEngine.getType()));
+            return jdbcExecutor.execute(executionGroupContext, new 
ExecuteQueryCallbackFactory(prepareEngine.getType()).newInstance(database, 
queryContext));
         } finally {
             
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
         }
@@ -208,13 +177,13 @@ public final class DriverExecuteQueryExecutor {
     }
     
     private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
-        int maxConnectionsSizePerQuery = 
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        int maxConnectionsSizePerQuery = 
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
database.getRuleMetaData().getRules()).prepare(
-                executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new 
Grantee("", "")));
+                executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(processId, database.getName(), new Grantee("", 
"")));
     }
     
     private MergedResult mergeQuery(final ShardingSphereDatabase database, 
final List<QueryResult> queryResults, final SQLStatementContext 
sqlStatementContext) throws SQLException {
-        MergeEngine mergeEngine = new 
MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
+        MergeEngine mergeEngine = new MergeEngine(globalRuleMetaData, 
database, props, connectionContext);
         return mergeEngine.merge(queryResults, sqlStatementContext);
     }
     

Reply via email to