This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a1a44fc78db Refactor DriverExecuteQueryExecutor (#31638)
a1a44fc78db is described below

commit a1a44fc78db5e92d6883dbe2126ae3a169c4963d
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Jun 9 12:55:03 2024 +0800

    Refactor DriverExecuteQueryExecutor (#31638)
    
    * Refactor DriverExecuteQueryExecutor
    
    * Refactor DriverExecuteQueryExecutor
---
 .../engine/DriverExecuteQueryExecutor.java         | 77 ++++++++++++----------
 1 file changed, 42 insertions(+), 35 deletions(-)

diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
index 50ba479ab36..bc287c93bbc 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
@@ -18,14 +18,14 @@
 package org.apache.shardingsphere.driver.executor.engine;
 
 import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.execute.impl.PreparedStatementExecuteQueryCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.execute.impl.StatementExecuteQueryCallback;
+import 
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
-import 
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
-import 
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
@@ -112,24 +112,18 @@ public final class DriverExecuteQueryExecutor {
                                   final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final Statement statement, final Map<String, Integer> columnLabelAndIndexMap,
                                   final StatementAddCallback addCallback, 
final StatementReplayCallback replayCallback) throws SQLException {
         statements.clear();
-        Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
 queryContext);
+        RuleMetaData globalRuleMetaData = metaData.getGlobalRuleMetaData();
+        SQLAuditEngine.audit(queryContext, globalRuleMetaData, database);
+        Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(globalRuleMetaData.getSingleRule(TrafficRule.class),
 queryContext);
         if (trafficInstanceId.isPresent()) {
             return trafficExecutor.execute(
                     connection.getProcessId(), database.getName(), 
trafficInstanceId.get(), queryContext, prepareEngine, 
getTrafficExecuteQueryCallback(prepareEngine.getType()));
         }
-        if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
+        if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), database, globalRuleMetaData)) {
             return sqlFederationEngine.executeQuery(
                     prepareEngine, getExecuteQueryCallback(database, 
queryContext, prepareEngine.getType()), new SQLFederationContext(false, 
queryContext, metaData, connection.getProcessId()));
         }
-        List<QueryResult> queryResults = executePushDownQuery(database, 
queryContext, prepareEngine, addCallback, replayCallback);
-        MergedResult mergedResult = mergeQuery(database, queryResults, 
queryContext.getSqlStatementContext());
-        boolean selectContainsEnhancedTable = 
queryContext.getSqlStatementContext() instanceof SelectStatementContext
-                && ((SelectStatementContext) 
queryContext.getSqlStatementContext()).isContainsEnhancedTable();
-        List<ResultSet> resultSets = getResultSets();
-        return new ShardingSphereResultSet(resultSets, mergedResult, 
statement, selectContainsEnhancedTable, queryContext.getSqlStatementContext(),
-                null == columnLabelAndIndexMap
-                        ? 
ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(queryContext.getSqlStatementContext(),
 selectContainsEnhancedTable, resultSets.get(0).getMetaData())
-                        : columnLabelAndIndexMap);
+        return executePushDownQuery(database, queryContext, prepareEngine, 
statement, columnLabelAndIndexMap, addCallback, replayCallback);
     }
     
     private TrafficExecutorCallback<ResultSet> 
getTrafficExecuteQueryCallback(final String jdbcDriverType) {
@@ -144,15 +138,37 @@ public final class DriverExecuteQueryExecutor {
                         
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown());
     }
     
