This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a1a44fc78db Refactor DriverExecuteQueryExecutor (#31638)
a1a44fc78db is described below
commit a1a44fc78db5e92d6883dbe2126ae3a169c4963d
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Jun 9 12:55:03 2024 +0800
Refactor DriverExecuteQueryExecutor (#31638)
* Refactor DriverExecuteQueryExecutor
* Refactor DriverExecuteQueryExecutor
---
.../engine/DriverExecuteQueryExecutor.java | 77 ++++++++++++----------
1 file changed, 42 insertions(+), 35 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
index 50ba479ab36..bc287c93bbc 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
@@ -18,14 +18,14 @@
package org.apache.shardingsphere.driver.executor.engine;
import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
import
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallback;
import
org.apache.shardingsphere.driver.executor.callback.execute.impl.PreparedStatementExecuteQueryCallback;
import
org.apache.shardingsphere.driver.executor.callback.execute.impl.StatementExecuteQueryCallback;
+import
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
-import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
-import
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
@@ -112,24 +112,18 @@ public final class DriverExecuteQueryExecutor {
final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final Statement statement, final Map<String, Integer> columnLabelAndIndexMap,
final StatementAddCallback addCallback,
final StatementReplayCallback replayCallback) throws SQLException {
statements.clear();
- Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
queryContext);
+ RuleMetaData globalRuleMetaData = metaData.getGlobalRuleMetaData();
+ SQLAuditEngine.audit(queryContext, globalRuleMetaData, database);
+ Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(globalRuleMetaData.getSingleRule(TrafficRule.class),
queryContext);
if (trafficInstanceId.isPresent()) {
return trafficExecutor.execute(
connection.getProcessId(), database.getName(),
trafficInstanceId.get(), queryContext, prepareEngine,
getTrafficExecuteQueryCallback(prepareEngine.getType()));
}
- if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
+ if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, globalRuleMetaData)) {
return sqlFederationEngine.executeQuery(
prepareEngine, getExecuteQueryCallback(database,
queryContext, prepareEngine.getType()), new SQLFederationContext(false,
queryContext, metaData, connection.getProcessId()));
}
- List<QueryResult> queryResults = executePushDownQuery(database,
queryContext, prepareEngine, addCallback, replayCallback);
- MergedResult mergedResult = mergeQuery(database, queryResults,
queryContext.getSqlStatementContext());
- boolean selectContainsEnhancedTable =
queryContext.getSqlStatementContext() instanceof SelectStatementContext
- && ((SelectStatementContext)
queryContext.getSqlStatementContext()).isContainsEnhancedTable();
- List<ResultSet> resultSets = getResultSets();
- return new ShardingSphereResultSet(resultSets, mergedResult,
statement, selectContainsEnhancedTable, queryContext.getSqlStatementContext(),
- null == columnLabelAndIndexMap
- ?
ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(queryContext.getSqlStatementContext(),
selectContainsEnhancedTable, resultSets.get(0).getMetaData())
- : columnLabelAndIndexMap);
+ return executePushDownQuery(database, queryContext, prepareEngine,
statement, columnLabelAndIndexMap, addCallback, replayCallback);
}
private TrafficExecutorCallback<ResultSet>
getTrafficExecuteQueryCallback(final String jdbcDriverType) {
@@ -144,15 +138,37 @@ public final class DriverExecuteQueryExecutor {
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
}
+ @SuppressWarnings("rawtypes")
+ private ShardingSphereResultSet executePushDownQuery(final
ShardingSphereDatabase database, final QueryContext queryContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final Statement statement,
+ final Map<String,
Integer> columnLabelAndIndexMap,
+ final
StatementAddCallback addCallback, final StatementReplayCallback replayCallback)
throws SQLException {
+ List<QueryResult> queryResults = getQueryResults(database,
queryContext, prepareEngine, addCallback, replayCallback);
+ MergedResult mergedResult = mergeQuery(database, queryResults,
queryContext.getSqlStatementContext());
+ boolean isContainsEnhancedTable =
queryContext.getSqlStatementContext() instanceof SelectStatementContext
+ && ((SelectStatementContext)
queryContext.getSqlStatementContext()).isContainsEnhancedTable();
+ List<ResultSet> resultSets = getResultSets();
+ return new ShardingSphereResultSet(resultSets, mergedResult,
statement, isContainsEnhancedTable, queryContext.getSqlStatementContext(),
+ null == columnLabelAndIndexMap
+ ?
ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(queryContext.getSqlStatementContext(),
isContainsEnhancedTable, resultSets.get(0).getMetaData())
+ : columnLabelAndIndexMap);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private List<QueryResult> getQueryResults(final ShardingSphereDatabase
database, final QueryContext queryContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+ final StatementAddCallback
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
+ ExecutionContext executionContext = new
KernelProcessor().generateExecutionContext(
+ queryContext, database, metaData.getGlobalRuleMetaData(),
metaData.getProps(),
connection.getDatabaseConnectionManager().getConnectionContext());
+ return
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
+ ? getJDBCQueryResults(database, queryContext, prepareEngine,
addCallback, replayCallback, executionContext)
+ : getRawQueryResults(database, queryContext, executionContext);
+ }
+
@SuppressWarnings({"rawtypes", "unchecked"})
- private List<QueryResult> executePushDownQuery(final
ShardingSphereDatabase database, final QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final StatementAddCallback
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
- ExecutionContext executionContext = createExecutionContext(database,
queryContext);
- if (hasRawExecutionRule(database)) {
- return rawExecutor.execute(
- createRawExecutionGroupContext(database,
executionContext), queryContext, new
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
- }
+ private List<QueryResult> getJDBCQueryResults(final ShardingSphereDatabase
database, final QueryContext queryContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+ final StatementAddCallback
addCallback, final StatementReplayCallback replayCallback,
+ final ExecutionContext
executionContext) throws SQLException {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
new ExecutionGroupReportContext(connection.getProcessId(),
database.getName(), new Grantee("", "")));
for (ExecutionGroup<JDBCExecutionUnit> each :
executionGroupContext.getInputGroups()) {
@@ -161,24 +177,15 @@ public final class DriverExecuteQueryExecutor {
addCallback.add(statements,
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ?
getParameterSets(each) : Collections.emptyList());
}
replayCallback.replay();
- return executePushDownQuery(executionGroupContext, queryContext,
getExecuteQueryCallback(database, queryContext, prepareEngine.getType()));
- }
-
- private List<QueryResult> executePushDownQuery(final
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
- final QueryContext
queryContext, final ExecuteQueryCallback callback) throws SQLException {
ProcessEngine processEngine = new ProcessEngine();
try {
processEngine.executeSQL(executionGroupContext, queryContext);
- return jdbcExecutor.execute(executionGroupContext, callback);
+ return jdbcExecutor.execute(executionGroupContext,
getExecuteQueryCallback(database, queryContext, prepareEngine.getType()));
} finally {
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
}
}
- private boolean hasRawExecutionRule(final ShardingSphereDatabase database)
{
- return
!database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
- }
-
private Collection<Statement> getStatements(final
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
Collection<Statement> result = new LinkedList<>();
for (JDBCExecutionUnit each : executionGroup.getInputs()) {
@@ -195,10 +202,9 @@ public final class DriverExecuteQueryExecutor {
return result;
}
- private ExecutionContext createExecutionContext(final
ShardingSphereDatabase database, final QueryContext queryContext) {
- RuleMetaData globalRuleMetaData = metaData.getGlobalRuleMetaData();
- SQLAuditEngine.audit(queryContext, globalRuleMetaData, database);
- return new KernelProcessor().generateExecutionContext(queryContext,
database, globalRuleMetaData, metaData.getProps(),
connection.getDatabaseConnectionManager().getConnectionContext());
+ private List<QueryResult> getRawQueryResults(final ShardingSphereDatabase
database, final QueryContext queryContext, final ExecutionContext
executionContext) throws SQLException {
+ return rawExecutor.execute(
+ createRawExecutionGroupContext(database, executionContext),
queryContext, new
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
}
private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
@@ -212,6 +218,7 @@ public final class DriverExecuteQueryExecutor {
return mergeEngine.merge(queryResults, sqlStatementContext);
}
+ @SuppressWarnings("JDBCResourceOpenedButNotSafelyClosed")
private List<ResultSet> getResultSets() throws SQLException {
List<ResultSet> result = new ArrayList<>(statements.size());
for (Statement each : statements) {