This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ffb7041a2c9 Add DriverPushDownExecuteExecutor.getResultSet() (#31655)
ffb7041a2c9 is described below
commit ffb7041a2c99e5ec58231a312d0b7a96d2c21172
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Jun 11 23:52:38 2024 +0800
Add DriverPushDownExecuteExecutor.getResultSet() (#31655)
---
.../executor/engine/DriverExecuteExecutor.java | 13 ++++-
.../executor/engine/DriverExecutorFacade.java | 12 +++-
.../pushdown/DriverPushDownExecuteExecutor.java | 66 ++++++++++++++++++++--
.../statement/ShardingSpherePreparedStatement.java | 51 ++---------------
.../core/statement/ShardingSphereStatement.java | 52 +----------------
5 files changed, 90 insertions(+), 104 deletions(-)
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
index 356917624fb..b250ad61000 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecu
import
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
import
org.apache.shardingsphere.driver.executor.engine.pushdown.DriverPushDownExecuteExecutor;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -42,6 +43,8 @@ import org.apache.shardingsphere.traffic.rule.TrafficRule;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
import java.util.Optional;
/**
@@ -106,14 +109,22 @@ public final class DriverExecuteExecutor {
/**
* Get result set.
*
+ * @param database database
+ * @param sqlStatementContext SQL statement context
+ * @param statement statement
+ * @param statements statements
* @return result set
+ * @throws SQLException SQL exception
*/
- public Optional<ResultSet> getResultSet() {
+ public Optional<ResultSet> getResultSet(final ShardingSphereDatabase
database, final SQLStatementContext sqlStatementContext,
+ final Statement statement, final
List<? extends Statement> statements) throws SQLException {
switch (executeType) {
case TRAFFIC:
return Optional.of(trafficExecutor.getResultSet());
case FEDERATION:
return Optional.of(sqlFederationEngine.getResultSet());
+ case PUSH_DOWN:
+ return pushDownExecuteExecutor.getResultSet(database,
sqlStatementContext, statement, statements);
default:
return Optional.empty();
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java
index 7ab0aeb0658..48663ead45c 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecu
import
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
+import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
@@ -42,6 +43,7 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -148,10 +150,16 @@ public final class DriverExecutorFacade implements
AutoCloseable {
/**
* Get result set.
*
+ * @param database database
+ * @param sqlStatementContext SQL statement context
+ * @param statement statement
+ * @param statements statements
* @return result set
+ * @throws SQLException SQL exception
*/
- public Optional<ResultSet> getResultSet() {
- return executeExecutor.getResultSet();
+ public Optional<ResultSet> getResultSet(final ShardingSphereDatabase
database, final SQLStatementContext sqlStatementContext,
+ final Statement statement, final
List<? extends Statement> statements) throws SQLException {
+ return executeExecutor.getResultSet(database, sqlStatementContext,
statement, statements);
}
@Override
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
index fc11906f0f7..68ef83d8561 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
@@ -23,7 +23,9 @@ import
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecu
import
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
import
org.apache.shardingsphere.driver.executor.engine.transaction.DriverTransactionalExecutor;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
-import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
+import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
+import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
@@ -35,23 +37,30 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecut
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
+import org.apache.shardingsphere.infra.merge.MergeEngine;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
/**
* Driver push down execute executor.
@@ -60,7 +69,7 @@ public final class DriverPushDownExecuteExecutor {
private final ShardingSphereConnection connection;
- private final ConfigurationProperties props;
+ private final ShardingSphereMetaData metaData;
private final Grantee grantee;
@@ -71,7 +80,7 @@ public final class DriverPushDownExecuteExecutor {
public DriverPushDownExecuteExecutor(final ShardingSphereConnection
connection, final ShardingSphereMetaData metaData, final Grantee grantee,
final JDBCExecutor jdbcExecutor,
final RawExecutor rawExecutor) {
this.connection = connection;
- props = metaData.getProps();
+ this.metaData = metaData;
this.grantee = grantee;
this.jdbcExecutor = jdbcExecutor;
this.rawExecutor = rawExecutor;
@@ -118,7 +127,7 @@ public final class DriverPushDownExecuteExecutor {
processEngine.executeSQL(executionGroupContext,
executionContext.getQueryContext());
List<Boolean> results = jdbcExecutor.execute(executionGroupContext,
new
ExecuteCallbackFactory(prepareEngine.getType()).newInstance(database,
executeCallback, executionContext.getSqlStatementContext().getSqlStatement()));
- new
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
database, props)
+ new
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
database, metaData.getProps())
.refresh(executionContext.getQueryContext().getSqlStatementContext(),
executionContext.getRouteContext().getRouteUnits());
return null != results && !results.isEmpty() && null !=
results.get(0) && results.get(0);
} finally {
@@ -143,9 +152,56 @@ public final class DriverPushDownExecuteExecutor {
}
private boolean executeRaw(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
- int maxConnectionsSizePerQuery =
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+ int maxConnectionsSizePerQuery =
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext = new
RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
database.getRuleMetaData().getRules())
.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(connection.getProcessId(), database.getName(),
grantee));
return rawExecutor.execute(executionGroupContext,
executionContext.getQueryContext(), new
RawSQLExecutorCallback()).iterator().next() instanceof QueryResult;
}
+
+    /**
+     * Get result set.
+     *
+     * <p>A result set is produced only for SELECT statement contexts and DAL statements: the result sets of the
+     * given statements are merged and wrapped in a {@link ShardingSphereResultSet}.</p>
+     *
+     * @param database database
+     * @param sqlStatementContext SQL statement context
+     * @param statement statement
+     * @param statements statements
+     * @return merged result set, or empty when the SQL is neither SELECT nor DAL or no statement holds a result set
+     * @throws SQLException SQL exception
+     */
+    public Optional<ResultSet> getResultSet(final ShardingSphereDatabase database, final SQLStatementContext sqlStatementContext,
+                                            final Statement statement, final List<? extends Statement> statements) throws SQLException {
+        boolean isSelect = sqlStatementContext instanceof SelectStatementContext;
+        if (!isSelect && !(sqlStatementContext.getSqlStatement() instanceof DALStatement)) {
+            return Optional.empty();
+        }
+        List<ResultSet> resultSets = getResultSets(statements);
+        if (resultSets.isEmpty()) {
+            return Optional.empty();
+        }
+        List<QueryResult> queryResults = getQueryResults(resultSets);
+        MergeEngine mergeEngine = new MergeEngine(
+                metaData.getGlobalRuleMetaData(), database, metaData.getProps(), connection.getDatabaseConnectionManager().getConnectionContext());
+        MergedResult mergedResult = mergeEngine.merge(queryResults, sqlStatementContext);
+        boolean selectContainsEnhancedTable = isSelect && ((SelectStatementContext) sqlStatementContext).isContainsEnhancedTable();
+        return Optional.of(new ShardingSphereResultSet(resultSets, mergedResult, statement, selectContainsEnhancedTable, sqlStatementContext));
+    }
+
+    /**
+     * Collect the current result set of every statement that has one.
+     *
+     * @param statements statements to read result sets from
+     * @return non-null result sets, in the iteration order of {@code statements}
+     * @throws SQLException SQL exception
+     */
+    @SuppressWarnings("JDBCResourceOpenedButNotSafelyClosed")
+    private List<ResultSet> getResultSets(final List<? extends Statement> statements) throws SQLException {
+        List<ResultSet> result = new ArrayList<>(statements.size());
+        for (Statement each : statements) {
+            // Statement.getResultSet() is specified to be called only once per result, so read it once and reuse the reference.
+            ResultSet resultSet = each.getResultSet();
+            if (null != resultSet) {
+                result.add(resultSet);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Wrap every non-null result set in a {@link JDBCStreamQueryResult}.
+     *
+     * @param resultSets result sets to wrap; null elements are skipped
+     * @return query results, in the iteration order of {@code resultSets}
+     * @throws SQLException SQL exception
+     */
+    private List<QueryResult> getQueryResults(final List<ResultSet> resultSets) throws SQLException {
+        List<QueryResult> queryResults = new ArrayList<>(resultSets.size());
+        for (ResultSet resultSet : resultSets) {
+            if (null == resultSet) {
+                continue;
+            }
+            queryResults.add(new JDBCStreamQueryResult(resultSet));
+        }
+        return queryResults;
+    }
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 5a7b1f947fd..71177e0eb79 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -19,14 +19,13 @@ package
org.apache.shardingsphere.driver.jdbc.core.statement;
import lombok.AccessLevel;
import lombok.Getter;
+import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
import org.apache.shardingsphere.driver.executor.engine.DriverExecutorFacade;
import
org.apache.shardingsphere.driver.executor.engine.batch.DriverExecuteBatchExecutor;
-import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
import
org.apache.shardingsphere.driver.jdbc.adapter.AbstractPreparedStatementAdapter;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import
org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
-import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
import
org.apache.shardingsphere.driver.jdbc.core.statement.metadata.ShardingSphereParameterMetaData;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.binder.context.aware.ParameterAware;
@@ -42,15 +41,11 @@ import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePrecondition
import
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import
org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.infra.hint.HintManager;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.hint.SQLHintUtils;
-import org.apache.shardingsphere.infra.merge.MergeEngine;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
@@ -61,7 +56,6 @@ import
org.apache.shardingsphere.infra.rule.attribute.resoure.StorageConnectorRe
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
import java.sql.ParameterMetaData;
@@ -274,44 +268,13 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
if (null != currentResultSet) {
return currentResultSet;
}
- Optional<ResultSet> resultSet = driverExecutorFacade.getResultSet();
- if (resultSet.isPresent()) {
- return resultSet.get();
- }
- if (sqlStatementContext instanceof SelectStatementContext ||
sqlStatementContext.getSqlStatement() instanceof DALStatement) {
- List<ResultSet> resultSets = getResultSets();
- if (resultSets.isEmpty()) {
- return currentResultSet;
- }
- MergedResult mergedResult =
mergeQuery(getQueryResults(resultSets), sqlStatementContext);
- if (null == columnLabelAndIndexMap) {
- columnLabelAndIndexMap =
ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(sqlStatementContext,
selectContainsEnhancedTable, resultSets.get(0).getMetaData());
- }
- currentResultSet = new ShardingSphereResultSet(resultSets,
mergedResult, this, selectContainsEnhancedTable, sqlStatementContext,
columnLabelAndIndexMap);
+ driverExecutorFacade.getResultSet(metaData.getDatabase(databaseName),
sqlStatementContext, this, statements).ifPresent(optional -> currentResultSet =
optional);
+ if (null == columnLabelAndIndexMap && currentResultSet instanceof
ShardingSphereResultSet) {
+ columnLabelAndIndexMap = ((ShardingSphereResultSet)
currentResultSet).getColumnLabelAndIndexMap();
}
return currentResultSet;
}
- private List<ResultSet> getResultSets() throws SQLException {
- List<ResultSet> result = new ArrayList<>(statements.size());
- for (Statement each : statements) {
- if (null != each.getResultSet()) {
- result.add(each.getResultSet());
- }
- }
- return result;
- }
-
- private List<QueryResult> getQueryResults(final List<ResultSet>
resultSets) throws SQLException {
- List<QueryResult> result = new ArrayList<>(resultSets.size());
- for (ResultSet each : resultSets) {
- if (null != each) {
- result.add(new JDBCStreamQueryResult(each));
- }
- }
- return result;
- }
-
private QueryContext createQueryContext() {
List<Object> params = new ArrayList<>(getParameters());
if (sqlStatementContext instanceof ParameterAware) {
@@ -320,12 +283,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
return new QueryContext(sqlStatementContext, sql, params,
hintValueContext, true);
}
- private MergedResult mergeQuery(final List<QueryResult> queryResults,
final SQLStatementContext sqlStatementContext) throws SQLException {
- MergeEngine mergeEngine = new
MergeEngine(metaData.getGlobalRuleMetaData(),
metaData.getDatabase(databaseName),
- metaData.getProps(),
connection.getDatabaseConnectionManager().getConnectionContext());
- return mergeEngine.merge(queryResults, sqlStatementContext);
- }
-
private void replay() throws SQLException {
replaySetParameter(statements, parameterSets);
for (Statement each : statements) {
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index d738b645cc6..0578a51e0e9 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -19,20 +19,18 @@ package
org.apache.shardingsphere.driver.jdbc.core.statement;
import lombok.AccessLevel;
import lombok.Getter;
-import org.apache.shardingsphere.driver.executor.engine.DriverExecutorFacade;
-import
org.apache.shardingsphere.driver.executor.engine.batch.statement.BatchStatementExecutor;
import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
import
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecuteCallback;
import
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecuteUpdateCallback;
+import org.apache.shardingsphere.driver.executor.engine.DriverExecutorFacade;
+import
org.apache.shardingsphere.driver.executor.engine.batch.statement.BatchStatementExecutor;
import org.apache.shardingsphere.driver.jdbc.adapter.AbstractStatementAdapter;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import
org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
-import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import
org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.context.statement.dml.InsertStatementContext;
-import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
import
org.apache.shardingsphere.infra.database.core.keygen.GeneratedKeyColumnProvider;
@@ -41,14 +39,10 @@ import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePrecondition
import
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import
org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.hint.SQLHintUtils;
-import org.apache.shardingsphere.infra.merge.MergeEngine;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
@@ -56,13 +50,11 @@ import
org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttri
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
@@ -319,48 +311,10 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
if (null != currentResultSet) {
return currentResultSet;
}
- Optional<ResultSet> resultSet = driverExecutorFacade.getResultSet();
- if (resultSet.isPresent()) {
- return resultSet.get();
- }
- if (sqlStatementContext instanceof SelectStatementContext ||
sqlStatementContext.getSqlStatement() instanceof DALStatement) {
- List<ResultSet> resultSets = getResultSets();
- if (resultSets.isEmpty()) {
- return currentResultSet;
- }
- MergedResult mergedResult =
mergeQuery(getQueryResults(resultSets), sqlStatementContext);
- boolean selectContainsEnhancedTable = sqlStatementContext
instanceof SelectStatementContext && ((SelectStatementContext)
sqlStatementContext).isContainsEnhancedTable();
- currentResultSet = new ShardingSphereResultSet(resultSets,
mergedResult, this, selectContainsEnhancedTable, sqlStatementContext);
- }
+ driverExecutorFacade.getResultSet(metaData.getDatabase(databaseName),
sqlStatementContext, this, statements).ifPresent(optional -> currentResultSet =
optional);
return currentResultSet;
}
- private List<ResultSet> getResultSets() throws SQLException {
- List<ResultSet> result = new ArrayList<>(statements.size());
- for (Statement each : statements) {
- if (null != each.getResultSet()) {
- result.add(each.getResultSet());
- }
- }
- return result;
- }
-
- private List<QueryResult> getQueryResults(final List<ResultSet>
resultSets) throws SQLException {
- List<QueryResult> result = new ArrayList<>(resultSets.size());
- for (ResultSet each : resultSets) {
- if (null != each) {
- result.add(new JDBCStreamQueryResult(each));
- }
- }
- return result;
- }
-
- private MergedResult mergeQuery(final List<QueryResult> queryResults,
final SQLStatementContext sqlStatementContext) throws SQLException {
- MergeEngine mergeEngine = new
MergeEngine(metaData.getGlobalRuleMetaData(),
metaData.getDatabase(databaseName),
- metaData.getProps(),
connection.getDatabaseConnectionManager().getConnectionContext());
- return mergeEngine.merge(queryResults, sqlStatementContext);
- }
-
@SuppressWarnings("MagicConstant")
@Override
public int getResultSetType() {