This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2da211c153f Refactor DriverExecutor (#31442)
2da211c153f is described below

commit 2da211c153f365524e69c7a952d687c6d089224b
Author: Liang Zhang <[email protected]>
AuthorDate: Thu May 30 00:43:59 2024 +0800

    Refactor DriverExecutor (#31442)
    
    * Refactor DriverExecutor
    
    * Refactor DriverExecutor
    
    * Refactor DriverExecutor
    
    * Refactor DriverExecutor
---
 .../driver/executor/DriverExecutor.java            | 162 +++++++++++++++++++--
 .../core/resultset/ShardingSphereResultSet.java    |   2 +
 .../statement/ShardingSpherePreparedStatement.java |  53 +++----
 .../core/statement/ShardingSphereStatement.java    |  39 +----
 .../core/statement/StatementReplayCallback.java    |  39 +++++
 5 files changed, 212 insertions(+), 83 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
index 5c6136bf646..04a7a4e0108 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
@@ -22,16 +22,37 @@ import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
+import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
+import 
org.apache.shardingsphere.driver.jdbc.core.statement.StatementReplayCallback;
+import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
+import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.merge.MergeEngine;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
@@ -45,7 +66,13 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * Driver executor.
@@ -66,6 +93,14 @@ public final class DriverExecutor implements AutoCloseable {
     
     private ExecuteType executeType = ExecuteType.REGULAR;
     
+    private final KernelProcessor kernelProcessor;
+    
+    @Getter
+    private final List<Statement> statements = new ArrayList<>();
+    
+    @Getter
+    private final List<List<Object>> parameterSets = new ArrayList<>();
+    
     public DriverExecutor(final ShardingSphereConnection connection) {
         this.connection = connection;
         MetaDataContexts metaDataContexts = 
connection.getContextManager().getMetaDataContexts();
@@ -73,10 +108,10 @@ public final class DriverExecutor implements AutoCloseable {
         JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, 
connection.getDatabaseConnectionManager().getConnectionContext());
         regularExecutor = new DriverJDBCExecutor(connection.getDatabaseName(), 
connection.getContextManager(), jdbcExecutor);
         rawExecutor = new RawExecutor(executorEngine, 
connection.getDatabaseConnectionManager().getConnectionContext());
-        ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName());
-        String schemaName = new 
DatabaseTypeRegistry(database.getProtocolType()).getDefaultSchemaName(connection.getDatabaseName());
+        String schemaName = new 
DatabaseTypeRegistry(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType()).getDefaultSchemaName(connection.getDatabaseName());
         trafficExecutor = new TrafficExecutor();
         sqlFederationEngine = new 
SQLFederationEngine(connection.getDatabaseName(), schemaName, 
metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor);
+        kernelProcessor = new KernelProcessor();
     }
     
     /**
@@ -86,36 +121,129 @@ public final class DriverExecutor implements AutoCloseable {
      * @param database database
      * @param queryContext query context
      * @param prepareEngine prepare engine
+     * @param statement statement
+     * @param columnLabelAndIndexMap column label and index map
+     * @param statementReplayCallback statement replay callback
      * @return result set
      * @throws SQLException SQL exception
      */
-    public Optional<ResultSet> executeAdvanceQuery(final 
ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final 
QueryContext queryContext,
-                                                   final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) 
throws SQLException {
+    public ResultSet executeAdvanceQuery(final ShardingSphereMetaData 
metaData, final ShardingSphereDatabase database, final QueryContext 
queryContext,
+                                         final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final Statement statement,
+                                         final Map<String, Integer> 
columnLabelAndIndexMap, final StatementReplayCallback statementReplayCallback) 
throws SQLException {
         Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
 queryContext);
         if (trafficInstanceId.isPresent()) {
-            return Optional.of(trafficExecutor.execute(
-                    connection.getProcessId(), database.getName(), 
trafficInstanceId.get(), queryContext, prepareEngine, 
getTrafficExecutorCallback(prepareEngine)));
+            return trafficExecutor.execute(connection.getProcessId(), 
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, 
getTrafficExecutorCallback(prepareEngine));
         }
         if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
-            return Optional.of(sqlFederationEngine.executeQuery(
-                    prepareEngine, getSQLFederationCallback(database, 
queryContext, prepareEngine), new SQLFederationContext(false, queryContext, 
metaData, connection.getProcessId())));
+            return sqlFederationEngine.executeQuery(
+                    prepareEngine, getExecuteQueryCallback(database, 
queryContext, prepareEngine.getType()), new SQLFederationContext(false, 
queryContext, metaData, connection.getProcessId()));
         }
-        return Optional.empty();
+        return doExecuteQuery(metaData, database, queryContext, prepareEngine, 
statement, columnLabelAndIndexMap, statementReplayCallback);
     }
     
     private TrafficExecutorCallback<ResultSet> 
getTrafficExecutorCallback(final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) {
         return JDBCDriverType.STATEMENT.equals(prepareEngine.getType()) ? 
Statement::executeQuery : ((statement, sql) -> ((PreparedStatement) 
statement).executeQuery());
     }
     
-    private ExecuteQueryCallback getSQLFederationCallback(final 
ShardingSphereDatabase database, final QueryContext queryContext,
-                                                          final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) {
-        return JDBCDriverType.STATEMENT.equals(prepareEngine.getType())
+    private ExecuteQueryCallback getExecuteQueryCallback(final 
ShardingSphereDatabase database, final QueryContext queryContext, final String 
jdbcDriverType) {
+        return JDBCDriverType.STATEMENT.equals(jdbcDriverType)
                 ? new 
StatementExecuteQueryCallback(database.getProtocolType(), 
database.getResourceMetaData(),
                         
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown())
                 : new 
PreparedStatementExecuteQueryCallback(database.getProtocolType(), 
database.getResourceMetaData(),
                         
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown());
     }
     
+    private ShardingSphereResultSet doExecuteQuery(final 
ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final 
QueryContext queryContext,
+                                                   final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final Statement statement,
+                                                   final Map<String, Integer> 
columnLabelAndIndexMap, final StatementReplayCallback statementReplayCallback) 
throws SQLException {
+        List<QueryResult> queryResults = executeQuery0(metaData, database, 
queryContext, prepareEngine, statementReplayCallback);
+        MergedResult mergedResult = mergeQuery(metaData, database, 
queryResults, queryContext.getSqlStatementContext());
+        boolean selectContainsEnhancedTable = 
queryContext.getSqlStatementContext() instanceof SelectStatementContext
+                && ((SelectStatementContext) 
queryContext.getSqlStatementContext()).isContainsEnhancedTable();
+        List<ResultSet> resultSets = getResultSets();
+        return new ShardingSphereResultSet(resultSets, mergedResult, 
statement, selectContainsEnhancedTable, queryContext.getSqlStatementContext(),
+                null == columnLabelAndIndexMap
+                        ? 
ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(queryContext.getSqlStatementContext(),
 selectContainsEnhancedTable, resultSets.get(0).getMetaData())
+                        : columnLabelAndIndexMap);
+    }
+    
+    private List<QueryResult> executeQuery0(final ShardingSphereMetaData 
metaData, final ShardingSphereDatabase database, final QueryContext 
queryContext,
+                                            final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+                                            final StatementReplayCallback 
statementReplayCallback) throws SQLException {
+        ExecutionContext executionContext = createExecutionContext(metaData, 
database, queryContext);
+        if (hasRawExecutionRule(database)) {
+            return 
rawExecutor.execute(createRawExecutionGroupContext(metaData, database, 
executionContext),
+                    queryContext, new 
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
+        }
+        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
+                new ExecutionGroupReportContext(connection.getProcessId(), 
database.getName(), new Grantee("", "")));
+        for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
+            statements.addAll(getStatements(each));
+            if 
(JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType())) {
+                parameterSets.addAll(getParameterSets(each));
+            }
+        }
+        statementReplayCallback.replay(statements, parameterSets);
+        return regularExecutor.executeQuery(executionGroupContext, 
queryContext, getExecuteQueryCallback(database, queryContext, 
prepareEngine.getType()));
+    }
+    
+    private Collection<Statement> getStatements(final 
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
+        Collection<Statement> result = new LinkedList<>();
+        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
+            result.add(each.getStorageResource());
+        }
+        return result;
+    }
+    
+    private Collection<List<Object>> getParameterSets(final 
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
+        Collection<List<Object>> result = new LinkedList<>();
+        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
+            result.add(each.getExecutionUnit().getSqlUnit().getParameters());
+        }
+        return result;
+    }
+    
+    private ExecutionContext createExecutionContext(final 
ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final 
QueryContext queryContext) throws SQLException {
+        clearStatements();
+        RuleMetaData globalRuleMetaData = metaData.getGlobalRuleMetaData();
+        SQLAuditEngine.audit(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), globalRuleMetaData, database, null, 
queryContext.getHintValueContext());
+        return kernelProcessor.generateExecutionContext(queryContext, 
database, globalRuleMetaData, metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
+    }
+    
+    private void clearStatements() throws SQLException {
+        for (Statement each : statements) {
+            each.close();
+        }
+        statements.clear();
+    }
+    
+    private boolean hasRawExecutionRule(final ShardingSphereDatabase database) 
{
+        return 
!database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
+    }
+    
+    private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ShardingSphereMetaData metaData,
+                                                                               
       final ShardingSphereDatabase database, final ExecutionContext 
