This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ffb7041a2c9 Add DriverPushDownExecuteExecutor.getResultSet() (#31655)
ffb7041a2c9 is described below

commit ffb7041a2c99e5ec58231a312d0b7a96d2c21172
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Jun 11 23:52:38 2024 +0800

    Add DriverPushDownExecuteExecutor.getResultSet() (#31655)
---
 .../executor/engine/DriverExecuteExecutor.java     | 13 ++++-
 .../executor/engine/DriverExecutorFacade.java      | 12 +++-
 .../pushdown/DriverPushDownExecuteExecutor.java    | 66 ++++++++++++++++++++--
 .../statement/ShardingSpherePreparedStatement.java | 51 ++---------------
 .../core/statement/ShardingSphereStatement.java    | 52 +----------------
 5 files changed, 90 insertions(+), 104 deletions(-)

diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
index 356917624fb..b250ad61000 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecu
 import 
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
 import 
org.apache.shardingsphere.driver.executor.engine.pushdown.DriverPushDownExecuteExecutor;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -42,6 +43,8 @@ import org.apache.shardingsphere.traffic.rule.TrafficRule;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -106,14 +109,22 @@ public final class DriverExecuteExecutor {
     /**
      * Get result set.
      *
+     * @param database database
+     * @param sqlStatementContext SQL statement context
+     * @param statement statement
+     * @param statements statements
      * @return result set
+     * @throws SQLException SQL exception
      */
-    public Optional<ResultSet> getResultSet() {
+    public Optional<ResultSet> getResultSet(final ShardingSphereDatabase 
database, final SQLStatementContext sqlStatementContext,
+                                            final Statement statement, final 
List<? extends Statement> statements) throws SQLException {
         switch (executeType) {
             case TRAFFIC:
                 return Optional.of(trafficExecutor.getResultSet());
             case FEDERATION:
                 return Optional.of(sqlFederationEngine.getResultSet());
+            case PUSH_DOWN:
+                return pushDownExecuteExecutor.getResultSet(database, 
sqlStatementContext, statement, statements);
             default:
                 return Optional.empty();
         }
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java
index 7ab0aeb0658..48663ead45c 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecu
 import 
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
+import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
@@ -42,6 +43,7 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
@@ -148,10 +150,16 @@ public final class DriverExecutorFacade implements 
AutoCloseable {
     /**
      * Get result set.
      *
+     * @param database database
+     * @param sqlStatementContext SQL statement context
+     * @param statement statement
+     * @param statements statements
      * @return result set
+     * @throws SQLException SQL exception
      */
-    public Optional<ResultSet> getResultSet() {
-        return executeExecutor.getResultSet();
+    public Optional<ResultSet> getResultSet(final ShardingSphereDatabase 
database, final SQLStatementContext sqlStatementContext,
+                                            final Statement statement, final 
List<? extends Statement> statements) throws SQLException {
+        return executeExecutor.getResultSet(database, sqlStatementContext, 
statement, statements);
     }
     
     @Override
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
index fc11906f0f7..68ef83d8561 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
@@ -23,7 +23,9 @@ import 
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecu
 import 
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
 import 
org.apache.shardingsphere.driver.executor.engine.transaction.DriverTransactionalExecutor;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
-import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
+import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
+import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
@@ -35,23 +37,30 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecut
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
+import org.apache.shardingsphere.infra.merge.MergeEngine;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
 import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
 
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * Driver push down execute executor.
@@ -60,7 +69,7 @@ public final class DriverPushDownExecuteExecutor {
     
     private final ShardingSphereConnection connection;
     
-    private final ConfigurationProperties props;
+    private final ShardingSphereMetaData metaData;
     
     private final Grantee grantee;
     
@@ -71,7 +80,7 @@ public final class DriverPushDownExecuteExecutor {
     public DriverPushDownExecuteExecutor(final ShardingSphereConnection 
connection, final ShardingSphereMetaData metaData, final Grantee grantee,
                                          final JDBCExecutor jdbcExecutor, 
final RawExecutor rawExecutor) {
         this.connection = connection;
-        props = metaData.getProps();
+        this.metaData = metaData;
         this.grantee = grantee;
         this.jdbcExecutor = jdbcExecutor;
         this.rawExecutor = rawExecutor;
@@ -118,7 +127,7 @@ public final class DriverPushDownExecuteExecutor {
             processEngine.executeSQL(executionGroupContext, 
executionContext.getQueryContext());
             List<Boolean> results = jdbcExecutor.execute(executionGroupContext,
                     new 
ExecuteCallbackFactory(prepareEngine.getType()).newInstance(database, 
executeCallback, executionContext.getSqlStatementContext().getSqlStatement()));
-            new 
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
 database, props)
+            new 
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
 database, metaData.getProps())
                     
.refresh(executionContext.getQueryContext().getSqlStatementContext(), 
executionContext.getRouteContext().getRouteUnits());
             return null != results && !results.isEmpty() && null != 
results.get(0) && results.get(0);
         } finally {
@@ -143,9 +152,56 @@ public final class DriverPushDownExecuteExecutor {
     }
     
     private boolean executeRaw(final ShardingSphereDatabase database, final 
ExecutionContext executionContext) throws SQLException {
-        int maxConnectionsSizePerQuery = 
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        int maxConnectionsSizePerQuery = 
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext = new 
RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
database.getRuleMetaData().getRules())
                 .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), 
grantee));
         return rawExecutor.execute(executionGroupContext, 
executionContext.getQueryContext(), new 
RawSQLExecutorCallback()).iterator().next() instanceof QueryResult;
     }
+    
+    /**
+     * Get result set.
+     *
+     * @param database database
+     * @param sqlStatementContext SQL statement context
+     * @param statement statement
+     * @param statements statements
+     * @return result set
+     * @throws SQLException SQL exception
+     */
+    public Optional<ResultSet> getResultSet(final ShardingSphereDatabase 
database, final SQLStatementContext sqlStatementContext,
+                                            final Statement statement, final 
List<? extends Statement> statements) throws SQLException {
+        if (sqlStatementContext instanceof SelectStatementContext || 
sqlStatementContext.getSqlStatement() instanceof DALStatement) {
+            List<ResultSet> resultSets = getResultSets(statements);
+            if (resultSets.isEmpty()) {
+                return Optional.empty();
+            }
+            List<QueryResult> queryResults = getQueryResults(resultSets);
+            MergedResult mergedResult = new 
MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext())
+                    .merge(queryResults, sqlStatementContext);
+            boolean selectContainsEnhancedTable = sqlStatementContext 
instanceof SelectStatementContext && ((SelectStatementContext) 
sqlStatementContext).isContainsEnhancedTable();
+            return Optional.of(new ShardingSphereResultSet(resultSets, 
mergedResult, statement, selectContainsEnhancedTable, sqlStatementContext));
+        }
+        return Optional.empty();
+    }
+    
+    @SuppressWarnings("JDBCResourceOpenedButNotSafelyClosed")
+    private List<ResultSet> getResultSets(final List<? extends Statement> 
statements) throws SQLException {
+        List<ResultSet> result = new ArrayList<>(statements.size());
+        for (Statement each : statements) {
+            if (null != each.getResultSet()) {
+                result.add(each.getResultSet());
+            }
+        }
+        return result;
+    }
+    
+    private List<QueryResult> getQueryResults(final List<ResultSet> 
resultSets) throws SQLException {
+        List<QueryResult> result = new ArrayList<>(resultSets.size());
+        for (ResultSet each : resultSets) {
+            if (null != each) {
+                result.add(new JDBCStreamQueryResult(each));
+            }
+        }
+        return result;
+    }
 }
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 5a7b1f947fd..71177e0eb79 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -19,14 +19,13 @@ package 
org.apache.shardingsphere.driver.jdbc.core.statement;
 
 import lombok.AccessLevel;
 import lombok.Getter;
+import 
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
 import org.apache.shardingsphere.driver.executor.engine.DriverExecutorFacade;
 import 
org.apache.shardingsphere.driver.executor.engine.batch.DriverExecuteBatchExecutor;
-import 
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
 import 
org.apache.shardingsphere.driver.jdbc.adapter.AbstractPreparedStatementAdapter;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
-import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
 import 
org.apache.shardingsphere.driver.jdbc.core.statement.metadata.ShardingSphereParameterMetaData;
 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
 import org.apache.shardingsphere.infra.binder.context.aware.ParameterAware;
@@ -42,15 +41,11 @@ import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePrecondition
 import 
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
 import 
org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.infra.hint.HintManager;
 import org.apache.shardingsphere.infra.hint.HintValueContext;
 import org.apache.shardingsphere.infra.hint.SQLHintUtils;
-import org.apache.shardingsphere.infra.merge.MergeEngine;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
@@ -61,7 +56,6 @@ import 
org.apache.shardingsphere.infra.rule.attribute.resoure.StorageConnectorRe
 import org.apache.shardingsphere.infra.session.query.QueryContext;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
 import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
 
 import java.sql.ParameterMetaData;
@@ -274,44 +268,13 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
         if (null != currentResultSet) {
             return currentResultSet;
         }
-        Optional<ResultSet> resultSet = driverExecutorFacade.getResultSet();
-        if (resultSet.isPresent()) {
-            return resultSet.get();
-        }
-        if (sqlStatementContext instanceof SelectStatementContext || 
sqlStatementContext.getSqlStatement() instanceof DALStatement) {
-            List<ResultSet> resultSets = getResultSets();
-            if (resultSets.isEmpty()) {
-                return currentResultSet;
-            }
-            MergedResult mergedResult = 
mergeQuery(getQueryResults(resultSets), sqlStatementContext);
-            if (null == columnLabelAndIndexMap) {
-                columnLabelAndIndexMap = 
ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(sqlStatementContext, 
selectContainsEnhancedTable, resultSets.get(0).getMetaData());
-            }
-            currentResultSet = new ShardingSphereResultSet(resultSets, 
mergedResult, this, selectContainsEnhancedTable, sqlStatementContext, 
columnLabelAndIndexMap);
+        driverExecutorFacade.getResultSet(metaData.getDatabase(databaseName), 
sqlStatementContext, this, statements).ifPresent(optional -> currentResultSet = 
optional);
+        if (null == columnLabelAndIndexMap && currentResultSet instanceof 
ShardingSphereResultSet) {
+            columnLabelAndIndexMap = ((ShardingSphereResultSet) 
currentResultSet).getColumnLabelAndIndexMap();
         }
         return currentResultSet;
     }
     
-    private List<ResultSet> getResultSets() throws SQLException {
-        List<ResultSet> result = new ArrayList<>(statements.size());
-        for (Statement each : statements) {
-            if (null != each.getResultSet()) {
-                result.add(each.getResultSet());
-            }
-        }
-        return result;
-    }
-    
-    private List<QueryResult> getQueryResults(final List<ResultSet> 
resultSets) throws SQLException {
-        List<QueryResult> result = new ArrayList<>(resultSets.size());
-        for (ResultSet each : resultSets) {
-            if (null != each) {
-                result.add(new JDBCStreamQueryResult(each));
-            }
-        }
-        return result;
-    }
-    
     private QueryContext createQueryContext() {
         List<Object> params = new ArrayList<>(getParameters());
         if (sqlStatementContext instanceof ParameterAware) {
@@ -320,12 +283,6 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         return new QueryContext(sqlStatementContext, sql, params, 
hintValueContext, true);
     }
     
-    private MergedResult mergeQuery(final List<QueryResult> queryResults, 
final SQLStatementContext sqlStatementContext) throws SQLException {
-        MergeEngine mergeEngine = new 
MergeEngine(metaData.getGlobalRuleMetaData(), 
metaData.getDatabase(databaseName),
-                metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
-        return mergeEngine.merge(queryResults, sqlStatementContext);
-    }
-    
     private void replay() throws SQLException {
         replaySetParameter(statements, parameterSets);
         for (Statement each : statements) {
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index d738b645cc6..0578a51e0e9 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -19,20 +19,18 @@ package 
org.apache.shardingsphere.driver.jdbc.core.statement;
 
 import lombok.AccessLevel;
 import lombok.Getter;
-import org.apache.shardingsphere.driver.executor.engine.DriverExecutorFacade;
-import 
org.apache.shardingsphere.driver.executor.engine.batch.statement.BatchStatementExecutor;
 import 
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecuteCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecuteUpdateCallback;
+import org.apache.shardingsphere.driver.executor.engine.DriverExecutorFacade;
+import 
org.apache.shardingsphere.driver.executor.engine.batch.statement.BatchStatementExecutor;
 import org.apache.shardingsphere.driver.jdbc.adapter.AbstractStatementAdapter;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import 
org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
-import 
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
 import 
org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.dml.InsertStatementContext;
-import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
 import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
 import 
org.apache.shardingsphere.infra.database.core.keygen.GeneratedKeyColumnProvider;
@@ -41,14 +39,10 @@ import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePrecondition
 import 
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
 import 
org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.infra.hint.HintValueContext;
 import org.apache.shardingsphere.infra.hint.SQLHintUtils;
-import org.apache.shardingsphere.infra.merge.MergeEngine;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
@@ -56,13 +50,11 @@ import 
org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttri
 import org.apache.shardingsphere.infra.session.query.QueryContext;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
 import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -319,48 +311,10 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         if (null != currentResultSet) {
             return currentResultSet;
         }
-        Optional<ResultSet> resultSet = driverExecutorFacade.getResultSet();
-        if (resultSet.isPresent()) {
-            return resultSet.get();
-        }
-        if (sqlStatementContext instanceof SelectStatementContext || 
sqlStatementContext.getSqlStatement() instanceof DALStatement) {
-            List<ResultSet> resultSets = getResultSets();
-            if (resultSets.isEmpty()) {
-                return currentResultSet;
-            }
-            MergedResult mergedResult = 
mergeQuery(getQueryResults(resultSets), sqlStatementContext);
-            boolean selectContainsEnhancedTable = sqlStatementContext 
instanceof SelectStatementContext && ((SelectStatementContext) 
sqlStatementContext).isContainsEnhancedTable();
-            currentResultSet = new ShardingSphereResultSet(resultSets, 
mergedResult, this, selectContainsEnhancedTable, sqlStatementContext);
-        }
+        driverExecutorFacade.getResultSet(metaData.getDatabase(databaseName), 
sqlStatementContext, this, statements).ifPresent(optional -> currentResultSet = 
optional);
         return currentResultSet;
     }
     
-    private List<ResultSet> getResultSets() throws SQLException {
-        List<ResultSet> result = new ArrayList<>(statements.size());
-        for (Statement each : statements) {
-            if (null != each.getResultSet()) {
-                result.add(each.getResultSet());
-            }
-        }
-        return result;
-    }
-    
-    private List<QueryResult> getQueryResults(final List<ResultSet> 
resultSets) throws SQLException {
-        List<QueryResult> result = new ArrayList<>(resultSets.size());
-        for (ResultSet each : resultSets) {
-            if (null != each) {
-                result.add(new JDBCStreamQueryResult(each));
-            }
-        }
-        return result;
-    }
-    
-    private MergedResult mergeQuery(final List<QueryResult> queryResults, 
final SQLStatementContext sqlStatementContext) throws SQLException {
-        MergeEngine mergeEngine = new 
MergeEngine(metaData.getGlobalRuleMetaData(), 
metaData.getDatabase(databaseName),
-                metaData.getProps(), 
connection.getDatabaseConnectionManager().getConnectionContext());
-        return mergeEngine.merge(queryResults, sqlStatementContext);
-    }
-    
     @SuppressWarnings("MagicConstant")
     @Override
     public int getResultSetType() {

Reply via email to