This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 61360740df0 Add DriverPushDownExecuteQueryExecutor (#31639)
61360740df0 is described below
commit 61360740df02a29cafbc946929897295106d7d10
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Jun 9 13:58:24 2024 +0800
Add DriverPushDownExecuteQueryExecutor (#31639)
---
.../execute/ExecuteQueryCallbackFactory.java | 49 +++++++
.../engine/DriverExecuteQueryExecutor.java | 152 ++-------------------
.../DriverPushDownExecuteQueryExecutor.java} | 97 +++++--------
3 files changed, 96 insertions(+), 202 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/callback/execute/ExecuteQueryCallbackFactory.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/callback/execute/ExecuteQueryCallbackFactory.java
new file mode 100644
index 00000000000..72d0b1f46a1
--- /dev/null
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/callback/execute/ExecuteQueryCallbackFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor.callback.execute;
+
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.driver.executor.callback.execute.impl.PreparedStatementExecuteQueryCallback;
+import
org.apache.shardingsphere.driver.executor.callback.execute.impl.StatementExecuteQueryCallback;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
+
+/**
+ * Execute query callback factory.
+ */
+@RequiredArgsConstructor
+public final class ExecuteQueryCallbackFactory {
+
+ private final String jdbcDriverType;
+
+ /**
+ * Create new instance of execute query callback.
+ * @param database database
+ * @param queryContext query context
+ * @return created instance
+ */
+ public ExecuteQueryCallback newInstance(final ShardingSphereDatabase
database, final QueryContext queryContext) {
+ return JDBCDriverType.STATEMENT.equals(jdbcDriverType)
+ ? new
StatementExecuteQueryCallback(database.getProtocolType(),
database.getResourceMetaData(),
+
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown())
+ : new
PreparedStatementExecuteQueryCallback(database.getProtocolType(),
database.getResourceMetaData(),
+
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
+ }
+}
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
index bc287c93bbc..d806a77b27f 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
@@ -17,42 +17,20 @@
package org.apache.shardingsphere.driver.executor.engine;
-import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
-import
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallback;
-import
org.apache.shardingsphere.driver.executor.callback.execute.impl.PreparedStatementExecuteQueryCallback;
-import
org.apache.shardingsphere.driver.executor.callback.execute.impl.StatementExecuteQueryCallback;
+import
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallbackFactory;
import
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
+import
org.apache.shardingsphere.driver.executor.engine.pushdown.DriverPushDownExecuteQueryExecutor;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
-import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
-import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
-import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
-import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
-import org.apache.shardingsphere.infra.merge.MergeEngine;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
-import org.apache.shardingsphere.infra.metadata.user.Grantee;
-import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
@@ -65,34 +43,32 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
/**
* Driver execute query executor.
*/
-@RequiredArgsConstructor
public final class DriverExecuteQueryExecutor {
private final ShardingSphereConnection connection;
private final ShardingSphereMetaData metaData;
- private final JDBCExecutor jdbcExecutor;
-
- private final RawExecutor rawExecutor;
+ private final DriverPushDownExecuteQueryExecutor
pushDownExecuteQueryExecutor;
private final TrafficExecutor trafficExecutor;
private final SQLFederationEngine sqlFederationEngine;
- private final Collection<Statement> statements = new LinkedList<>();
+ public DriverExecuteQueryExecutor(final ShardingSphereConnection
connection, final ShardingSphereMetaData metaData, final JDBCExecutor
jdbcExecutor, final RawExecutor rawExecutor,
+ final TrafficExecutor trafficExecutor,
final SQLFederationEngine sqlFederationEngine) {
+ this.connection = connection;
+ this.metaData = metaData;
+ pushDownExecuteQueryExecutor = new
DriverPushDownExecuteQueryExecutor(connection, metaData, jdbcExecutor,
rawExecutor);
+ this.trafficExecutor = trafficExecutor;
+ this.sqlFederationEngine = sqlFederationEngine;
+ }
/**
* Execute query.
@@ -111,7 +87,6 @@ public final class DriverExecuteQueryExecutor {
public ResultSet executeQuery(final ShardingSphereDatabase database, final
QueryContext queryContext,
final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final Statement statement, final Map<String, Integer> columnLabelAndIndexMap,
final StatementAddCallback addCallback,
final StatementReplayCallback replayCallback) throws SQLException {
- statements.clear();
RuleMetaData globalRuleMetaData = metaData.getGlobalRuleMetaData();
SQLAuditEngine.audit(queryContext, globalRuleMetaData, database);
Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(globalRuleMetaData.getSingleRule(TrafficRule.class),
queryContext);
@@ -120,112 +95,13 @@ public final class DriverExecuteQueryExecutor {
connection.getProcessId(), database.getName(),
trafficInstanceId.get(), queryContext, prepareEngine,
getTrafficExecuteQueryCallback(prepareEngine.getType()));
}
if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, globalRuleMetaData)) {
- return sqlFederationEngine.executeQuery(
- prepareEngine, getExecuteQueryCallback(database,
queryContext, prepareEngine.getType()), new SQLFederationContext(false,
queryContext, metaData, connection.getProcessId()));
+ return sqlFederationEngine.executeQuery(prepareEngine,
+ new
ExecuteQueryCallbackFactory(prepareEngine.getType()).newInstance(database,
queryContext), new SQLFederationContext(false, queryContext, metaData,
connection.getProcessId()));
}
- return executePushDownQuery(database, queryContext, prepareEngine,
statement, columnLabelAndIndexMap, addCallback, replayCallback);
+ return pushDownExecuteQueryExecutor.executeQuery(database,
queryContext, prepareEngine, statement, columnLabelAndIndexMap, addCallback,
replayCallback);
}
private TrafficExecutorCallback<ResultSet>
getTrafficExecuteQueryCallback(final String jdbcDriverType) {
return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? ((sql,
statement) -> statement.executeQuery(sql)) : ((sql, statement) ->
((PreparedStatement) statement).executeQuery());
}
-
- private ExecuteQueryCallback getExecuteQueryCallback(final
ShardingSphereDatabase database, final QueryContext queryContext, final String
jdbcDriverType) {
- return JDBCDriverType.STATEMENT.equals(jdbcDriverType)
- ? new
StatementExecuteQueryCallback(database.getProtocolType(),
database.getResourceMetaData(),
-
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown())
- : new
PreparedStatementExecuteQueryCallback(database.getProtocolType(),
database.getResourceMetaData(),
-
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
- }
-
- @SuppressWarnings("rawtypes")
- private ShardingSphereResultSet executePushDownQuery(final
ShardingSphereDatabase database, final QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final Statement statement,
- final Map<String,
Integer> columnLabelAndIndexMap,
- final
StatementAddCallback addCallback, final StatementReplayCallback replayCallback)
throws SQLException {
- List<QueryResult> queryResults = getQueryResults(database,
queryContext, prepareEngine, addCallback, replayCallback);
- MergedResult mergedResult = mergeQuery(database, queryResults,
queryContext.getSqlStatementContext());
- boolean isContainsEnhancedTable =
queryContext.getSqlStatementContext() instanceof SelectStatementContext
- && ((SelectStatementContext)
queryContext.getSqlStatementContext()).isContainsEnhancedTable();
- List<ResultSet> resultSets = getResultSets();
- return new ShardingSphereResultSet(resultSets, mergedResult,
statement, isContainsEnhancedTable, queryContext.getSqlStatementContext(),
- null == columnLabelAndIndexMap
- ?
ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(queryContext.getSqlStatementContext(),
isContainsEnhancedTable, resultSets.get(0).getMetaData())
- : columnLabelAndIndexMap);
- }
-
- @SuppressWarnings("rawtypes")
- private List<QueryResult> getQueryResults(final ShardingSphereDatabase
database, final QueryContext queryContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final StatementAddCallback
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
- ExecutionContext executionContext = new
KernelProcessor().generateExecutionContext(
- queryContext, database, metaData.getGlobalRuleMetaData(),
metaData.getProps(),
connection.getDatabaseConnectionManager().getConnectionContext());
- return
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
- ? getJDBCQueryResults(database, queryContext, prepareEngine,
addCallback, replayCallback, executionContext)
- : getRawQueryResults(database, queryContext, executionContext);
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private List<QueryResult> getJDBCQueryResults(final ShardingSphereDatabase
database, final QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final StatementAddCallback
addCallback, final StatementReplayCallback replayCallback,
- final ExecutionContext
executionContext) throws SQLException {
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
- new ExecutionGroupReportContext(connection.getProcessId(),
database.getName(), new Grantee("", "")));
- for (ExecutionGroup<JDBCExecutionUnit> each :
executionGroupContext.getInputGroups()) {
- Collection<Statement> statements = getStatements(each);
- this.statements.addAll(statements);
- addCallback.add(statements,
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ?
getParameterSets(each) : Collections.emptyList());
- }
- replayCallback.replay();
- ProcessEngine processEngine = new ProcessEngine();
- try {
- processEngine.executeSQL(executionGroupContext, queryContext);
- return jdbcExecutor.execute(executionGroupContext,
getExecuteQueryCallback(database, queryContext, prepareEngine.getType()));
- } finally {
-
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
- }
- }
-
- private Collection<Statement> getStatements(final
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
- Collection<Statement> result = new LinkedList<>();
- for (JDBCExecutionUnit each : executionGroup.getInputs()) {
- result.add(each.getStorageResource());
- }
- return result;
- }
-
- private Collection<List<Object>> getParameterSets(final
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
- Collection<List<Object>> result = new LinkedList<>();
- for (JDBCExecutionUnit each : executionGroup.getInputs()) {
- result.add(each.getExecutionUnit().getSqlUnit().getParameters());
- }
- return result;
- }
-
- private List<QueryResult> getRawQueryResults(final ShardingSphereDatabase
database, final QueryContext queryContext, final ExecutionContext
executionContext) throws SQLException {
- return rawExecutor.execute(
- createRawExecutionGroupContext(database, executionContext),
queryContext, new
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
- }
-
- private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
- int maxConnectionsSizePerQuery =
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
database.getRuleMetaData().getRules()).prepare(
- executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new
Grantee("", "")));
- }
-
- private MergedResult mergeQuery(final ShardingSphereDatabase database,
final List<QueryResult> queryResults, final SQLStatementContext
sqlStatementContext) throws SQLException {
- MergeEngine mergeEngine = new
MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(),
connection.getDatabaseConnectionManager().getConnectionContext());
- return mergeEngine.merge(queryResults, sqlStatementContext);
- }
-
- @SuppressWarnings("JDBCResourceOpenedButNotSafelyClosed")
- private List<ResultSet> getResultSets() throws SQLException {
- List<ResultSet> result = new ArrayList<>(statements.size());
- for (Statement each : statements) {
- if (null != each.getResultSet()) {
- result.add(each.getResultSet());
- }
- }
- return result;
- }
}
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
similarity index 64%
copy from jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
copy to jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
index bc287c93bbc..e04785bc486 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
@@ -15,27 +15,23 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.driver.executor.engine;
+package org.apache.shardingsphere.driver.executor.engine.pushdown;
-import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
-import
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallback;
-import
org.apache.shardingsphere.driver.executor.callback.execute.impl.PreparedStatementExecuteQueryCallback;
-import
org.apache.shardingsphere.driver.executor.callback.execute.impl.StatementExecuteQueryCallback;
+import
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallbackFactory;
import
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
-import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
@@ -53,15 +49,10 @@ import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
+import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
import org.apache.shardingsphere.infra.session.query.QueryContext;
-import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
-import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
-import org.apache.shardingsphere.traffic.rule.TrafficRule;
import java.sql.Connection;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -71,32 +62,40 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.stream.Collectors;
/**
- * Driver execute query executor.
+ * Driver push down execute query executor.
*/
-@RequiredArgsConstructor
-public final class DriverExecuteQueryExecutor {
+public final class DriverPushDownExecuteQueryExecutor {
- private final ShardingSphereConnection connection;
+ private final ConnectionContext connectionContext;
- private final ShardingSphereMetaData metaData;
+ private final String processId;
+
+ private final RuleMetaData globalRuleMetaData;
+
+ private final ConfigurationProperties props;
private final JDBCExecutor jdbcExecutor;
private final RawExecutor rawExecutor;
- private final TrafficExecutor trafficExecutor;
+ private final Collection<Statement> statements;
- private final SQLFederationEngine sqlFederationEngine;
-
- private final Collection<Statement> statements = new LinkedList<>();
+ public DriverPushDownExecuteQueryExecutor(final ShardingSphereConnection
connection, final ShardingSphereMetaData metaData, final JDBCExecutor
jdbcExecutor, final RawExecutor rawExecutor) {
+ connectionContext =
connection.getDatabaseConnectionManager().getConnectionContext();
+ processId = connection.getProcessId();
+ globalRuleMetaData = metaData.getGlobalRuleMetaData();
+ props = metaData.getProps();
+ this.jdbcExecutor = jdbcExecutor;
+ this.rawExecutor = rawExecutor;
+ statements = new LinkedList<>();
+ }
/**
* Execute query.
- *
+ *
* @param database database
* @param queryContext query context
* @param prepareEngine prepare engine
@@ -108,41 +107,11 @@ public final class DriverExecuteQueryExecutor {
* @throws SQLException SQL exception
*/
@SuppressWarnings("rawtypes")
- public ResultSet executeQuery(final ShardingSphereDatabase database, final
QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final Statement statement, final Map<String, Integer> columnLabelAndIndexMap,
- final StatementAddCallback addCallback,
final StatementReplayCallback replayCallback) throws SQLException {
+ public ShardingSphereResultSet executeQuery(final ShardingSphereDatabase
database, final QueryContext queryContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final Statement statement,
+ final Map<String, Integer>
columnLabelAndIndexMap,
+ final StatementAddCallback
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
statements.clear();
- RuleMetaData globalRuleMetaData = metaData.getGlobalRuleMetaData();
- SQLAuditEngine.audit(queryContext, globalRuleMetaData, database);
- Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(globalRuleMetaData.getSingleRule(TrafficRule.class),
queryContext);
- if (trafficInstanceId.isPresent()) {
- return trafficExecutor.execute(
- connection.getProcessId(), database.getName(),
trafficInstanceId.get(), queryContext, prepareEngine,
getTrafficExecuteQueryCallback(prepareEngine.getType()));
- }
- if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, globalRuleMetaData)) {
- return sqlFederationEngine.executeQuery(
- prepareEngine, getExecuteQueryCallback(database,
queryContext, prepareEngine.getType()), new SQLFederationContext(false,
queryContext, metaData, connection.getProcessId()));
- }
- return executePushDownQuery(database, queryContext, prepareEngine,
statement, columnLabelAndIndexMap, addCallback, replayCallback);
- }
-
- private TrafficExecutorCallback<ResultSet>
getTrafficExecuteQueryCallback(final String jdbcDriverType) {
- return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? ((sql,
statement) -> statement.executeQuery(sql)) : ((sql, statement) ->
((PreparedStatement) statement).executeQuery());
- }
-
- private ExecuteQueryCallback getExecuteQueryCallback(final
ShardingSphereDatabase database, final QueryContext queryContext, final String
jdbcDriverType) {
- return JDBCDriverType.STATEMENT.equals(jdbcDriverType)
- ? new
StatementExecuteQueryCallback(database.getProtocolType(),
database.getResourceMetaData(),
-
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown())
- : new
PreparedStatementExecuteQueryCallback(database.getProtocolType(),
database.getResourceMetaData(),
-
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
- }
-
- @SuppressWarnings("rawtypes")
- private ShardingSphereResultSet executePushDownQuery(final
ShardingSphereDatabase database, final QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final Statement statement,
- final Map<String,
Integer> columnLabelAndIndexMap,
- final
StatementAddCallback addCallback, final StatementReplayCallback replayCallback)
throws SQLException {
List<QueryResult> queryResults = getQueryResults(database,
queryContext, prepareEngine, addCallback, replayCallback);
MergedResult mergedResult = mergeQuery(database, queryResults,
queryContext.getSqlStatementContext());
boolean isContainsEnhancedTable =
queryContext.getSqlStatementContext() instanceof SelectStatementContext
@@ -158,7 +127,7 @@ public final class DriverExecuteQueryExecutor {
private List<QueryResult> getQueryResults(final ShardingSphereDatabase
database, final QueryContext queryContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final StatementAddCallback
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
ExecutionContext executionContext = new
KernelProcessor().generateExecutionContext(
- queryContext, database, metaData.getGlobalRuleMetaData(),
metaData.getProps(),
connection.getDatabaseConnectionManager().getConnectionContext());
+ queryContext, database, globalRuleMetaData, props,
connectionContext);
return
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
? getJDBCQueryResults(database, queryContext, prepareEngine,
addCallback, replayCallback, executionContext)
: getRawQueryResults(database, queryContext, executionContext);
@@ -170,7 +139,7 @@ public final class DriverExecuteQueryExecutor {
final StatementAddCallback
addCallback, final StatementReplayCallback replayCallback,
final ExecutionContext
executionContext) throws SQLException {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
- new ExecutionGroupReportContext(connection.getProcessId(),
database.getName(), new Grantee("", "")));
+ new ExecutionGroupReportContext(processId, database.getName(),
new Grantee("", "")));
for (ExecutionGroup<JDBCExecutionUnit> each :
executionGroupContext.getInputGroups()) {
Collection<Statement> statements = getStatements(each);
this.statements.addAll(statements);
@@ -180,7 +149,7 @@ public final class DriverExecuteQueryExecutor {
ProcessEngine processEngine = new ProcessEngine();
try {
processEngine.executeSQL(executionGroupContext, queryContext);
- return jdbcExecutor.execute(executionGroupContext,
getExecuteQueryCallback(database, queryContext, prepareEngine.getType()));
+ return jdbcExecutor.execute(executionGroupContext, new
ExecuteQueryCallbackFactory(prepareEngine.getType()).newInstance(database,
queryContext));
} finally {
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
}
@@ -208,13 +177,13 @@ public final class DriverExecuteQueryExecutor {
}
private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
- int maxConnectionsSizePerQuery =
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+ int maxConnectionsSizePerQuery =
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
database.getRuleMetaData().getRules()).prepare(
- executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new
Grantee("", "")));
+ executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(processId, database.getName(), new Grantee("",
"")));
}
private MergedResult mergeQuery(final ShardingSphereDatabase database,
final List<QueryResult> queryResults, final SQLStatementContext
sqlStatementContext) throws SQLException {
- MergeEngine mergeEngine = new
MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(),
connection.getDatabaseConnectionManager().getConnectionContext());
+ MergeEngine mergeEngine = new MergeEngine(globalRuleMetaData,
database, props, connectionContext);
return mergeEngine.merge(queryResults, sqlStatementContext);
}