This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 9a1e1758ecc Refactor DriverExecutorFacade (#31572)
9a1e1758ecc is described below
commit 9a1e1758ecc43b1f807f08a24a4c3f179e4fe431
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Jun 4 01:02:40 2024 +0800
Refactor DriverExecutorFacade (#31572)
* Refactor DriverExecutorFacade
* Refactor DriverExecutorFacade
* Refactor DriverExecutorFacade
* Refactor DriverExecutorFacade
---
...verExecutor.java => DriverExecuteExecutor.java} | 283 +++------------------
.../executor/DriverExecuteQueryExecutor.java | 211 +++++++++++++++
.../executor/DriverExecuteUpdateExecutor.java | 232 +++++++++++++++++
.../driver/executor/DriverExecutorFacade.java | 67 +++++
.../jdbc/adapter/AbstractStatementAdapter.java | 9 +-
.../statement/ShardingSpherePreparedStatement.java | 51 ++--
.../core/statement/ShardingSphereStatement.java | 35 ++-
.../StatementAddCallback.java} | 13 +-
.../{ => callback}/StatementReplayCallback.java | 14 +-
.../statement/CircuitBreakerPreparedStatement.java | 10 +-
10 files changed, 608 insertions(+), 317 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteExecutor.java
similarity index 50%
rename from jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
rename to jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteExecutor.java
index 44109f16d41..e19437efbc3 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteExecutor.java
@@ -17,25 +17,19 @@
package org.apache.shardingsphere.driver.executor;
-import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback;
import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
-import
org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
import
org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
import
org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
-import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
-import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
-import
org.apache.shardingsphere.driver.jdbc.core.statement.StatementReplayCallback;
-import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
-import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
+import
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementAddCallback;
+import
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementReplayCallback;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
-import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
@@ -43,19 +37,15 @@ import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.merge.MergeEngine;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
@@ -68,7 +58,6 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectState
import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.api.TransactionType;
import
org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
@@ -79,18 +68,17 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
/**
- * Driver executor.
+ * Driver execute executor.
*/
-public final class DriverExecutor implements AutoCloseable {
+@RequiredArgsConstructor
+public final class DriverExecuteExecutor {
private final ShardingSphereConnection connection;
@@ -106,65 +94,41 @@ public final class DriverExecutor implements AutoCloseable {
private ExecuteType executeType = ExecuteType.REGULAR;
- private final KernelProcessor kernelProcessor;
-
- @Getter
- private final List<Statement> statements = new ArrayList<>();
-
- @Getter
- private final List<List<Object>> parameterSets = new ArrayList<>();
-
- public DriverExecutor(final ShardingSphereConnection connection) {
- this.connection = connection;
- metaData =
connection.getContextManager().getMetaDataContexts().getMetaData();
- ExecutorEngine executorEngine =
connection.getContextManager().getExecutorEngine();
- JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine,
connection.getDatabaseConnectionManager().getConnectionContext());
- regularExecutor = new DriverJDBCExecutor(connection.getDatabaseName(),
connection.getContextManager(), jdbcExecutor);
- rawExecutor = new RawExecutor(executorEngine,
connection.getDatabaseConnectionManager().getConnectionContext());
- String schemaName = new
DatabaseTypeRegistry(metaData.getDatabase(connection.getDatabaseName()).getProtocolType()).getDefaultSchemaName(connection.getDatabaseName());
- trafficExecutor = new TrafficExecutor();
- sqlFederationEngine = new
SQLFederationEngine(connection.getDatabaseName(), schemaName, metaData,
connection.getContextManager().getMetaDataContexts().getStatistics(),
jdbcExecutor);
- kernelProcessor = new KernelProcessor();
- }
-
/**
- * Execute query.
+ * Execute.
*
* @param database database
* @param queryContext query context
* @param prepareEngine prepare engine
- * @param statement statement
- * @param columnLabelAndIndexMap column label and index map
+ * @param executeCallback execute callback
* @param statementReplayCallback statement replay callback
- * @return result set
+ * @param statementAddCallback statement add callback
+ * @return execute result
* @throws SQLException SQL exception
*/
@SuppressWarnings("rawtypes")
- public ResultSet executeQuery(final ShardingSphereDatabase database, final
QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final Statement statement,
- final Map<String, Integer>
columnLabelAndIndexMap, final StatementReplayCallback statementReplayCallback)
throws SQLException {
+ public boolean execute(final ShardingSphereDatabase database, final
QueryContext queryContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+ final ExecuteCallback executeCallback, final
StatementReplayCallback statementReplayCallback, final StatementAddCallback
statementAddCallback) throws SQLException {
Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
queryContext);
if (trafficInstanceId.isPresent()) {
- return trafficExecutor.execute(
- connection.getProcessId(), database.getName(),
trafficInstanceId.get(), queryContext, prepareEngine,
getTrafficExecuteQueryCallback(prepareEngine.getType()));
+ executeType = ExecuteType.TRAFFIC;
+ return trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
executeCallback::execute);
}
if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
- return sqlFederationEngine.executeQuery(
+ executeType = ExecuteType.FEDERATION;
+ ResultSet resultSet = sqlFederationEngine.executeQuery(
prepareEngine, getExecuteQueryCallback(database,
queryContext, prepareEngine.getType()), new SQLFederationContext(false,
queryContext, metaData, connection.getProcessId()));
+ return null != resultSet;
}
- List<QueryResult> queryResults = executePushDownQuery(database,
queryContext, prepareEngine, statementReplayCallback);
- MergedResult mergedResult = mergeQuery(database, queryResults,
queryContext.getSqlStatementContext());
- boolean selectContainsEnhancedTable =
queryContext.getSqlStatementContext() instanceof SelectStatementContext
- && ((SelectStatementContext)
queryContext.getSqlStatementContext()).isContainsEnhancedTable();
- List<ResultSet> resultSets = getResultSets();
- return new ShardingSphereResultSet(resultSets, mergedResult,
statement, selectContainsEnhancedTable, queryContext.getSqlStatementContext(),
- null == columnLabelAndIndexMap
- ?
ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(queryContext.getSqlStatementContext(),
selectContainsEnhancedTable, resultSets.get(0).getMetaData())
- : columnLabelAndIndexMap);
- }
-
- private TrafficExecutorCallback<ResultSet>
getTrafficExecuteQueryCallback(final String jdbcDriverType) {
- return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? ((sql,
statement) -> statement.executeQuery(sql)) : ((sql, statement) ->
((PreparedStatement) statement).executeQuery());
+ ExecutionContext executionContext = createExecutionContext(database,
queryContext);
+ if (hasRawExecutionRule(database)) {
+ Collection<ExecuteResult> results =
rawExecutor.execute(createRawExecutionGroupContext(database, executionContext),
queryContext, new RawSQLExecutorCallback());
+ return results.iterator().next() instanceof QueryResult;
+ }
+ boolean isNeedImplicitCommitTransaction =
isNeedImplicitCommitTransaction(
+ connection,
queryContext.getSqlStatementContext().getSqlStatement(),
executionContext.getExecutionUnits().size() > 1);
+ return executeWithExecutionContext(database, executeCallback,
executionContext, prepareEngine, isNeedImplicitCommitTransaction,
statementReplayCallback, statementAddCallback);
}
private ExecuteQueryCallback getExecuteQueryCallback(final
ShardingSphereDatabase database, final QueryContext queryContext, final String
jdbcDriverType) {
@@ -175,27 +139,6 @@ public final class DriverExecutor implements AutoCloseable {
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
}
- @SuppressWarnings({"rawtypes", "unchecked"})
- private List<QueryResult> executePushDownQuery(final
ShardingSphereDatabase database, final QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final
StatementReplayCallback statementReplayCallback) throws SQLException {
- ExecutionContext executionContext = createExecutionContext(database,
queryContext);
- if (hasRawExecutionRule(database)) {
- return
rawExecutor.execute(createRawExecutionGroupContext(database, executionContext),
- queryContext, new
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
- }
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
- new ExecutionGroupReportContext(connection.getProcessId(),
database.getName(), new Grantee("", "")));
- for (ExecutionGroup<JDBCExecutionUnit> each :
executionGroupContext.getInputGroups()) {
- statements.addAll(getStatements(each));
- if
(JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType())) {
- parameterSets.addAll(getParameterSets(each));
- }
- }
- statementReplayCallback.replay(statements, parameterSets);
- return regularExecutor.executeQuery(executionGroupContext,
queryContext, getExecuteQueryCallback(database, queryContext,
prepareEngine.getType()));
- }
-
private boolean hasRawExecutionRule(final ShardingSphereDatabase database)
{
return
!database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
}
@@ -216,18 +159,10 @@ public final class DriverExecutor implements AutoCloseable {
return result;
}
- private ExecutionContext createExecutionContext(final
ShardingSphereDatabase database, final QueryContext queryContext) throws
SQLException {
- clearStatements();
+ private ExecutionContext createExecutionContext(final
ShardingSphereDatabase database, final QueryContext queryContext) {
RuleMetaData globalRuleMetaData = metaData.getGlobalRuleMetaData();
SQLAuditEngine.audit(queryContext, globalRuleMetaData, database);
- return kernelProcessor.generateExecutionContext(queryContext,
database, globalRuleMetaData, metaData.getProps(),
connection.getDatabaseConnectionManager().getConnectionContext());
- }
-
- private void clearStatements() throws SQLException {
- for (Statement each : statements) {
- each.close();
- }
- statements.clear();
+ return new KernelProcessor().generateExecutionContext(queryContext,
database, globalRuleMetaData, metaData.getProps(),
connection.getDatabaseConnectionManager().getConnectionContext());
}
private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
@@ -236,57 +171,6 @@ public final class DriverExecutor implements AutoCloseable {
executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new
Grantee("", "")));
}
- private MergedResult mergeQuery(final ShardingSphereDatabase database,
final List<QueryResult> queryResults, final SQLStatementContext
sqlStatementContext) throws SQLException {
- MergeEngine mergeEngine = new
MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(),
connection.getDatabaseConnectionManager().getConnectionContext());
- return mergeEngine.merge(queryResults, sqlStatementContext);
- }
-
- private List<ResultSet> getResultSets() throws SQLException {
- List<ResultSet> result = new ArrayList<>(statements.size());
- for (Statement each : statements) {
- if (null != each.getResultSet()) {
- result.add(each.getResultSet());
- }
- }
- return result;
- }
-
- /**
- * Execute update.
- *
- * @param database database
- * @param queryContext query context
- * @param prepareEngine prepare engine
- * @param updateCallback update callback
- * @param statementReplayCallback statement replay callback
- * @return updated row count
- * @throws SQLException SQL exception
- */
- @SuppressWarnings("rawtypes")
- public int executeUpdate(final ShardingSphereDatabase database, final
QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final ExecuteUpdateCallback updateCallback, final
StatementReplayCallback statementReplayCallback) throws SQLException {
- Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
queryContext);
- if (trafficInstanceId.isPresent()) {
- return trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
updateCallback::executeUpdate);
- }
- ExecutionContext executionContext = createExecutionContext(database,
queryContext);
- return hasRawExecutionRule(database)
- ?
accumulate(rawExecutor.execute(createRawExecutionGroupContext(database,
executionContext), queryContext, new RawSQLExecutorCallback()))
- : executeUpdate(database, updateCallback,
queryContext.getSqlStatementContext(), executionContext, prepareEngine,
isNeedImplicitCommitTransaction(
- connection,
queryContext.getSqlStatementContext().getSqlStatement(),
executionContext.getExecutionUnits().size() > 1), statementReplayCallback);
- }
-
- @SuppressWarnings("rawtypes")
- private int executeUpdate(final ShardingSphereDatabase database, final
ExecuteUpdateCallback updateCallback, final SQLStatementContext
sqlStatementContext, final ExecutionContext executionContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final boolean isNeedImplicitCommitTransaction,
- final StatementReplayCallback
statementReplayCallback) throws SQLException {
- return isNeedImplicitCommitTransaction
- ? executeWithImplicitCommitTransaction(() ->
useDriverToExecuteUpdate(
- database, updateCallback, sqlStatementContext,
executionContext, prepareEngine, statementReplayCallback), connection,
database.getProtocolType())
- : useDriverToExecuteUpdate(database, updateCallback,
sqlStatementContext, executionContext, prepareEngine, statementReplayCallback);
- }
-
private <T> T executeWithImplicitCommitTransaction(final
ImplicitTransactionCallback<T> callback, final Connection connection, final
DatabaseType databaseType) throws SQLException {
T result;
try {
@@ -304,110 +188,31 @@ public final class DriverExecutor implements AutoCloseable {
return result;
}
- @SuppressWarnings({"rawtypes", "unchecked"})
- private int useDriverToExecuteUpdate(final ShardingSphereDatabase
database, final ExecuteUpdateCallback updateCallback, final SQLStatementContext
sqlStatementContext,
- final ExecutionContext
executionContext, final DriverExecutionPrepareEngine<JDBCExecutionUnit,
Connection> prepareEngine,
- final StatementReplayCallback
statementReplayCallback) throws SQLException {
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext(database, executionContext, prepareEngine);
- for (ExecutionGroup<JDBCExecutionUnit> each :
executionGroupContext.getInputGroups()) {
- statements.addAll(getStatements(each));
- if
(JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType())) {
- parameterSets.addAll(getParameterSets(each));
- }
- }
- statementReplayCallback.replay(statements, parameterSets);
- JDBCExecutorCallback<Integer> callback =
createExecuteUpdateCallback(database, updateCallback, sqlStatementContext,
prepareEngine.getType());
- return regularExecutor.executeUpdate(executionGroupContext,
executionContext.getQueryContext(),
executionContext.getRouteContext().getRouteUnits(), callback);
- }
-
private ExecutionGroupContext<JDBCExecutionUnit>
createExecutionGroupContext(final ShardingSphereDatabase database, final
ExecutionContext executionContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine) throws SQLException {
return prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
new ExecutionGroupReportContext(connection.getProcessId(),
database.getName(), new Grantee("", "")));
}
- private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final
ShardingSphereDatabase database,
- final
ExecuteUpdateCallback updateCallback, final SQLStatementContext
sqlStatementContext, final String jdbcDriverType) {
- boolean isExceptionThrown =
SQLExecutorExceptionHandler.isExceptionThrown();
- return new JDBCExecutorCallback<Integer>(database.getProtocolType(),
database.getResourceMetaData(), sqlStatementContext.getSqlStatement(),
isExceptionThrown) {
-
- @Override
- protected Integer executeSQL(final String sql, final Statement
statement, final ConnectionMode connectionMode, final DatabaseType storageType)
throws SQLException {
- return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ?
updateCallback.executeUpdate(sql, statement) : ((PreparedStatement)
statement).executeUpdate();
- }
-
- @Override
- protected Optional<Integer> getSaneResult(final SQLStatement
sqlStatement, final SQLException ex) {
- return Optional.empty();
- }
- };
- }
-
- private int accumulate(final Collection<ExecuteResult> results) {
- int result = 0;
- for (ExecuteResult each : results) {
- result += ((UpdateResult) each).getUpdateCount();
- }
- return result;
- }
-
- /**
- * Execute.
- *
- * @param database database
- * @param queryContext query context
- * @param prepareEngine prepare engine
- * @param executeCallback execute callback
- * @param statementReplayCallback statement replay callback
- * @return execute result
- * @throws SQLException SQL exception
- */
- @SuppressWarnings("rawtypes")
- public boolean execute(final ShardingSphereDatabase database, final
QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final ExecuteCallback executeCallback, final
StatementReplayCallback statementReplayCallback) throws SQLException {
- Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
queryContext);
- if (trafficInstanceId.isPresent()) {
- executeType = ExecuteType.TRAFFIC;
- return trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
executeCallback::execute);
- }
- if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
- executeType = ExecuteType.FEDERATION;
- ResultSet resultSet = sqlFederationEngine.executeQuery(
- prepareEngine, getExecuteQueryCallback(database,
queryContext, prepareEngine.getType()), new SQLFederationContext(false,
queryContext, metaData, connection.getProcessId()));
- return null != resultSet;
- }
- ExecutionContext executionContext = createExecutionContext(database,
queryContext);
- if (hasRawExecutionRule(database)) {
- Collection<ExecuteResult> results =
rawExecutor.execute(createRawExecutionGroupContext(database, executionContext),
queryContext, new RawSQLExecutorCallback());
- return results.iterator().next() instanceof QueryResult;
- }
- boolean isNeedImplicitCommitTransaction =
isNeedImplicitCommitTransaction(
- connection,
queryContext.getSqlStatementContext().getSqlStatement(),
executionContext.getExecutionUnits().size() > 1);
- return executeWithExecutionContext(database, executeCallback,
executionContext, prepareEngine, isNeedImplicitCommitTransaction,
statementReplayCallback);
- }
-
@SuppressWarnings("rawtypes")
private boolean executeWithExecutionContext(final ShardingSphereDatabase
database, final ExecuteCallback executeCallback, final ExecutionContext
executionContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final boolean
isNeedImplicitCommitTransaction, final StatementReplayCallback
statementReplayCallback) throws SQLException {
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final boolean isNeedImplicitCommitTransaction,
+ final StatementReplayCallback
statementReplayCallback, final StatementAddCallback statementAddCallback)
throws SQLException {
return isNeedImplicitCommitTransaction
- ? executeWithImplicitCommitTransaction(() ->
useDriverToExecute(database, executeCallback, executionContext, prepareEngine,
statementReplayCallback), connection,
+ ? executeWithImplicitCommitTransaction(() ->
useDriverToExecute(database, executeCallback, executionContext, prepareEngine,
statementReplayCallback, statementAddCallback), connection,
database.getProtocolType())
- : useDriverToExecute(database, executeCallback,
executionContext, prepareEngine, statementReplayCallback);
+ : useDriverToExecute(database, executeCallback,
executionContext, prepareEngine, statementReplayCallback, statementAddCallback);
}
@SuppressWarnings({"rawtypes", "unchecked"})
private boolean useDriverToExecute(final ShardingSphereDatabase database,
final ExecuteCallback callback, final ExecutionContext executionContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final StatementReplayCallback statementReplayCallback) throws SQLException {
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+ final StatementReplayCallback
statementReplayCallback, final StatementAddCallback statementAddCallback)
throws SQLException {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext(database, executionContext, prepareEngine);
for (ExecutionGroup<JDBCExecutionUnit> each :
executionGroupContext.getInputGroups()) {
- statements.addAll(getStatements(each));
- if
(JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType())) {
- parameterSets.addAll(getParameterSets(each));
- }
+ statementAddCallback.add(getStatements(each),
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ?
getParameterSets(each) : Collections.emptyList());
}
- statementReplayCallback.replay(statements, parameterSets);
+ statementReplayCallback.replay();
JDBCExecutorCallback<Boolean> jdbcExecutorCallback =
createExecuteCallback(database, callback,
executionContext.getSqlStatementContext().getSqlStatement(),
prepareEngine.getType());
return regularExecutor.execute(executionGroupContext,
executionContext.getQueryContext(),
executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
}
@@ -461,20 +266,6 @@ public final class DriverExecutor implements AutoCloseable {
}
}
- /**
- * Clear.
- */
- public void clear() {
- statements.clear();
- parameterSets.clear();
- }
-
- @Override
- public void close() throws SQLException {
- sqlFederationEngine.close();
- trafficExecutor.close();
- }
-
public enum ExecuteType {
TRAFFIC, FEDERATION, REGULAR
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteQueryExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteQueryExecutor.java
new file mode 100644
index 00000000000..4897c612f10
--- /dev/null
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteQueryExecutor.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
+import
org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
+import
org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
+import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
+import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
+import
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementAddCallback;
+import
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementReplayCallback;
+import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
+import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
+import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.merge.MergeEngine;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
+import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
+import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
+import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
+import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
+import org.apache.shardingsphere.traffic.rule.TrafficRule;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Driver execute query executor.
+ *
+ * <p>Dispatches a query to the traffic executor, the SQL federation engine or the push-down executors (raw or regular JDBC),
+ * checked in that order, and wraps the push-down outcome into a {@link ShardingSphereResultSet}.</p>
+ */
+@RequiredArgsConstructor
+public final class DriverExecuteQueryExecutor {
+
+    private final ShardingSphereConnection connection;
+
+    private final ShardingSphereMetaData metaData;
+
+    private final DriverJDBCExecutor regularExecutor;
+
+    private final RawExecutor rawExecutor;
+
+    private final TrafficExecutor trafficExecutor;
+
+    private final SQLFederationEngine sqlFederationEngine;
+
+    // Statements routed by the most recent regular push-down query; cleared at the start of every executeQuery call.
+    private final Collection<Statement> statements = new LinkedList<>();
+
+    /**
+     * Execute query.
+     *
+     * <p>The traffic executor is used when a traffic instance ID is resolved, the SQL federation engine is used when it decides to handle the statement,
+     * otherwise the query is pushed down and the results are merged.</p>
+     *
+     * @param database database
+     * @param queryContext query context
+     * @param prepareEngine prepare engine
+     * @param statement statement
+     * @param columnLabelAndIndexMap column label and index map, created from the first result set's meta data when {@code null}
+     * @param statementReplayCallback statement replay callback
+     * @param statementAddCallback statement add callback
+     * @return result set
+     * @throws SQLException SQL exception
+     */
+    @SuppressWarnings("rawtypes")
+    public ResultSet executeQuery(final ShardingSphereDatabase database, final QueryContext queryContext,
+                                  final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final Statement statement, final Map<String, Integer> columnLabelAndIndexMap,
+                                  final StatementReplayCallback statementReplayCallback, final StatementAddCallback statementAddCallback) throws SQLException {
+        statements.clear();
+        Optional<String> trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext);
+        if (trafficInstanceId.isPresent()) {
+            return trafficExecutor.execute(
+                    connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, getTrafficExecuteQueryCallback(prepareEngine.getType()));
+        }
+        if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
+            return sqlFederationEngine.executeQuery(
+                    prepareEngine, getExecuteQueryCallback(database, queryContext, prepareEngine.getType()), new SQLFederationContext(false, queryContext, metaData, connection.getProcessId()));
+        }
+        List<QueryResult> queryResults = executePushDownQuery(database, queryContext, prepareEngine, statementReplayCallback, statementAddCallback);
+        MergedResult mergedResult = mergeQuery(database, queryResults, queryContext.getSqlStatementContext());
+        boolean selectContainsEnhancedTable = queryContext.getSqlStatementContext() instanceof SelectStatementContext
+                && ((SelectStatementContext) queryContext.getSqlStatementContext()).isContainsEnhancedTable();
+        List<ResultSet> resultSets = getResultSets();
+        // NOTE(review): resultSets is empty when the raw executor path ran (no statements are recorded there), so get(0) would fail
+        // if columnLabelAndIndexMap is null in that case - confirm callers always supply the map for raw execution.
+        return new ShardingSphereResultSet(resultSets, mergedResult, statement, selectContainsEnhancedTable, queryContext.getSqlStatementContext(),
+                null == columnLabelAndIndexMap
+                        ? ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(queryContext.getSqlStatementContext(), selectContainsEnhancedTable, resultSets.get(0).getMetaData())
+                        : columnLabelAndIndexMap);
+    }
+
+    /**
+     * Get the callback used by the traffic executor: plain statements execute the given SQL, other driver types are cast to {@link PreparedStatement} and executed without SQL.
+     */
+    private TrafficExecutorCallback<ResultSet> getTrafficExecuteQueryCallback(final String jdbcDriverType) {
+        return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? ((sql, statement) -> statement.executeQuery(sql)) : ((sql, statement) -> ((PreparedStatement) statement).executeQuery());
+    }
+
+    /**
+     * Create the execute query callback matching the JDBC driver type (statement or prepared statement).
+     */
+    private ExecuteQueryCallback getExecuteQueryCallback(final ShardingSphereDatabase database, final QueryContext queryContext, final String jdbcDriverType) {
+        return JDBCDriverType.STATEMENT.equals(jdbcDriverType)
+                ? new StatementExecuteQueryCallback(database.getProtocolType(), database.getResourceMetaData(),
+                        queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown())
+                : new PreparedStatementExecuteQueryCallback(database.getProtocolType(), database.getResourceMetaData(),
+                        queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
+    }
+
+    /**
+     * Push the query down to the storage layer.
+     *
+     * <p>Uses the raw executor when the database has raw execution rule attributes. Otherwise prepares JDBC execution groups, records the routed statements,
+     * hands statements and parameter sets to the add callback, triggers the replay callback and executes with the regular executor.</p>
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private List<QueryResult> executePushDownQuery(final ShardingSphereDatabase database, final QueryContext queryContext,
+                                                   final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+                                                   final StatementReplayCallback statementReplayCallback, final StatementAddCallback statementAddCallback) throws SQLException {
+        ExecutionContext executionContext = createExecutionContext(database, queryContext);
+        if (hasRawExecutionRule(database)) {
+            return rawExecutor.execute(
+                    createRawExecutionGroupContext(database, executionContext), queryContext, new RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
+        }
+        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(),
+                new ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new Grantee("", "")));
+        for (ExecutionGroup<JDBCExecutionUnit> each : executionGroupContext.getInputGroups()) {
+            Collection<Statement> groupStatements = getStatements(each);
+            // Remember the routed statements so that getResultSets() can collect their result sets after execution.
+            statements.addAll(groupStatements);
+            statementAddCallback.add(groupStatements, JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ? getParameterSets(each) : Collections.emptyList());
+        }
+        statementReplayCallback.replay();
+        return regularExecutor.executeQuery(executionGroupContext, queryContext, getExecuteQueryCallback(database, queryContext, prepareEngine.getType()));
+    }
+
+    /**
+     * Judge whether the database rule meta data exposes any raw execution rule attribute.
+     */
+    private boolean hasRawExecutionRule(final ShardingSphereDatabase database) {
+        return !database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
+    }
+
+    /**
+     * Collect the storage resource (statement) of every execution unit in the group.
+     */
+    private Collection<Statement> getStatements(final ExecutionGroup<JDBCExecutionUnit> executionGroup) {
+        Collection<Statement> result = new LinkedList<>();
+        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
+            result.add(each.getStorageResource());
+        }
+        return result;
+    }
+
+    /**
+     * Collect the SQL unit parameters of every execution unit in the group.
+     */
+    private Collection<List<Object>> getParameterSets(final ExecutionGroup<JDBCExecutionUnit> executionGroup) {
+        Collection<List<Object>> result = new LinkedList<>();
+        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
+            result.add(each.getExecutionUnit().getSqlUnit().getParameters());
+        }
+        return result;
+    }
+
+    /**
+     * Audit the query and generate its execution context with the kernel processor.
+     */
+    private ExecutionContext createExecutionContext(final ShardingSphereDatabase database, final QueryContext queryContext) {
+        RuleMetaData globalRuleMetaData = metaData.getGlobalRuleMetaData();
+        SQLAuditEngine.audit(queryContext, globalRuleMetaData, database);
+        return new KernelProcessor().generateExecutionContext(queryContext, database, globalRuleMetaData, metaData.getProps(), connection.getDatabaseConnectionManager().getConnectionContext());
+    }
+
+    /**
+     * Prepare the execution group context for the raw executor, bounded by the configured max connections size per query.
+     */
+    private ExecutionGroupContext<RawSQLExecutionUnit> createRawExecutionGroupContext(final ShardingSphereDatabase database, final ExecutionContext executionContext) throws SQLException {
+        int maxConnectionsSizePerQuery = metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, database.getRuleMetaData().getRules()).prepare(
+                executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new Grantee("", "")));
+    }
+
+    /**
+     * Merge the query results with a newly created merge engine.
+     */
+    private MergedResult mergeQuery(final ShardingSphereDatabase database, final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
+        MergeEngine mergeEngine = new MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(), connection.getDatabaseConnectionManager().getConnectionContext());
+        return mergeEngine.merge(queryResults, sqlStatementContext);
+    }
+
+    /**
+     * Collect the non-null current result sets of the recorded statements.
+     */
+    private List<ResultSet> getResultSets() throws SQLException {
+        List<ResultSet> result = new ArrayList<>(statements.size());
+        for (Statement each : statements) {
+            // Statement#getResultSet is specified to be called only once per result, so fetch it a single time and reuse the reference.
+            ResultSet resultSet = each.getResultSet();
+            if (null != resultSet) {
+                result.add(resultSet);
+            }
+        }
+        return result;
+    }
+}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteUpdateExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteUpdateExecutor.java
new file mode 100644
index 00000000000..7592dcab94d
--- /dev/null
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteUpdateExecutor.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor;
+
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
+import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementAddCallback;
+import
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementReplayCallback;
+import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
+import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
+import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
+import org.apache.shardingsphere.traffic.rule.TrafficRule;
+import org.apache.shardingsphere.transaction.api.TransactionType;
+import
org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Driver execute update executor.
+ *
+ * <p>Dispatches an update to the traffic executor, the raw executor or the regular JDBC executor, checked in that order.</p>
+ */
+@RequiredArgsConstructor
+public final class DriverExecuteUpdateExecutor {
+
+    private final ShardingSphereConnection connection;
+
+    private final ShardingSphereMetaData metaData;
+
+    private final DriverJDBCExecutor regularExecutor;
+
+    private final RawExecutor rawExecutor;
+
+    private final TrafficExecutor trafficExecutor;
+
+    /**
+     * Execute update.
+     *
+     * <p>The traffic executor is used when a traffic instance ID is resolved. Otherwise the update counts of the raw executor are accumulated
+     * when the database has raw execution rule attributes, or the regular JDBC executor is used.</p>
+     *
+     * @param database database
+     * @param queryContext query context
+     * @param prepareEngine prepare engine
+     * @param updateCallback update callback
+     * @param statementReplayCallback statement replay callback
+     * @param statementAddCallback statement add callback
+     * @return updated row count
+     * @throws SQLException SQL exception
+     */
+    @SuppressWarnings("rawtypes")
+    public int executeUpdate(final ShardingSphereDatabase database, final QueryContext queryContext,
+                             final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+                             final ExecuteUpdateCallback updateCallback, final StatementReplayCallback statementReplayCallback, final StatementAddCallback statementAddCallback) throws SQLException {
+        Optional<String> trafficInstanceId = connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class), queryContext);
+        if (trafficInstanceId.isPresent()) {
+            return trafficExecutor.execute(connection.getProcessId(), database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, updateCallback::executeUpdate);
+        }
+        ExecutionContext executionContext = createExecutionContext(database, queryContext);
+        return hasRawExecutionRule(database)
+                ? accumulate(rawExecutor.execute(createRawExecutionGroupContext(database, executionContext), queryContext, new RawSQLExecutorCallback()))
+                : executeUpdate(database, updateCallback, queryContext.getSqlStatementContext(), executionContext, prepareEngine, isNeedImplicitCommitTransaction(
+                        connection, queryContext.getSqlStatementContext().getSqlStatement(), executionContext.getExecutionUnits().size() > 1), statementReplayCallback, statementAddCallback);
+    }
+
+    /**
+     * Execute the update with the regular JDBC executor, wrapped in an implicit transaction when requested.
+     */
+    @SuppressWarnings("rawtypes")
+    private int executeUpdate(final ShardingSphereDatabase database, final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext, final ExecutionContext executionContext,
+                              final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final boolean isNeedImplicitCommitTransaction,
+                              final StatementReplayCallback statementReplayCallback, final StatementAddCallback statementAddCallback) throws SQLException {
+        return isNeedImplicitCommitTransaction
+                ? executeWithImplicitCommitTransaction(() -> useDriverToExecuteUpdate(
+                        database, updateCallback, sqlStatementContext, executionContext, prepareEngine, statementReplayCallback, statementAddCallback), connection, database.getProtocolType())
+                : useDriverToExecuteUpdate(database, updateCallback, sqlStatementContext, executionContext, prepareEngine, statementReplayCallback, statementAddCallback);
+    }
+
+    /**
+     * Judge whether the database rule meta data exposes any raw execution rule attribute.
+     */
+    private boolean hasRawExecutionRule(final ShardingSphereDatabase database) {
+        return !database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
+    }
+
+    /**
+     * Collect the storage resource (statement) of every execution unit in the group.
+     */
+    private Collection<Statement> getStatements(final ExecutionGroup<JDBCExecutionUnit> executionGroup) {
+        Collection<Statement> result = new LinkedList<>();
+        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
+            result.add(each.getStorageResource());
+        }
+        return result;
+    }
+
+    /**
+     * Collect the SQL unit parameters of every execution unit in the group.
+     */
+    private Collection<List<Object>> getParameterSets(final ExecutionGroup<JDBCExecutionUnit> executionGroup) {
+        Collection<List<Object>> result = new LinkedList<>();
+        for (JDBCExecutionUnit each : executionGroup.getInputs()) {
+            result.add(each.getExecutionUnit().getSqlUnit().getParameters());
+        }
+        return result;
+    }
+
+    /**
+     * Audit the query and generate its execution context with the kernel processor.
+     */
+    private ExecutionContext createExecutionContext(final ShardingSphereDatabase database, final QueryContext queryContext) {
+        SQLAuditEngine.audit(queryContext, metaData.getGlobalRuleMetaData(), database);
+        return new KernelProcessor().generateExecutionContext(
+                queryContext, database, metaData.getGlobalRuleMetaData(), metaData.getProps(), connection.getDatabaseConnectionManager().getConnectionContext());
+    }
+
+    /**
+     * Prepare the execution group context for the raw executor, bounded by the configured max connections size per query.
+     */
+    private ExecutionGroupContext<RawSQLExecutionUnit> createRawExecutionGroupContext(final ShardingSphereDatabase database, final ExecutionContext executionContext) throws SQLException {
+        int maxConnectionsSizePerQuery = metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, database.getRuleMetaData().getRules()).prepare(
+                executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new Grantee("", "")));
+    }
+
+    /**
+     * Run the callback with auto-commit disabled, commit on success, roll back on failure and always restore auto-commit afterwards.
+     *
+     * <p>A failure of the rollback itself is attached to the original exception as suppressed, so that it does not replace the real cause of the failure.</p>
+     */
+    private <T> T executeWithImplicitCommitTransaction(final ImplicitTransactionCallback<T> callback, final Connection connection, final DatabaseType databaseType) throws SQLException {
+        T result;
+        try {
+            connection.setAutoCommit(false);
+            result = callback.execute();
+            connection.commit();
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            try {
+                connection.rollback();
+            } catch (final SQLException rollbackEx) {
+                // Keep the execution failure as the primary error; the rollback failure is secondary information.
+                ex.addSuppressed(rollbackEx);
+            }
+            throw SQLExceptionTransformEngine.toSQLException(ex, databaseType);
+        } finally {
+            connection.setAutoCommit(true);
+        }
+        return result;
+    }
+
+    /**
+     * Prepare JDBC execution groups, hand statements and parameter sets to the add callback, trigger the replay callback and execute the update with the regular executor.
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private int useDriverToExecuteUpdate(final ShardingSphereDatabase database, final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext,
+                                         final ExecutionContext executionContext, final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+                                         final StatementReplayCallback statementReplayCallback, final StatementAddCallback statementAddCallback) throws SQLException {
+        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(database, executionContext, prepareEngine);
+        for (ExecutionGroup<JDBCExecutionUnit> each : executionGroupContext.getInputGroups()) {
+            statementAddCallback.add(getStatements(each), JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ? getParameterSets(each) : Collections.emptyList());
+        }
+        statementReplayCallback.replay();
+        JDBCExecutorCallback<Integer> callback = createExecuteUpdateCallback(database, updateCallback, sqlStatementContext, prepareEngine.getType());
+        return regularExecutor.executeUpdate(executionGroupContext, executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), callback);
+    }
+
+    /**
+     * Prepare the JDBC execution group context for the route context and execution units of the execution context.
+     */
+    private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext(final ShardingSphereDatabase database, final ExecutionContext executionContext,
+                                                                                 final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) throws SQLException {
+        return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(),
+                new ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new Grantee("", "")));
+    }
+
+    /**
+     * Create the JDBC executor callback: plain statements delegate to the update callback, other driver types are cast to {@link PreparedStatement} and executed directly.
+     */
+    private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final ShardingSphereDatabase database,
+                                                                      final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext, final String jdbcDriverType) {
+        boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
+        return new JDBCExecutorCallback<Integer>(database.getProtocolType(), database.getResourceMetaData(), sqlStatementContext.getSqlStatement(), isExceptionThrown) {
+
+            @Override
+            protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
+                return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? updateCallback.executeUpdate(sql, statement) : ((PreparedStatement) statement).executeUpdate();
+            }
+
+            @Override
+            protected Optional<Integer> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
+                return Optional.empty();
+            }
+        };
+    }
+
+    /**
+     * Sum the update counts of the execute results; every element is cast to {@link UpdateResult}.
+     */
+    private int accumulate(final Collection<ExecuteResult> results) {
+        int result = 0;
+        for (ExecuteResult each : results) {
+            result += ((UpdateResult) each).getUpdateCount();
+        }
+        return result;
+    }
+
+    /**
+     * Judge whether an implicit commit transaction is needed.
+     *
+     * <p>It is needed only when the connection is in auto-commit mode, the default transaction type is distributed, no transaction is currently active,
+     * the statement is a DML statement other than SELECT and there are multiple execution units.</p>
+     */
+    private boolean isNeedImplicitCommitTransaction(final ShardingSphereConnection connection, final SQLStatement sqlStatement, final boolean multiExecutionUnits) {
+        if (!connection.getAutoCommit()) {
+            return false;
+        }
+        TransactionType transactionType = connection.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType();
+        boolean isInTransaction = connection.getDatabaseConnectionManager().getConnectionTransaction().isInTransaction();
+        if (!TransactionType.isDistributedTransaction(transactionType) || isInTransaction) {
+            return false;
+        }
+        return isWriteDMLStatement(sqlStatement) && multiExecutionUnits;
+    }
+
+    /**
+     * Judge whether the statement is a DML statement other than SELECT.
+     */
+    private boolean isWriteDMLStatement(final SQLStatement sqlStatement) {
+        return sqlStatement instanceof DMLStatement && !(sqlStatement instanceof SelectStatement);
+    }
+}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutorFacade.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutorFacade.java
new file mode 100644
index 00000000000..376fa5de3e2
--- /dev/null
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutorFacade.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor;
+
+import lombok.Getter;
+import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
+import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
+
+import java.sql.SQLException;
+
+/**
+ * Driver executor facade.
+ *
+ * <p>Creates the query, update and execute executors on top of shared traffic, SQL federation, raw and regular JDBC executors,
+ * and closes the traffic executor and the SQL federation engine when the facade is closed.</p>
+ */
+public final class DriverExecutorFacade implements AutoCloseable {
+
+    private final TrafficExecutor trafficExecutor;
+
+    private final SQLFederationEngine sqlFederationEngine;
+
+    @Getter
+    private final DriverExecuteQueryExecutor queryExecutor;
+
+    @Getter
+    private final DriverExecuteUpdateExecutor updateExecutor;
+
+    @Getter
+    private final DriverExecuteExecutor executeExecutor;
+
+    /**
+     * Create the facade and all executors for the given connection.
+     *
+     * @param connection connection that supplies the context manager, database name and connection context
+     */
+    public DriverExecutorFacade(final ShardingSphereConnection connection) {
+        JDBCExecutor jdbcExecutor = new JDBCExecutor(connection.getContextManager().getExecutorEngine(), connection.getDatabaseConnectionManager().getConnectionContext());
+        DriverJDBCExecutor regularExecutor = new DriverJDBCExecutor(connection.getDatabaseName(), connection.getContextManager(), jdbcExecutor);
+        RawExecutor rawExecutor = new RawExecutor(connection.getContextManager().getExecutorEngine(), connection.getDatabaseConnectionManager().getConnectionContext());
+        trafficExecutor = new TrafficExecutor();
+        ShardingSphereMetaData metaData = connection.getContextManager().getMetaDataContexts().getMetaData();
+        String schemaName = new DatabaseTypeRegistry(metaData.getDatabase(connection.getDatabaseName()).getProtocolType()).getDefaultSchemaName(connection.getDatabaseName());
+        sqlFederationEngine = new SQLFederationEngine(connection.getDatabaseName(), schemaName, metaData, connection.getContextManager().getMetaDataContexts().getStatistics(), jdbcExecutor);
+        queryExecutor = new DriverExecuteQueryExecutor(connection, metaData, regularExecutor, rawExecutor, trafficExecutor, sqlFederationEngine);
+        updateExecutor = new DriverExecuteUpdateExecutor(connection, metaData, regularExecutor, rawExecutor, trafficExecutor);
+        executeExecutor = new DriverExecuteExecutor(connection, metaData, regularExecutor, rawExecutor, trafficExecutor, sqlFederationEngine);
+    }
+
+    /**
+     * Close the traffic executor and the SQL federation engine.
+     *
+     * @throws SQLException SQL exception
+     */
+    @Override
+    public void close() throws SQLException {
+        try {
+            trafficExecutor.close();
+        } finally {
+            // Always release the federation engine, even when closing the traffic executor fails.
+            sqlFederationEngine.close();
+        }
+    }
+}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index 7f52c2fbf47..c3e0185a3fc 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.driver.jdbc.adapter;
import lombok.AccessLevel;
import lombok.Getter;
-import org.apache.shardingsphere.driver.executor.DriverExecutor;
import
org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
@@ -67,8 +66,6 @@ public abstract class AbstractStatementAdapter extends
WrapperAdapter implements
protected abstract Collection<? extends Statement> getRoutedStatements();
- protected abstract DriverExecutor getExecutor();
-
protected abstract StatementManager getStatementManager();
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -225,9 +222,7 @@ public abstract class AbstractStatementAdapter extends
WrapperAdapter implements
closed = true;
try {
forceExecuteTemplate.execute((Collection) getRoutedStatements(),
Statement::close);
- if (null != getExecutor()) {
- getExecutor().close();
- }
+ closeExecutor();
if (null != getStatementManager()) {
getStatementManager().close();
}
@@ -235,4 +230,6 @@ public abstract class AbstractStatementAdapter extends
WrapperAdapter implements
getRoutedStatements().clear();
}
}
+
+ /**
+  * Close the executor held by the concrete statement.
+  *
+  * <p>Called while the statement is being closed, after the routed statements have been closed
+  * and before the statement manager is closed.</p>
+  *
+  * @throws SQLException SQL exception
+  */
+ protected abstract void closeExecutor() throws SQLException;
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index aeb8ccbd569..49cf8046d88 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.driver.jdbc.core.statement;
import lombok.AccessLevel;
import lombok.Getter;
-import org.apache.shardingsphere.driver.executor.DriverExecutor;
+import org.apache.shardingsphere.driver.executor.DriverExecutorFacade;
import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit;
import
org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
import
org.apache.shardingsphere.driver.jdbc.adapter.AbstractPreparedStatementAdapter;
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConne
import
org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
+import
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementAddCallback;
import
org.apache.shardingsphere.driver.jdbc.core.statement.metadata.ShardingSphereParameterMetaData;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.binder.context.aware.ParameterAware;
@@ -101,7 +102,9 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private final String sql;
- private final List<PreparedStatement> statements;
+ private final List<PreparedStatement> statements = new ArrayList<>();
+
+ private final List<List<Object>> parameterSets = new ArrayList<>();
private final SQLStatementContext sqlStatementContext;
@@ -112,8 +115,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
@Getter
private final ParameterMetaData parameterMetaData;
- @Getter(AccessLevel.PROTECTED)
- private final DriverExecutor executor;
+ private final DriverExecutorFacade driverExecutorFacade;
private final BatchPreparedStatementExecutor
batchPreparedStatementExecutor;
@@ -169,7 +171,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
metaData =
connection.getContextManager().getMetaDataContexts().getMetaData();
hintValueContext = SQLHintUtils.extractHint(sql);
this.sql = SQLHintUtils.removeHint(sql);
- statements = new ArrayList<>();
SQLParserRule sqlParserRule =
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);
SQLParserEngine sqlParserEngine =
sqlParserRule.getSQLParserEngine(metaData.getDatabase(connection.getDatabaseName()).getProtocolType());
SQLStatement sqlStatement = sqlParserEngine.parse(this.sql, true);
@@ -178,7 +179,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
statementOption = returnGeneratedKeys ? new StatementOption(true,
columns) : new StatementOption(resultSetType, resultSetConcurrency,
resultSetHoldability);
- executor = new DriverExecutor(connection);
+ driverExecutorFacade = new DriverExecutorFacade(connection);
ShardingSphereDatabase database = metaData.getDatabase(databaseName);
JDBCExecutor jdbcExecutor = new
JDBCExecutor(connection.getContextManager().getExecutorEngine(),
connection.getDatabaseConnectionManager().getConnectionContext());
batchPreparedStatementExecutor = new
BatchPreparedStatementExecutor(database, jdbcExecutor,
connection.getProcessId());
@@ -205,14 +206,11 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database =
metaData.getDatabase(databaseName);
findGeneratedKey().ifPresent(optional ->
generatedValues.addAll(optional.getGeneratedValues()));
- currentResultSet = executor.executeQuery(database, queryContext,
createDriverExecutionPrepareEngine(database), this, columnLabelAndIndexMap,
- (StatementReplayCallback<PreparedStatement>) this::replay);
+ currentResultSet =
driverExecutorFacade.getQueryExecutor().executeQuery(database, queryContext,
createDriverExecutionPrepareEngine(database), this, columnLabelAndIndexMap,
+ this::replay, (StatementAddCallback<PreparedStatement>)
this::addStatements);
if (currentResultSet instanceof ShardingSphereResultSet) {
columnLabelAndIndexMap = ((ShardingSphereResultSet)
currentResultSet).getColumnLabelAndIndexMap();
}
- for (Statement each : executor.getStatements()) {
- statements.add((PreparedStatement) each);
- }
return currentResultSet;
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
@@ -231,6 +229,11 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
}
}
+ // Records the statements and parameter sets handed back by the executor,
+ // so that replay() and getRoutedStatements() can see them afterwards.
+ private void addStatements(final Collection<PreparedStatement> addedStatements, final Collection<List<Object>> addedParameterSets) {
+     statements.addAll(addedStatements);
+     parameterSets.addAll(addedParameterSets);
+ }
+
private void resetParameters() throws SQLException {
replaySetParameter(statements,
Collections.singletonList(getParameters()));
}
@@ -252,11 +255,8 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database =
metaData.getDatabase(databaseName);
- final int result = executor.executeUpdate(database, queryContext,
createDriverExecutionPrepareEngine(database),
- (sql, statement) -> ((PreparedStatement)
statement).executeUpdate(), (StatementReplayCallback<PreparedStatement>)
this::replay);
- for (Statement each : executor.getStatements()) {
- statements.add((PreparedStatement) each);
- }
+ int result =
driverExecutorFacade.getUpdateExecutor().executeUpdate(database, queryContext,
createDriverExecutionPrepareEngine(database),
+ (sql, statement) -> ((PreparedStatement)
statement).executeUpdate(), this::replay,
(StatementAddCallback<PreparedStatement>) this::addStatements);
findGeneratedKey().ifPresent(optional ->
generatedValues.addAll(optional.getGeneratedValues()));
return result;
// CHECKSTYLE:OFF
@@ -280,11 +280,9 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database =
metaData.getDatabase(databaseName);
- final boolean result = executor.execute(database, queryContext,
createDriverExecutionPrepareEngine(database), (sql, statement) ->
((PreparedStatement) statement).execute(),
- (StatementReplayCallback<PreparedStatement>) this::replay);
- for (Statement each : executor.getStatements()) {
- statements.add((PreparedStatement) each);
- }
+ boolean result = driverExecutorFacade.getExecuteExecutor().execute(
+ database, queryContext,
createDriverExecutionPrepareEngine(database), (sql, statement) ->
((PreparedStatement) statement).execute(),
+ this::replay, (StatementAddCallback<PreparedStatement>)
this::addStatements);
findGeneratedKey().ifPresent(optional ->
generatedValues.addAll(optional.getGeneratedValues()));
return result;
// CHECKSTYLE:OFF
@@ -302,7 +300,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
if (null != currentResultSet) {
return currentResultSet;
}
- Optional<ResultSet> resultSet = executor.getResultSet();
+ Optional<ResultSet> resultSet =
driverExecutorFacade.getExecuteExecutor().getResultSet();
if (resultSet.isPresent()) {
return resultSet.get();
}
@@ -354,7 +352,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
return mergeEngine.merge(queryResults, sqlStatementContext);
}
- private void replay(final List<PreparedStatement> statements, final
List<List<Object>> parameterSets) throws SQLException {
+ private void replay() throws SQLException {
replaySetParameter(statements, parameterSets);
for (Statement each : statements) {
getMethodInvocationRecorder().replay(each);
@@ -370,8 +368,8 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private void clearPrevious() {
currentResultSet = null;
statements.clear();
+ parameterSets.clear();
generatedValues.clear();
- executor.clear();
}
private Optional<GeneratedKeyContext> findGeneratedKey() {
@@ -520,4 +518,9 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
public Collection<PreparedStatement> getRoutedStatements() {
return statements;
}
+
+ @Override
+ protected void closeExecutor() throws SQLException {
+ // Closing the facade closes the traffic executor and the SQL federation engine it holds.
+ driverExecutorFacade.close();
+ }
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index b31ff18bcad..a0b4c90b698 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.driver.jdbc.core.statement;
import lombok.AccessLevel;
import lombok.Getter;
-import org.apache.shardingsphere.driver.executor.DriverExecutor;
+import org.apache.shardingsphere.driver.executor.DriverExecutorFacade;
import org.apache.shardingsphere.driver.executor.batch.BatchStatementExecutor;
import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback;
import
org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.driver.jdbc.adapter.AbstractStatementAdapter;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import
org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
+import
org.apache.shardingsphere.driver.jdbc.core.statement.callback.StatementAddCallback;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import
org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
@@ -84,8 +85,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private final StatementOption statementOption;
- @Getter(AccessLevel.PROTECTED)
- private final DriverExecutor executor;
+ private final DriverExecutorFacade driverExecutorFacade;
@Getter(AccessLevel.PROTECTED)
private final StatementManager statementManager;
@@ -113,7 +113,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
metaData =
connection.getContextManager().getMetaDataContexts().getMetaData();
statements = new LinkedList<>();
statementOption = new StatementOption(resultSetType,
resultSetConcurrency, resultSetHoldability);
- executor = new DriverExecutor(connection);
+ driverExecutorFacade = new DriverExecutorFacade(connection);
statementManager = new StatementManager();
batchStatementExecutor = new BatchStatementExecutor(this);
databaseName = connection.getDatabaseName();
@@ -125,9 +125,8 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
try {
prepareExecute(queryContext);
ShardingSphereDatabase database =
metaData.getDatabase(databaseName);
- currentResultSet = executor.executeQuery(database, queryContext,
createDriverExecutionPrepareEngine(database), this, null,
- (StatementReplayCallback<Statement>) (statements,
parameterSets) -> replay(statements));
- statements.addAll(executor.getStatements());
+ currentResultSet =
driverExecutorFacade.getQueryExecutor().executeQuery(database, queryContext,
createDriverExecutionPrepareEngine(database), this, null,
+ this::replay, (StatementAddCallback<Statement>)
(statements, parameterSets) -> this.statements.addAll(statements));
return currentResultSet;
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
@@ -197,10 +196,8 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
QueryContext queryContext = createQueryContext(sql);
prepareExecute(queryContext);
ShardingSphereDatabase database = metaData.getDatabase(databaseName);
- int result = executor.executeUpdate(database, queryContext,
createDriverExecutionPrepareEngine(database), updateCallback,
- (StatementReplayCallback<Statement>) (statements,
parameterSets) -> replay(statements));
- statements.addAll(executor.getStatements());
- return result;
+ return
driverExecutorFacade.getUpdateExecutor().executeUpdate(database, queryContext,
createDriverExecutionPrepareEngine(database),
+ updateCallback, this::replay,
(StatementAddCallback<Statement>) (statements, parameterSets) ->
this.statements.addAll(statements));
}
@Override
@@ -261,10 +258,8 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
QueryContext queryContext = createQueryContext(sql);
prepareExecute(queryContext);
ShardingSphereDatabase database = metaData.getDatabase(databaseName);
- boolean result = executor.execute(database, queryContext,
createDriverExecutionPrepareEngine(database),
- executeCallback, (StatementReplayCallback<Statement>)
(statements, parameterSets) -> replay(statements));
- statements.addAll(executor.getStatements());
- return result;
+ return driverExecutorFacade.getExecuteExecutor().execute(database,
queryContext, createDriverExecutionPrepareEngine(database), executeCallback,
+ this::replay, (StatementAddCallback<Statement>) (statements,
parameterSets) -> this.statements.addAll(statements));
}
private QueryContext createQueryContext(final String originSQL) throws
SQLException {
@@ -296,7 +291,6 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
each.close();
}
statements.clear();
- executor.clear();
}
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) {
@@ -305,7 +299,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
database.getRuleMetaData().getRules(),
database.getResourceMetaData().getStorageUnits());
}
- private void replay(final List<Statement> statements) throws SQLException {
+ private void replay() throws SQLException {
for (Statement each : statements) {
getMethodInvocationRecorder().replay(each);
}
@@ -331,7 +325,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
if (null != currentResultSet) {
return currentResultSet;
}
- Optional<ResultSet> resultSet = executor.getResultSet();
+ Optional<ResultSet> resultSet =
driverExecutorFacade.getExecuteExecutor().getResultSet();
if (resultSet.isPresent()) {
return resultSet.get();
}
@@ -430,4 +424,9 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return
DatabaseTypedSPILoader.findService(GeneratedKeyColumnProvider.class,
metaData.getDatabase(databaseName).getProtocolType())
.map(GeneratedKeyColumnProvider::getColumnName).orElse(columnName);
}
+
+ @Override
+ protected void closeExecutor() throws SQLException {
+ // Delegates to the facade, which closes its traffic executor and SQL federation engine.
+ driverExecutorFacade.close();
+ }
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/callback/StatementAddCallback.java
similarity index 74%
copy from
jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
copy to
jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/callback/StatementAddCallback.java
index 0ad5b4456c0..70482d733cc 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/callback/StatementAddCallback.java
@@ -15,25 +15,24 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.driver.jdbc.core.statement;
+package org.apache.shardingsphere.driver.jdbc.core.statement.callback;
-import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collection;
import java.util.List;
/**
- * Statement replay callback.
+ * Statement add callback.
*
* @param <T> type of statement
*/
-public interface StatementReplayCallback<T extends Statement> {
+public interface StatementAddCallback<T extends Statement> {
/**
- * Replay statement.
+ * Add statements and parameter sets.
*
* @param statements statements
* @param parameterSets parameter sets
- * @throws SQLException SQL exception
*/
- void replay(List<T> statements, List<List<Object>> parameterSets) throws
SQLException;
+ void add(Collection<T> statements, Collection<List<Object>> parameterSets);
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/callback/StatementReplayCallback.java
similarity index 69%
rename from
jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
rename to
jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/callback/StatementReplayCallback.java
index 0ad5b4456c0..3aefa19a73c 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/callback/StatementReplayCallback.java
@@ -15,25 +15,19 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.driver.jdbc.core.statement;
+package org.apache.shardingsphere.driver.jdbc.core.statement.callback;
import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.List;
/**
* Statement replay callback.
- *
- * @param <T> type of statement
*/
-public interface StatementReplayCallback<T extends Statement> {
+public interface StatementReplayCallback {
/**
- * Replay statement.
+ * Replay statements.
*
- * @param statements statements
- * @param parameterSets parameter sets
* @throws SQLException SQL exception
*/
- void replay(List<T> statements, List<List<Object>> parameterSets) throws
SQLException;
+ void replay() throws SQLException;
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
index ac096d5e97a..d270d0a9ecd 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.driver.state.circuit.statement;
-import org.apache.shardingsphere.driver.executor.DriverExecutor;
import org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
import
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement;
import
org.apache.shardingsphere.driver.state.circuit.connection.CircuitBreakerConnection;
@@ -274,11 +273,6 @@ public final class CircuitBreakerPreparedStatement extends
AbstractUnsupportedOp
return Collections.emptyList();
}
- @Override
- protected DriverExecutor getExecutor() {
- return null;
- }
-
@Override
protected StatementManager getStatementManager() {
return null;
@@ -293,4 +287,8 @@ public final class CircuitBreakerPreparedStatement extends
AbstractUnsupportedOp
public int executeUpdate() {
return -1;
}
+
+ @Override
+ protected void closeExecutor() {
+ // Intentionally empty: the circuit breaker statement holds no executor to close.
+ }
}