+    @SuppressWarnings("rawtypes")
+    private ShardingSphereResultSet executePushDownQuery(final 
ShardingSphereDatabase database, final QueryContext queryContext,
+                                                         final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final Statement statement,
+                                                         final Map<String, 
Integer> columnLabelAndIndexMap,
+                                                         final 
StatementAddCallback addCallback, final StatementReplayCallback replayCallback) 
throws SQLException {
+        List<QueryResult> queryResults = getQueryResults(database, 
queryContext, prepareEngine, addCallback, replayCallback);
+        MergedResult mergedResult = mergeQuery(database, queryResults, 
queryContext.getSqlStatementContext());
+        boolean isContainsEnhancedTable = 
queryContext.getSqlStatementContext() instanceof SelectStatementContext
+                && ((SelectStatementContext) 
queryContext.getSqlStatementContext()).isContainsEnhancedTable();
+        List<ResultSet> resultSets = getResultSets();
+        return new ShardingSphereResultSet(resultSets, mergedResult, 
statement, isContainsEnhancedTable, queryContext.getSqlStatementContext(),
+                null == columnLabelAndIndexMap
+                        ? 
ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(queryContext.getSqlStatementContext(),
 isContainsEnhancedTable, resultSets.get(0).getMetaData())
+                        : columnLabelAndIndexMap);
+    }
+    
+    @SuppressWarnings("rawtypes")
+    private List<QueryResult> getQueryResults(final ShardingSphereDatabase 
database, final QueryContext queryContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+                                              final StatementAddCallback 
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
+        ExecutionContext executionContext = new 
KernelProcessor().generateExecutionContext(
+                queryContext, database, metaData.getGlobalRuleMetaData(), 
metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
+        return 
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
+                ? getJDBCQueryResults(database, queryContext, prepareEngine, 
addCallback, replayCallback, executionContext)
+                : getRawQueryResults(database, queryContext, executionContext);
+    }
+    
     @SuppressWarnings({"rawtypes", "unchecked"})
-    private List<QueryResult> executePushDownQuery(final 
ShardingSphereDatabase database, final QueryContext queryContext,
-                                                   final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                                   final StatementAddCallback 
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
-        ExecutionContext executionContext = createExecutionContext(database, 
queryContext);
-        if (hasRawExecutionRule(database)) {
-            return rawExecutor.execute(
-                    createRawExecutionGroupContext(database, 
executionContext), queryContext, new 
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
-        }
+    private List<QueryResult> getJDBCQueryResults(final ShardingSphereDatabase 
database, final QueryContext queryContext,
+                                                  final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+                                                  final StatementAddCallback 
addCallback, final StatementReplayCallback replayCallback,
+                                                  final ExecutionContext 
executionContext) throws SQLException {
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
                 new ExecutionGroupReportContext(connection.getProcessId(), 
database.getName(), new Grantee("", "")));
         for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
@@ -161,24 +177,15 @@ public final class DriverExecuteQueryExecutor {
             addCallback.add(statements, 
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ? 
getParameterSets(each) : Collections.emptyList());
         }
         replayCallback.replay();
-        return executePushDownQuery(executionGroupContext, queryContext, 
getExecuteQueryCallback(database, queryContext, prepareEngine.getType()));
-    }
-    
-    private List<QueryResult> executePushDownQuery(final 
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
-                                                   final QueryContext 
queryContext, final ExecuteQueryCallback callback) throws SQLException {
         ProcessEngine processEngine = new ProcessEngine();
         try {
             processEngine.executeSQL(executionGroupContext, queryContext);
-            return jdbcExecutor.execute(executionGroupContext, callback);
+            return jdbcExecutor.execute(executionGroupContext, 
getExecuteQueryCallback(database, queryContext, prepareEngine.getType()));
         } finally {
             
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
         }
     }
     
-    private boolean hasRawExecutionRule(final ShardingSphereDatabase database) 
{
-        return 
!database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
-    }
-    
     private Collection<Statement> getStatements(final 
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
         Collection<Statement> result = new LinkedList<>();
         for (JDBCExecutionUnit each : executionGroup.getInputs()) {
@@ -195,10 +202,9 @@ public final class DriverExecuteQueryExecutor {
         return result;
     }
     
-    private ExecutionContext createExecutionContext(final 
ShardingSphereDatabase database, final QueryContext queryContext) {
-        RuleMetaData globalRuleMetaData = metaData.getGlobalRuleMetaData();
-        SQLAuditEngine.audit(queryContext, globalRuleMetaData, database);
-        return new KernelProcessor().generateExecutionContext(queryContext, 
database, globalRuleMetaData, metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
+    private List<QueryResult> getRawQueryResults(final ShardingSphereDatabase 
database, final QueryContext queryContext, final ExecutionContext 
executionContext) throws SQLException {
+        return rawExecutor.execute(
+                createRawExecutionGroupContext(database, executionContext), 
queryContext, new 
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
     }
     
     private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
@@ -212,6 +218,7 @@ public final class DriverExecuteQueryExecutor {
         return mergeEngine.merge(queryResults, sqlStatementContext);
     }
     
+    @SuppressWarnings("JDBCResourceOpenedButNotSafelyClosed")
     private List<ResultSet> getResultSets() throws SQLException {
         List<ResultSet> result = new ArrayList<>(statements.size());
         for (Statement each : statements) {

Reply via email to