executionContext) throws SQLException {
+        int maxConnectionsSizePerQuery = 
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
database.getRuleMetaData().getRules()).prepare(
+                executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new 
Grantee("", "")));
+    }
+    
+    private MergedResult mergeQuery(final ShardingSphereMetaData metaData, 
final ShardingSphereDatabase database,
+                                    final List<QueryResult> queryResults, 
final SQLStatementContext sqlStatementContext) throws SQLException {
+        MergeEngine mergeEngine = new 
MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
+        return mergeEngine.merge(queryResults, sqlStatementContext);
+    }
+    
+    private List<ResultSet> getResultSets() throws SQLException {
+        List<ResultSet> result = new ArrayList<>(statements.size());
+        for (Statement each : statements) {
+            if (null != each.getResultSet()) {
+                result.add(each.getResultSet());
+            }
+        }
+        return result;
+    }
+    
     /**
      * Execute advance update.
      *
@@ -159,7 +287,7 @@ public final class DriverExecutor implements AutoCloseable {
         if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
             executeType = ExecuteType.FEDERATION;
             ResultSet resultSet = sqlFederationEngine.executeQuery(
-                    prepareEngine, getSQLFederationCallback(database, 
queryContext, prepareEngine), new SQLFederationContext(false, queryContext, 
metaData, connection.getProcessId()));
+                    prepareEngine, getExecuteQueryCallback(database, 
queryContext, prepareEngine.getType()), new SQLFederationContext(false, 
queryContext, metaData, connection.getProcessId()));
             return Optional.of(null != resultSet);
         }
         return Optional.empty();
@@ -181,6 +309,14 @@ public final class DriverExecutor implements AutoCloseable {
         }
     }
     
+    /**
+     * Clear.
+     */
+    public void clear() {
+        statements.clear();
+        parameterSets.clear();
+    }
+    
     @Override
     public void close() throws SQLException {
         sqlFederationEngine.close();
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ShardingSphereResultSet.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ShardingSphereResultSet.java
index d293eac5992..cf23ec8d66a 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ShardingSphereResultSet.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ShardingSphereResultSet.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.driver.jdbc.core.resultset;
 
+import lombok.Getter;
 import org.apache.shardingsphere.driver.jdbc.adapter.AbstractResultSetAdapter;
 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
@@ -62,6 +63,7 @@ public final class ShardingSphereResultSet extends AbstractResultSetAdapter {
     
     private final MergedResult mergeResultSet;
     
+    @Getter
     private final Map<String, Integer> columnLabelAndIndexMap;
     
     public ShardingSphereResultSet(final List<ResultSet> resultSets, final 
MergedResult mergeResultSet, final Statement statement, final boolean 
selectContainsEnhancedTable,
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 2326ca199fd..5f5d9d8443f 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -23,7 +23,6 @@ import lombok.Getter;
 import org.apache.shardingsphere.driver.executor.DriverExecutor;
 import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit;
 import 
org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
-import 
org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
 import 
org.apache.shardingsphere.driver.jdbc.adapter.AbstractPreparedStatementAdapter;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
@@ -99,7 +98,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /**
  * ShardingSphere prepared statement.
@@ -224,13 +222,16 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
             QueryContext queryContext = createQueryContext();
             
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
             ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
-            Optional<ResultSet> advancedResultSet = 
executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database, 
queryContext, createDriverExecutionPrepareEngine(database));
-            if (advancedResultSet.isPresent()) {
-                currentResultSet = advancedResultSet.get();
-                return currentResultSet;
+            findGeneratedKey().ifPresent(optional -> 
generatedValues.addAll(optional.getGeneratedValues()));
+            currentResultSet = 
executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database, 
queryContext, createDriverExecutionPrepareEngine(database), this, 
columnLabelAndIndexMap,
+                    (StatementReplayCallback<PreparedStatement>) this::replay);
+            if (currentResultSet instanceof ShardingSphereResultSet) {
+                columnLabelAndIndexMap = ((ShardingSphereResultSet) 
currentResultSet).getColumnLabelAndIndexMap();
             }
-            ExecutionContext executionContext = 
createExecutionContext(queryContext);
-            currentResultSet = doExecuteQuery(executionContext);
+            for (Statement each : executor.getStatements()) {
+                statements.add((PreparedStatement) each);
+            }
+            parameterSets.addAll(executor.getParameterSets());
             return currentResultSet;
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
@@ -243,28 +244,6 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         }
     }
     
-    private ShardingSphereResultSet doExecuteQuery(final ExecutionContext 
executionContext) throws SQLException {
-        List<QueryResult> queryResults = executeQuery0(executionContext);
-        MergedResult mergedResult = mergeQuery(queryResults, 
sqlStatementContext);
-        List<ResultSet> resultSets = getResultSets();
-        if (null == columnLabelAndIndexMap) {
-            columnLabelAndIndexMap = 
ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(sqlStatementContext, 
selectContainsEnhancedTable, resultSets.get(0).getMetaData());
-        }
-        return new ShardingSphereResultSet(resultSets, mergedResult, this, 
selectContainsEnhancedTable, sqlStatementContext, columnLabelAndIndexMap);
-    }
-    
-    private List<QueryResult> executeQuery0(final ExecutionContext 
executionContext) throws SQLException {
-        if (hasRawExecutionRule()) {
-            return 
executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext),
-                    executionContext.getQueryContext(), new 
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
-        }
-        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext(executionContext);
-        cacheStatements(executionGroupContext.getInputGroups());
-        PreparedStatementExecuteQueryCallback callback = new 
PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
-                
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), 
sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown());
-        return 
executor.getRegularExecutor().executeQuery(executionGroupContext, 
executionContext.getQueryContext(), callback);
-    }
-    
     private boolean hasRawExecutionRule() {
         return 
!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
     }
@@ -278,7 +257,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
     private void resetParameters() throws SQLException {
         parameterSets.clear();
         parameterSets.add(getParameters());
-        replaySetParameter();
+        replaySetParameter(statements, parameterSets);
     }
     
     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) {
@@ -513,17 +492,17 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
                 
parameterSets.add(eachInput.getExecutionUnit().getSqlUnit().getParameters());
             });
         }
-        replay();
+        replay(statements, parameterSets);
     }
     
-    private void replay() throws SQLException {
-        replaySetParameter();
+    private void replay(final List<PreparedStatement> statements, final 
List<List<Object>> parameterSets) throws SQLException {
+        replaySetParameter(statements, parameterSets);
         for (Statement each : statements) {
             getMethodInvocationRecorder().replay(each);
         }
     }
     
-    private void replaySetParameter() throws SQLException {
+    private void replaySetParameter(final List<PreparedStatement> statements, 
final List<List<Object>> parameterSets) throws SQLException {
         for (int i = 0; i < statements.size(); i++) {
             replaySetParameter(statements.get(i), parameterSets.get(i));
         }
@@ -534,6 +513,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         statements.clear();
         parameterSets.clear();
         generatedValues.clear();
+        executor.clear();
     }
     
     private Optional<GeneratedKeyContext> findGeneratedKey() {
@@ -567,7 +547,8 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
     public void addBatch() {
         try {
             QueryContext queryContext = createQueryContext();
-            executionContext = connection.getTrafficInstanceId(trafficRule, 
queryContext)
+            Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(trafficRule, queryContext);
+            executionContext = trafficInstanceId
                     .map(optional -> createExecutionContext(queryContext, 
optional)).orElseGet(() -> createExecutionContext(queryContext));
             
batchPreparedStatementExecutor.addBatchForExecutionUnits(executionContext.getExecutionUnits());
         } finally {
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index af0cbc38445..bda16fec1d4 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -23,7 +23,6 @@ import org.apache.shardingsphere.driver.executor.DriverExecutor;
 import org.apache.shardingsphere.driver.executor.batch.BatchStatementExecutor;
 import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
-import 
org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
 import org.apache.shardingsphere.driver.jdbc.adapter.AbstractStatementAdapter;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
@@ -152,13 +151,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
             
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
             ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
             sqlStatementContext = queryContext.getSqlStatementContext();
-            Optional<ResultSet> advancedResultSet = 
executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database, 
queryContext, createDriverExecutionPrepareEngine(database));
-            if (advancedResultSet.isPresent()) {
-                currentResultSet = advancedResultSet.get();
-                return currentResultSet;
-            }
-            ExecutionContext executionContext = 
createExecutionContext(queryContext);
-            currentResultSet = doExecuteQuery(executionContext);
+            currentResultSet = 
executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database, 
queryContext, createDriverExecutionPrepareEngine(database), this, null,
+                    (StatementReplayCallback<Statement>) (statements, 
parameterSets) -> replay(statements));
+            statements.addAll(executor.getStatements());
             return currentResultSet;
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
@@ -169,30 +164,6 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         }
     }
     
-    private ShardingSphereResultSet doExecuteQuery(final ExecutionContext 
executionContext) throws SQLException {
-        List<QueryResult> queryResults = executeQuery0(executionContext);
-        MergedResult mergedResult = mergeQuery(queryResults, 
sqlStatementContext);
-        boolean selectContainsEnhancedTable = sqlStatementContext instanceof 
SelectStatementContext && ((SelectStatementContext) 
sqlStatementContext).isContainsEnhancedTable();
-        return new ShardingSphereResultSet(getResultSets(), mergedResult, 
this, selectContainsEnhancedTable, sqlStatementContext);
-    }
-    
-    private List<QueryResult> executeQuery0(final ExecutionContext 
executionContext) throws SQLException {
-        if (hasRawExecutionRule()) {
-            return 
executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext),
-                    executionContext.getQueryContext(), new 
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
-        }
-        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext(executionContext);
-        cacheStatements(executionGroupContext.getInputGroups());
-        StatementExecuteQueryCallback callback = new 
StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
-                
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), 
sqlStatementContext.getSqlStatement(),
-                SQLExecutorExceptionHandler.isExceptionThrown());
-        return 
executor.getRegularExecutor().executeQuery(executionGroupContext, 
executionContext.getQueryContext(), callback);
-    }
-    
-    private boolean hasRawExecutionRule() {
-        return 
!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
-    }
-    
     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) {
         int maxConnectionsSizePerQuery = 
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, 
maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(), 
statementManager, statementOption,
@@ -470,7 +441,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {
             
statements.addAll(each.getInputs().stream().map(JDBCExecutionUnit::getStorageResource).collect(Collectors.toList()));
         }
-        replay();
+        replay(statements);
     }
     
     private JDBCExecutorCallback<Boolean> createExecuteCallback(final 
ExecuteCallback executeCallback, final SQLStatement sqlStatement) {
@@ -490,7 +461,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         };
     }
     
-    private void replay() throws SQLException {
+    private void replay(final List<Statement> statements) throws SQLException {
         for (Statement each : statements) {
             getMethodInvocationRecorder().replay(each);
         }
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
new file mode 100644
index 00000000000..0ad5b4456c0
--- /dev/null
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.jdbc.core.statement;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+/**
+ * Statement replay callback.
+ *
+ * @param <T> type of statement
+ */
+public interface StatementReplayCallback<T extends Statement> {
+    
+    /**
+     * Replay statement.
+     *
+     * @param statements statements
+     * @param parameterSets parameter sets
+     * @throws SQLException SQL exception
+     */
+    void replay(List<T> statements, List<List<Object>> parameterSets) throws 
SQLException;
+}


Reply via email to