This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6f1de3fb172 Refactor DriverExecutor (#31418)
6f1de3fb172 is described below
commit 6f1de3fb172794b9a7243d8ba970a30d8eedb74c
Author: Liang Zhang <[email protected]>
AuthorDate: Mon May 27 23:35:40 2024 +0800
Refactor DriverExecutor (#31418)
* Add ShardingSphereConnection.getTrafficInstanceId()
* Refactor DriverExecutor
* Refactor DriverExecutor
* Refactor DriverExecutor
* Refactor DriverExecutor
* Refactor DriverExecutor
---
.../driver/DriverExecutionPrepareEngine.java | 5 +
.../driver/executor/DriverExecutor.java | 150 ++++++++++++++++++++-
.../statement/ShardingSpherePreparedStatement.java | 68 +++-------
.../core/statement/ShardingSphereStatement.java | 59 +++-----
4 files changed, 184 insertions(+), 98 deletions(-)
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
index 655c5f8836e..4e097b0dacc 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.infra.executor.sql.prepare.driver;
+import lombok.Getter;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
@@ -45,6 +46,9 @@ public final class DriverExecutionPrepareEngine<T extends DriverExecutionUnit<?>
@SuppressWarnings("rawtypes")
private static final Map<String, SQLExecutionUnitBuilder>
TYPE_TO_BUILDER_MAP = new ConcurrentHashMap<>(8, 1F);
+ @Getter
+ private final String type;
+
private final DatabaseConnectionManager<C> databaseConnectionManager;
private final ExecutorStatementManager<C, ?, ?> statementManager;
@@ -60,6 +64,7 @@ public final class DriverExecutionPrepareEngine<T extends DriverExecutionUnit<?>
final ExecutorStatementManager<C, ?,
?> statementManager, final StorageResourceOption option, final
Collection<ShardingSphereRule> rules,
final Map<String, StorageUnit>
storageUnits) {
super(maxConnectionsSizePerQuery, rules);
+ this.type = type;
this.databaseConnectionManager = databaseConnectionManager;
this.statementManager = statementManager;
this.option = option;
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
index 510665e241a..ad349f07178 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
@@ -18,33 +18,56 @@
package org.apache.shardingsphere.driver.executor;
import lombok.Getter;
+import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
+import
org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
+import
org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
+import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
+import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
+import org.apache.shardingsphere.traffic.rule.TrafficRule;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Optional;
/**
* Driver executor.
*/
-@Getter
public final class DriverExecutor implements AutoCloseable {
+ private final ShardingSphereConnection connection;
+
+ @Getter
private final DriverJDBCExecutor regularExecutor;
+ @Getter
private final RawExecutor rawExecutor;
+ private final TrafficExecutor trafficExecutor;
+
private final SQLFederationEngine sqlFederationEngine;
- private final TrafficExecutor trafficExecutor;
+ private ExecuteType executeType = ExecuteType.REGULAR;
public DriverExecutor(final ShardingSphereConnection connection) {
+ this.connection = connection;
MetaDataContexts metaDataContexts =
connection.getContextManager().getMetaDataContexts();
ExecutorEngine executorEngine =
connection.getContextManager().getExecutorEngine();
JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine,
connection.getDatabaseConnectionManager().getConnectionContext());
@@ -52,18 +75,137 @@ public final class DriverExecutor implements AutoCloseable {
rawExecutor = new RawExecutor(executorEngine,
connection.getDatabaseConnectionManager().getConnectionContext());
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName());
String schemaName = new
DatabaseTypeRegistry(database.getProtocolType()).getDefaultSchemaName(connection.getDatabaseName());
- sqlFederationEngine = new
SQLFederationEngine(connection.getDatabaseName(), schemaName,
metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor);
trafficExecutor = new TrafficExecutor();
+ sqlFederationEngine = new
SQLFederationEngine(connection.getDatabaseName(), schemaName,
metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor);
}
/**
- * Close.
+ * Execute advance query.
*
+ * @param metaData meta data
+ * @param database database
+ * @param queryContext query context
+ * @param prepareEngine prepare engine
+ * @return result set
* @throws SQLException SQL exception
*/
+ public Optional<ResultSet> executeAdvanceQuery(final
ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final
QueryContext queryContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine)
throws SQLException {
+ Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
queryContext);
+ if (trafficInstanceId.isPresent()) {
+ TrafficExecutorCallback<ResultSet> trafficCallback =
JDBCDriverType.STATEMENT.equals(prepareEngine.getType())
+ ? Statement::executeQuery
+ : ((statement, sql) -> ((PreparedStatement)
statement).executeQuery());
+ return
Optional.of(trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
trafficCallback));
+ }
+ if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
+ ExecuteQueryCallback sqlFederationCallback =
JDBCDriverType.STATEMENT.equals(prepareEngine.getType())
+ ? new
StatementExecuteQueryCallback(database.getProtocolType(),
database.getResourceMetaData(),
+
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown())
+ : new
PreparedStatementExecuteQueryCallback(database.getProtocolType(),
+ database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
+ return Optional.of(sqlFederationEngine.executeQuery(prepareEngine,
sqlFederationCallback, new SQLFederationContext(false, queryContext, metaData,
connection.getProcessId())));
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Execute advance update.
+ *
+ * @param metaData meta data
+ * @param database database
+ * @param queryContext query context
+ * @param prepareEngine prepare engine
+ * @return updated row count
+ * @throws SQLException SQL exception
+ */
+ public Optional<Integer> executeAdvanceUpdate(final ShardingSphereMetaData
metaData, final ShardingSphereDatabase database, final QueryContext
queryContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine)
throws SQLException {
+ Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
queryContext);
+ if (trafficInstanceId.isPresent()) {
+ return
Optional.of(trafficExecutor.execute(connection.getProcessId(),
database.getName(),
+ trafficInstanceId.get(), queryContext, prepareEngine,
(statement, sql) -> ((PreparedStatement) statement).executeUpdate()));
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Execute advance update.
+ *
+ * @param metaData meta data
+ * @param database database
+ * @param queryContext query context
+ * @param prepareEngine prepare engine
+ * @param trafficCallback traffic callback
+ * @return updated row count
+ * @throws SQLException SQL exception
+ */
+ public Optional<Integer> executeAdvanceUpdate(final ShardingSphereMetaData
metaData, final ShardingSphereDatabase database, final QueryContext
queryContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+ final
TrafficExecutorCallback<Integer> trafficCallback) throws SQLException {
+ Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
queryContext);
+ if (trafficInstanceId.isPresent()) {
+ return
Optional.of(trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
trafficCallback));
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Execute advance.
+ *
+ * @param metaData meta data
+ * @param database database
+ * @param queryContext query context
+ * @param prepareEngine prepare engine
+ * @param trafficCallback traffic callback
+ * @return execute result
+ * @throws SQLException SQL exception
+ */
+ public Optional<Boolean> executeAdvance(final ShardingSphereMetaData
metaData, final ShardingSphereDatabase database,
+ final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+ final
TrafficExecutorCallback<Boolean> trafficCallback) throws SQLException {
+ Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
queryContext);
+ if (trafficInstanceId.isPresent()) {
+ executeType = ExecuteType.TRAFFIC;
+ return
Optional.of(trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
trafficCallback));
+ }
+ if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
+ executeType = ExecuteType.FEDERATION;
+ ExecuteQueryCallback sqlFederationCallback =
JDBCDriverType.STATEMENT.equals(prepareEngine.getType())
+ ? new
StatementExecuteQueryCallback(database.getProtocolType(),
database.getResourceMetaData(),
+
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown())
+ : new
PreparedStatementExecuteQueryCallback(database.getProtocolType(),
database.getResourceMetaData(),
+
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
+ ResultSet resultSet =
sqlFederationEngine.executeQuery(prepareEngine, sqlFederationCallback, new
SQLFederationContext(false, queryContext, metaData, connection.getProcessId()));
+ return Optional.of(null != resultSet);
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Get advanced result set.
+ *
+ * @return advanced result set
+ */
+ public Optional<ResultSet> getAdvancedResultSet() {
+ switch (executeType) {
+ case TRAFFIC:
+ return Optional.of(trafficExecutor.getResultSet());
+ case FEDERATION:
+ return Optional.of(sqlFederationEngine.getResultSet());
+ default:
+ return Optional.empty();
+ }
+ }
+
@Override
public void close() throws SQLException {
sqlFederationEngine.close();
trafficExecutor.close();
}
+
+ public enum ExecuteType {
+
+ TRAFFIC, FEDERATION, REGULAR
+ }
}
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index fecffc9e8e1..35bf260aeda 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -83,8 +83,6 @@ import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
-import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
-import
org.apache.shardingsphere.traffic.exception.EmptyTrafficExecutionUnitException;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
@@ -226,14 +224,9 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(trafficRule, queryContext);
- if (trafficInstanceId.isPresent()) {
- currentResultSet =
executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName,
- trafficInstanceId.get(), queryContext,
createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).executeQuery());
- return currentResultSet;
- }
- if (decide(queryContext, database,
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
- currentResultSet = executeFederationQuery(queryContext);
+ Optional<ResultSet> advancedResultSet =
executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database,
queryContext, createDriverExecutionPrepareEngine(database));
+ if (advancedResultSet.isPresent()) {
+ currentResultSet = advancedResultSet.get();
return currentResultSet;
}
executionContext = createExecutionContext(queryContext);
@@ -260,25 +253,12 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
return new ShardingSphereResultSet(resultSets, mergedResult, this,
selectContainsEnhancedTable, executionContext, columnLabelAndIndexMap);
}
- private boolean decide(final QueryContext queryContext, final
ShardingSphereDatabase database, final RuleMetaData globalRuleMetaData) {
- return
executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, globalRuleMetaData);
- }
-
private void handleAutoCommit(final SQLStatement sqlStatement) throws
SQLException {
if (AutoCommitUtils.needOpenTransaction(sqlStatement)) {
connection.handleAutoCommit();
}
}
- private JDBCExecutionUnit createTrafficExecutionUnit(final String
trafficInstanceId, final QueryContext queryContext) throws SQLException {
- ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(database);
- ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
- ExecutionGroupContext<JDBCExecutionUnit> context =
- prepareEngine.prepare(new RouteContext(),
Collections.singleton(executionUnit), new
ExecutionGroupReportContext(connection.getProcessId(), databaseName, new
Grantee("", "")));
- return context.getInputGroups().stream().flatMap(each ->
each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new);
- }
-
private void resetParameters() throws SQLException {
parameterSets.clear();
parameterSets.add(getParameters());
@@ -298,14 +278,6 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
SQLExecutorExceptionHandler.isExceptionThrown()));
}
- private ResultSet executeFederationQuery(final QueryContext queryContext) {
- ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- PreparedStatementExecuteQueryCallback callback = new
PreparedStatementExecuteQueryCallback(database.getProtocolType(),
- database.getResourceMetaData(), sqlStatement,
SQLExecutorExceptionHandler.isExceptionThrown());
- SQLFederationContext context = new SQLFederationContext(false,
queryContext, metaDataContexts.getMetaData(), connection.getProcessId());
- return
executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(database),
callback, context);
- }
-
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) {
int maxConnectionsSizePerQuery =
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new
DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT,
maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(),
statementManager, statementOption,
@@ -322,11 +294,10 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
clearPrevious();
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
- Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(trafficRule, queryContext);
- if (trafficInstanceId.isPresent()) {
- ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- return
executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName,
- trafficInstanceId.get(), queryContext,
createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).executeUpdate());
+ ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
+ Optional<Integer> updatedCount =
executor.executeAdvanceUpdate(metaDataContexts.getMetaData(), database,
queryContext, createDriverExecutionPrepareEngine(database));
+ if (updatedCount.isPresent()) {
+ return updatedCount.get();
}
executionContext = createExecutionContext(queryContext);
if (hasRawExecutionRule()) {
@@ -387,20 +358,12 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
clearPrevious();
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
- Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(trafficRule, queryContext);
- if (trafficInstanceId.isPresent()) {
- ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- boolean result =
executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName,
- trafficInstanceId.get(), queryContext,
createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).execute());
- currentResultSet =
executor.getTrafficExecutor().getResultSet();
- return result;
- }
- if (decide(queryContext,
metaDataContexts.getMetaData().getDatabase(databaseName),
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
- ResultSet resultSet = executeFederationQuery(queryContext);
- currentResultSet = resultSet;
- return null != resultSet;
+ ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
+ Optional<Boolean> advancedResult = executor.executeAdvance(
+ metaDataContexts.getMetaData(), database, queryContext,
createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).execute());
+ if (advancedResult.isPresent()) {
+ return advancedResult.get();
}
- currentResultSet = null;
executionContext = createExecutionContext(queryContext);
if (hasRawExecutionRule()) {
Collection<ExecuteResult> results =
@@ -414,8 +377,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
handleExceptionInTransaction(connection, metaDataContexts);
throw SQLExceptionTransformEngine.toSQLException(ex,
metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
} finally {
- batchPreparedStatementExecutor.clear();
- clearParameters();
+ clearBatch();
}
}
@@ -477,6 +439,10 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
if (null != currentResultSet) {
return currentResultSet;
}
+ Optional<ResultSet> advancedResultSet =
executor.getAdvancedResultSet();
+ if (advancedResultSet.isPresent()) {
+ return advancedResultSet.get();
+ }
if (executionContext.getSqlStatementContext() instanceof
SelectStatementContext ||
executionContext.getSqlStatementContext().getSqlStatement() instanceof
DALStatement) {
List<ResultSet> resultSets = getResultSets();
if (resultSets.isEmpty()) {
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 9becb70f884..55edd1785d9 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -74,9 +74,7 @@ import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
-import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
-import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
import java.sql.Connection;
@@ -111,8 +109,6 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
private final KernelProcessor kernelProcessor;
- private final TrafficRule trafficRule;
-
@Getter(AccessLevel.PROTECTED)
private final StatementManager statementManager;
@@ -141,7 +137,6 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
statementOption = new StatementOption(resultSetType,
resultSetConcurrency, resultSetHoldability);
executor = new DriverExecutor(connection);
kernelProcessor = new KernelProcessor();
- trafficRule =
metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);
statementManager = new StatementManager();
batchStatementExecutor = new BatchStatementExecutor(this);
databaseName = connection.getDatabaseName();
@@ -156,14 +151,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(trafficRule, queryContext);
- if (trafficInstanceId.isPresent()) {
- currentResultSet = executor.getTrafficExecutor().execute(
- connection.getProcessId(), databaseName,
trafficInstanceId.get(), queryContext,
createDriverExecutionPrepareEngine(database), Statement::executeQuery);
- return currentResultSet;
- }
- if (decide(queryContext, database,
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
- currentResultSet = executeFederationQuery(queryContext);
+ Optional<ResultSet> advancedResultSet =
executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database,
queryContext, createDriverExecutionPrepareEngine(database));
+ if (advancedResultSet.isPresent()) {
+ currentResultSet = advancedResultSet.get();
return currentResultSet;
}
executionContext = createExecutionContext(queryContext);
@@ -186,10 +176,6 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
return new ShardingSphereResultSet(getResultSets(), mergedResult,
this, selectContainsEnhancedTable, executionContext);
}
- private boolean decide(final QueryContext queryContext, final
ShardingSphereDatabase database, final RuleMetaData globalRuleMetaData) {
- return
executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, globalRuleMetaData);
- }
-
private List<QueryResult> executeQuery0(final ExecutionContext
executionContext) throws SQLException {
if
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
{
return executor.getRawExecutor().execute(
@@ -203,14 +189,6 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
return
executor.getRegularExecutor().executeQuery(executionGroupContext,
executionContext.getQueryContext(), callback);
}
- private ResultSet executeFederationQuery(final QueryContext queryContext) {
- ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- StatementExecuteQueryCallback callback = new
StatementExecuteQueryCallback(database.getProtocolType(),
- database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
- SQLFederationContext context = new SQLFederationContext(false,
queryContext, metaDataContexts.getMetaData(), connection.getProcessId());
- return
executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(database),
callback, context);
- }
-
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) {
int maxConnectionsSizePerQuery =
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT,
maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(),
statementManager, statementOption,
@@ -291,11 +269,10 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
- Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(trafficRule, queryContext);
- if (trafficInstanceId.isPresent()) {
- ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- return executor.getTrafficExecutor().execute(
- connection.getProcessId(), databaseName,
trafficInstanceId.get(), queryContext,
createDriverExecutionPrepareEngine(database), trafficCallback);
+ ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
+ Optional<Integer> updatedCount =
executor.executeAdvanceUpdate(metaDataContexts.getMetaData(), database,
queryContext, createDriverExecutionPrepareEngine(database), trafficCallback);
+ if (updatedCount.isPresent()) {
+ return updatedCount.get();
}
executionContext = createExecutionContext(queryContext);
if
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
{
@@ -393,24 +370,16 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
}
private boolean execute0(final String sql, final ExecuteCallback
executeCallback, final TrafficExecutorCallback<Boolean> trafficCallback) throws
SQLException {
+ currentResultSet = null;
QueryContext queryContext = createQueryContext(sql);
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
- Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(trafficRule, queryContext);
- if (trafficInstanceId.isPresent()) {
- ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- boolean result = executor.getTrafficExecutor().execute(
- connection.getProcessId(), databaseName,
trafficInstanceId.get(), queryContext,
createDriverExecutionPrepareEngine(database), trafficCallback);
- currentResultSet = executor.getTrafficExecutor().getResultSet();
- return result;
- }
- if (decide(queryContext,
metaDataContexts.getMetaData().getDatabase(databaseName),
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
- ResultSet resultSet = executeFederationQuery(queryContext);
- currentResultSet = resultSet;
- return null != resultSet;
+ ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
+ Optional<Boolean> advancedResult =
executor.executeAdvance(metaDataContexts.getMetaData(), database, queryContext,
createDriverExecutionPrepareEngine(database), trafficCallback);
+ if (advancedResult.isPresent()) {
+ return advancedResult.get();
}
- currentResultSet = null;
executionContext = createExecutionContext(queryContext);
if
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
{
Collection<ExecuteResult> results =
executor.getRawExecutor().execute(createRawExecutionContext(executionContext),
executionContext.getQueryContext(), new RawSQLExecutorCallback());
@@ -528,6 +497,10 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
if (null != currentResultSet) {
return currentResultSet;
}
+ Optional<ResultSet> advancedResultSet =
executor.getAdvancedResultSet();
+ if (advancedResultSet.isPresent()) {
+ return advancedResultSet.get();
+ }
if (executionContext.getSqlStatementContext() instanceof
SelectStatementContext ||
executionContext.getSqlStatementContext().getSqlStatement() instanceof
DALStatement) {
List<ResultSet> resultSets = getResultSets();
if (resultSets.isEmpty()) {