This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6190a7036f2 Refactor DriverPushDownExecutor (#31664)
6190a7036f2 is described below
commit 6190a7036f2a74d93dc837a9a91b3e5df928ac5f
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Jun 12 16:09:46 2024 +0800
Refactor DriverPushDownExecutor (#31664)
* Refactor DriverPushDownExecuteExecutor
* Refactor DriverPushDownExecuteQueryExecutor
* Refactor DriverPushDownExecuteUpdateExecutor
* Refactor DriverPushDownExecuteUpdateExecutor
---
.../pushdown/DriverPushDownExecuteExecutor.java | 134 ++------------------
.../DriverPushDownExecuteQueryExecutor.java | 116 ++---------------
.../DriverPushDownExecuteUpdateExecutor.java | 137 ++-------------------
.../DriverJDBCPushDownExecuteExecutor.java} | 54 ++------
.../DriverJDBCPushDownExecuteQueryExecutor.java} | 42 +------
.../DriverJDBCPushDownExecuteUpdateExecutor.java} | 56 ++-------
.../raw/DriverRawPushDownExecuteExecutor.java | 65 ++++++++++
.../raw/DriverRawPushDownExecuteQueryExecutor.java | 105 ++++++++++++++++
.../DriverRawPushDownExecuteUpdateExecutor.java | 84 +++++++++++++
9 files changed, 307 insertions(+), 486 deletions(-)
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
index 68ef83d8561..9e4466338ce 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
@@ -18,47 +18,26 @@
package org.apache.shardingsphere.driver.executor.engine.pushdown;
import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
-import
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteCallbackFactory;
import
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecuteCallback;
import
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
-import
org.apache.shardingsphere.driver.executor.engine.transaction.DriverTransactionalExecutor;
+import
org.apache.shardingsphere.driver.executor.engine.pushdown.jdbc.DriverJDBCPushDownExecuteExecutor;
+import
org.apache.shardingsphere.driver.executor.engine.pushdown.raw.DriverRawPushDownExecuteExecutor;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
-import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
-import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
-import org.apache.shardingsphere.infra.merge.MergeEngine;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
-import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
-import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
@@ -67,23 +46,14 @@ import java.util.Optional;
*/
public final class DriverPushDownExecuteExecutor {
- private final ShardingSphereConnection connection;
+ private final DriverJDBCPushDownExecuteExecutor jdbcPushDownExecutor;
- private final ShardingSphereMetaData metaData;
-
- private final Grantee grantee;
-
- private final JDBCExecutor jdbcExecutor;
-
- private final RawExecutor rawExecutor;
+ private final DriverRawPushDownExecuteExecutor rawPushDownExecutor;
public DriverPushDownExecuteExecutor(final ShardingSphereConnection
connection, final ShardingSphereMetaData metaData, final Grantee grantee,
final JDBCExecutor jdbcExecutor,
final RawExecutor rawExecutor) {
- this.connection = connection;
- this.metaData = metaData;
- this.grantee = grantee;
- this.jdbcExecutor = jdbcExecutor;
- this.rawExecutor = rawExecutor;
+ jdbcPushDownExecutor = new
DriverJDBCPushDownExecuteExecutor(connection, metaData, grantee, jdbcExecutor);
+ rawPushDownExecutor = new DriverRawPushDownExecuteExecutor(connection,
metaData, grantee, rawExecutor);
}
/**
@@ -102,60 +72,8 @@ public final class DriverPushDownExecuteExecutor {
public boolean execute(final ShardingSphereDatabase database, final
ExecutionContext executionContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final StatementExecuteCallback executeCallback,
final StatementAddCallback addCallback, final StatementReplayCallback
replayCallback) throws SQLException {
return
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
- ? executeJDBC(database, executionContext, prepareEngine,
executeCallback, addCallback, replayCallback)
- : executeRaw(database, executionContext);
- }
-
- @SuppressWarnings("rawtypes")
- private boolean executeJDBC(final ShardingSphereDatabase database, final
ExecutionContext executionContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final StatementExecuteCallback
executeCallback, final StatementAddCallback addCallback, final
StatementReplayCallback replayCallback) throws SQLException {
- return new DriverTransactionalExecutor(connection).execute(
- database, executionContext, () -> doExecuteJDBC(database,
executionContext, prepareEngine, executeCallback, addCallback, replayCallback));
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private boolean doExecuteJDBC(final ShardingSphereDatabase database, final
ExecutionContext executionContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final StatementExecuteCallback
executeCallback, final StatementAddCallback addCallback, final
StatementReplayCallback replayCallback) throws SQLException {
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(
- executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(connection.getProcessId(), database.getName(),
grantee));
- for (ExecutionGroup<JDBCExecutionUnit> each :
executionGroupContext.getInputGroups()) {
- addCallback.add(getStatements(each),
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ?
getParameterSets(each) : Collections.emptyList());
- }
- replayCallback.replay();
- ProcessEngine processEngine = new ProcessEngine();
- try {
- processEngine.executeSQL(executionGroupContext,
executionContext.getQueryContext());
- List<Boolean> results = jdbcExecutor.execute(executionGroupContext,
- new
ExecuteCallbackFactory(prepareEngine.getType()).newInstance(database,
executeCallback, executionContext.getSqlStatementContext().getSqlStatement()));
- new
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
database, metaData.getProps())
-
.refresh(executionContext.getQueryContext().getSqlStatementContext(),
executionContext.getRouteContext().getRouteUnits());
- return null != results && !results.isEmpty() && null !=
results.get(0) && results.get(0);
- } finally {
-
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
- }
- }
-
- private Collection<Statement> getStatements(final
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
- Collection<Statement> result = new LinkedList<>();
- for (JDBCExecutionUnit each : executionGroup.getInputs()) {
- result.add(each.getStorageResource());
- }
- return result;
- }
-
- private Collection<List<Object>> getParameterSets(final
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
- Collection<List<Object>> result = new LinkedList<>();
- for (JDBCExecutionUnit each : executionGroup.getInputs()) {
- result.add(each.getExecutionUnit().getSqlUnit().getParameters());
- }
- return result;
- }
-
- private boolean executeRaw(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
- int maxConnectionsSizePerQuery =
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext = new
RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
database.getRuleMetaData().getRules())
- .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(connection.getProcessId(), database.getName(),
grantee));
- return rawExecutor.execute(executionGroupContext,
executionContext.getQueryContext(), new
RawSQLExecutorCallback()).iterator().next() instanceof QueryResult;
+ ? jdbcPushDownExecutor.execute(database, executionContext,
prepareEngine, executeCallback, addCallback, replayCallback)
+ : rawPushDownExecutor.execute(database, executionContext);
}
/**
@@ -170,38 +88,8 @@ public final class DriverPushDownExecuteExecutor {
*/
public Optional<ResultSet> getResultSet(final ShardingSphereDatabase
database, final SQLStatementContext sqlStatementContext,
final Statement statement, final
List<? extends Statement> statements) throws SQLException {
- if (sqlStatementContext instanceof SelectStatementContext ||
sqlStatementContext.getSqlStatement() instanceof DALStatement) {
- List<ResultSet> resultSets = getResultSets(statements);
- if (resultSets.isEmpty()) {
- return Optional.empty();
- }
- List<QueryResult> queryResults = getQueryResults(resultSets);
- MergedResult mergedResult = new
MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(),
connection.getDatabaseConnectionManager().getConnectionContext())
- .merge(queryResults, sqlStatementContext);
- boolean selectContainsEnhancedTable = sqlStatementContext
instanceof SelectStatementContext && ((SelectStatementContext)
sqlStatementContext).isContainsEnhancedTable();
- return Optional.of(new ShardingSphereResultSet(resultSets,
mergedResult, statement, selectContainsEnhancedTable, sqlStatementContext));
- }
- return Optional.empty();
- }
-
- @SuppressWarnings("JDBCResourceOpenedButNotSafelyClosed")
- private List<ResultSet> getResultSets(final List<? extends Statement>
statements) throws SQLException {
- List<ResultSet> result = new ArrayList<>(statements.size());
- for (Statement each : statements) {
- if (null != each.getResultSet()) {
- result.add(each.getResultSet());
- }
- }
- return result;
- }
-
- private List<QueryResult> getQueryResults(final List<ResultSet>
resultSets) throws SQLException {
- List<QueryResult> result = new ArrayList<>(resultSets.size());
- for (ResultSet each : resultSets) {
- if (null != each) {
- result.add(new JDBCStreamQueryResult(each));
- }
- }
- return result;
+ return
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
+ ? jdbcPushDownExecutor.getResultSet(database,
sqlStatementContext, statement, statements)
+ : Optional.empty();
}
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
index 820d0c2bccb..ec6c7305dd5 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
@@ -18,78 +18,39 @@
package org.apache.shardingsphere.driver.executor.engine.pushdown;
import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
-import
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallbackFactory;
import
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
+import
org.apache.shardingsphere.driver.executor.engine.pushdown.jdbc.DriverJDBCPushDownExecuteQueryExecutor;
+import
org.apache.shardingsphere.driver.executor.engine.pushdown.raw.DriverRawPushDownExecuteQueryExecutor;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
-import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetFactory;
-import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
-import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
-import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
/**
* Driver push down execute query executor.
*/
public final class DriverPushDownExecuteQueryExecutor {
- private final ConnectionContext connectionContext;
+ private final DriverJDBCPushDownExecuteQueryExecutor
driverJDBCPushDownExecutor;
- private final String processId;
-
- private final RuleMetaData globalRuleMetaData;
-
- private final ConfigurationProperties props;
-
- private final Grantee grantee;
-
- private final JDBCExecutor jdbcExecutor;
-
- private final RawExecutor rawExecutor;
-
- private final Collection<Statement> statements;
+ private final DriverRawPushDownExecuteQueryExecutor
driverRawPushDownExecutor;
public DriverPushDownExecuteQueryExecutor(final ShardingSphereConnection
connection, final ShardingSphereMetaData metaData, final Grantee grantee,
final JDBCExecutor jdbcExecutor,
final RawExecutor rawExecutor) {
- connectionContext =
connection.getDatabaseConnectionManager().getConnectionContext();
- processId = connection.getProcessId();
- globalRuleMetaData = metaData.getGlobalRuleMetaData();
- props = metaData.getProps();
- this.grantee = grantee;
- this.jdbcExecutor = jdbcExecutor;
- this.rawExecutor = rawExecutor;
- statements = new LinkedList<>();
+ driverJDBCPushDownExecutor = new
DriverJDBCPushDownExecuteQueryExecutor(connection, metaData, grantee,
jdbcExecutor);
+ driverRawPushDownExecutor = new
DriverRawPushDownExecuteQueryExecutor(connection, metaData, grantee,
rawExecutor);
}
/**
@@ -110,69 +71,8 @@ public final class DriverPushDownExecuteQueryExecutor {
final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final Statement statement,
final Map<String, Integer>
columnLabelAndIndexMap,
final StatementAddCallback
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
- List<QueryResult> queryResults = getQueryResults(database,
queryContext, prepareEngine, addCallback, replayCallback);
- boolean isContainsEnhancedTable =
queryContext.getSqlStatementContext() instanceof SelectStatementContext
- && ((SelectStatementContext)
queryContext.getSqlStatementContext()).isContainsEnhancedTable();
- return new ShardingSphereResultSetFactory(connectionContext,
globalRuleMetaData, props, statements)
- .newInstance(database, queryContext, queryResults, statement,
columnLabelAndIndexMap, isContainsEnhancedTable);
- }
-
- @SuppressWarnings("rawtypes")
- private List<QueryResult> getQueryResults(final ShardingSphereDatabase
database, final QueryContext queryContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final StatementAddCallback
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
- statements.clear();
- ExecutionContext executionContext = new
KernelProcessor().generateExecutionContext(queryContext, database,
globalRuleMetaData, props, connectionContext);
return
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
- ? getJDBCQueryResults(database, queryContext, prepareEngine,
addCallback, replayCallback, executionContext)
- : getRawQueryResults(database, queryContext, executionContext);
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private List<QueryResult> getJDBCQueryResults(final ShardingSphereDatabase
database, final QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final StatementAddCallback
addCallback, final StatementReplayCallback replayCallback,
- final ExecutionContext
executionContext) throws SQLException {
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
- new ExecutionGroupReportContext(processId, database.getName(),
grantee));
- for (ExecutionGroup<JDBCExecutionUnit> each :
executionGroupContext.getInputGroups()) {
- Collection<Statement> statements = getStatements(each);
- this.statements.addAll(statements);
- addCallback.add(statements,
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ?
getParameterSets(each) : Collections.emptyList());
- }
- replayCallback.replay();
- ProcessEngine processEngine = new ProcessEngine();
- try {
- processEngine.executeSQL(executionGroupContext, queryContext);
- return jdbcExecutor.execute(executionGroupContext, new
ExecuteQueryCallbackFactory(prepareEngine.getType()).newInstance(database,
queryContext));
- } finally {
-
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
- }
- }
-
- private Collection<Statement> getStatements(final
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
- Collection<Statement> result = new LinkedList<>();
- for (JDBCExecutionUnit each : executionGroup.getInputs()) {
- result.add(each.getStorageResource());
- }
- return result;
- }
-
- private Collection<List<Object>> getParameterSets(final
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
- Collection<List<Object>> result = new LinkedList<>();
- for (JDBCExecutionUnit each : executionGroup.getInputs()) {
- result.add(each.getExecutionUnit().getSqlUnit().getParameters());
- }
- return result;
- }
-
- private List<QueryResult> getRawQueryResults(final ShardingSphereDatabase
database, final QueryContext queryContext, final ExecutionContext
executionContext) throws SQLException {
- return rawExecutor.execute(
- createRawExecutionGroupContext(database, executionContext),
queryContext, new
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
- }
-
- private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
- int maxConnectionsSizePerQuery =
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
database.getRuleMetaData().getRules()).prepare(
- executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(processId, database.getName(), grantee));
+ ? driverJDBCPushDownExecutor.executeQuery(database,
queryContext, prepareEngine, statement, columnLabelAndIndexMap, addCallback,
replayCallback)
+ : driverRawPushDownExecutor.executeQuery(database,
queryContext, statement, columnLabelAndIndexMap);
}
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteUpdateExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteUpdateExecutor.java
index aac6d76848e..8ca3f327d59 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteUpdateExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteUpdateExecutor.java
@@ -18,73 +18,37 @@
package org.apache.shardingsphere.driver.executor.engine.pushdown;
import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
-import
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteUpdateCallbackFactory;
import
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecuteUpdateCallback;
import
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
-import
org.apache.shardingsphere.driver.executor.engine.transaction.DriverTransactionalExecutor;
+import
org.apache.shardingsphere.driver.executor.engine.pushdown.jdbc.DriverJDBCPushDownExecuteUpdateExecutor;
+import
org.apache.shardingsphere.driver.executor.engine.pushdown.raw.DriverRawPushDownExecuteUpdateExecutor;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
-import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
-import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
-import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import
org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
-import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
import java.sql.Connection;
import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
/**
* Driver push down execute update executor.
*/
public final class DriverPushDownExecuteUpdateExecutor {
- private final ShardingSphereConnection connection;
+ private final DriverJDBCPushDownExecuteUpdateExecutor jdbcPushDownExecutor;
- private final String processId;
-
- private final ConfigurationProperties props;
-
- private final Grantee grantee;
-
- private final JDBCExecutor jdbcExecutor;
-
- private final RawExecutor rawExecutor;
+ private final DriverRawPushDownExecuteUpdateExecutor rawPushDownExecutor;
public DriverPushDownExecuteUpdateExecutor(final ShardingSphereConnection
connection, final ShardingSphereMetaData metaData, final Grantee grantee,
final JDBCExecutor
jdbcExecutor, final RawExecutor rawExecutor) {
- this.connection = connection;
- processId = connection.getProcessId();
- props = metaData.getProps();
- this.grantee = grantee;
- this.jdbcExecutor = jdbcExecutor;
- this.rawExecutor = rawExecutor;
+ jdbcPushDownExecutor = new
DriverJDBCPushDownExecuteUpdateExecutor(connection, metaData, grantee,
jdbcExecutor);
+ rawPushDownExecutor = new
DriverRawPushDownExecuteUpdateExecutor(connection, metaData, grantee,
rawExecutor);
}
/**
@@ -103,92 +67,7 @@ public final class DriverPushDownExecuteUpdateExecutor {
public int executeUpdate(final ShardingSphereDatabase database, final
ExecutionContext executionContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final StatementExecuteUpdateCallback
updateCallback, final StatementAddCallback addCallback, final
StatementReplayCallback replayCallback) throws SQLException {
return
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
- ? executeJDBCUpdate(database, executionContext, prepareEngine,
updateCallback, addCallback, replayCallback)
- : executeRawUpdate(database, executionContext);
- }
-
- @SuppressWarnings("rawtypes")
- private int executeJDBCUpdate(final ShardingSphereDatabase database, final
ExecutionContext executionContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final StatementExecuteUpdateCallback
updateCallback, final StatementAddCallback addCallback, final
StatementReplayCallback replayCallback) throws SQLException {
- return new DriverTransactionalExecutor(connection).execute(
- database, executionContext, () ->
doExecuteJDBCUpdate(database, executionContext, prepareEngine, updateCallback,
addCallback, replayCallback));
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private int doExecuteJDBCUpdate(final ShardingSphereDatabase database,
final ExecutionContext executionContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final StatementExecuteUpdateCallback
updateCallback, final StatementAddCallback addCallback, final
StatementReplayCallback replayCallback) throws SQLException {
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(
- executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(processId, database.getName(), grantee));
- for (ExecutionGroup<JDBCExecutionUnit> each :
executionGroupContext.getInputGroups()) {
- addCallback.add(getStatements(each),
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ?
getParameterSets(each) : Collections.emptyList());
- }
- replayCallback.replay();
- ProcessEngine processEngine = new ProcessEngine();
- try {
- processEngine.executeSQL(executionGroupContext,
executionContext.getQueryContext());
- JDBCExecutorCallback<Integer> callback = new
ExecuteUpdateCallbackFactory(prepareEngine.getType())
- .newInstance(database,
executionContext.getQueryContext().getSqlStatementContext().getSqlStatement(),
updateCallback);
- List<Integer> updateCounts =
jdbcExecutor.execute(executionGroupContext, callback);
- new
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
database, props)
-
.refresh(executionContext.getQueryContext().getSqlStatementContext(),
executionContext.getRouteContext().getRouteUnits());
- return isNeedAccumulate(database.getRuleMetaData().getRules(),
executionContext.getQueryContext().getSqlStatementContext()) ?
accumulate(updateCounts) : updateCounts.get(0);
- } finally {
-
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
- }
- }
-
- private Collection<Statement> getStatements(final
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
- Collection<Statement> result = new LinkedList<>();
- for (JDBCExecutionUnit each : executionGroup.getInputs()) {
- result.add(each.getStorageResource());
- }
- return result;
- }
-
- private Collection<List<Object>> getParameterSets(final
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
- Collection<List<Object>> result = new LinkedList<>();
- for (JDBCExecutionUnit each : executionGroup.getInputs()) {
- result.add(each.getExecutionUnit().getSqlUnit().getParameters());
- }
- return result;
- }
-
- private boolean isNeedAccumulate(final Collection<ShardingSphereRule>
rules, final SQLStatementContext sqlStatementContext) {
- if (!(sqlStatementContext instanceof TableAvailable)) {
- return false;
- }
- for (ShardingSphereRule each : rules) {
- Optional<DataNodeRuleAttribute> ruleAttribute =
each.getAttributes().findAttribute(DataNodeRuleAttribute.class);
- if (ruleAttribute.isPresent() &&
ruleAttribute.get().isNeedAccumulate(((TableAvailable)
sqlStatementContext).getTablesContext().getTableNames())) {
- return true;
- }
- }
- return false;
- }
-
- private int accumulate(final List<Integer> updateResults) {
- int result = 0;
- for (Integer each : updateResults) {
- result += null == each ? 0 : each;
- }
- return result;
- }
-
- private int accumulate(final Collection<ExecuteResult> results) {
- int result = 0;
- for (ExecuteResult each : results) {
- result += ((UpdateResult) each).getUpdateCount();
- }
- return result;
- }
-
- private int executeRawUpdate(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
- return
accumulate(rawExecutor.execute(createRawExecutionGroupContext(database,
executionContext), executionContext.getQueryContext(), new
RawSQLExecutorCallback()));
- }
-
- private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
- int maxConnectionsSizePerQuery =
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
database.getRuleMetaData().getRules()).prepare(
- executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(processId, database.getName(), grantee));
+ ? jdbcPushDownExecutor.executeUpdate(database,
executionContext, prepareEngine, updateCallback, addCallback, replayCallback)
+ : rawPushDownExecutor.executeUpdate(database,
executionContext);
}
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java
similarity index 72%
copy from
jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
copy to
jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java
index 68ef83d8561..64f93f00ad5 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.driver.executor.engine.pushdown;
+package org.apache.shardingsphere.driver.executor.engine.pushdown.jdbc;
+import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
import
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteCallbackFactory;
import
org.apache.shardingsphere.driver.executor.callback.execute.StatementExecuteCallback;
@@ -26,28 +27,22 @@ import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConne
import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
-import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
@@ -63,9 +58,10 @@ import java.util.List;
import java.util.Optional;
/**
- * Driver push down execute executor.
+ * Driver JDBC push down execute executor.
*/
-public final class DriverPushDownExecuteExecutor {
+@RequiredArgsConstructor
+public final class DriverJDBCPushDownExecuteExecutor {
private final ShardingSphereConnection connection;
@@ -75,47 +71,28 @@ public final class DriverPushDownExecuteExecutor {
private final JDBCExecutor jdbcExecutor;
- private final RawExecutor rawExecutor;
-
- public DriverPushDownExecuteExecutor(final ShardingSphereConnection
connection, final ShardingSphereMetaData metaData, final Grantee grantee,
- final JDBCExecutor jdbcExecutor,
final RawExecutor rawExecutor) {
- this.connection = connection;
- this.metaData = metaData;
- this.grantee = grantee;
- this.jdbcExecutor = jdbcExecutor;
- this.rawExecutor = rawExecutor;
- }
-
/**
* Execute.
*
* @param database database
* @param executionContext execution context
* @param prepareEngine prepare engine
- * @param executeCallback statement execute callback
- * @param addCallback statement add callback
- * @param replayCallback statement replay callback
- * @return is query
+ * @param executeCallback execute callback
+ * @param addCallback add callback
+ * @param replayCallback replay callback
+ * @return execute result
* @throws SQLException SQL exception
*/
@SuppressWarnings("rawtypes")
public boolean execute(final ShardingSphereDatabase database, final
ExecutionContext executionContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final StatementExecuteCallback executeCallback,
final StatementAddCallback addCallback, final StatementReplayCallback
replayCallback) throws SQLException {
- return
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
- ? executeJDBC(database, executionContext, prepareEngine,
executeCallback, addCallback, replayCallback)
- : executeRaw(database, executionContext);
- }
-
- @SuppressWarnings("rawtypes")
- private boolean executeJDBC(final ShardingSphereDatabase database, final
ExecutionContext executionContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final StatementExecuteCallback
executeCallback, final StatementAddCallback addCallback, final
StatementReplayCallback replayCallback) throws SQLException {
return new DriverTransactionalExecutor(connection).execute(
- database, executionContext, () -> doExecuteJDBC(database,
executionContext, prepareEngine, executeCallback, addCallback, replayCallback));
+ database, executionContext, () -> doExecute(database,
executionContext, prepareEngine, executeCallback, addCallback, replayCallback));
}
@SuppressWarnings({"rawtypes", "unchecked"})
- private boolean doExecuteJDBC(final ShardingSphereDatabase database, final
ExecutionContext executionContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final StatementExecuteCallback
executeCallback, final StatementAddCallback addCallback, final
StatementReplayCallback replayCallback) throws SQLException {
+ private boolean doExecute(final ShardingSphereDatabase database, final
ExecutionContext executionContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+ final StatementExecuteCallback executeCallback,
final StatementAddCallback addCallback, final StatementReplayCallback
replayCallback) throws SQLException {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(
executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(connection.getProcessId(), database.getName(),
grantee));
for (ExecutionGroup<JDBCExecutionUnit> each :
executionGroupContext.getInputGroups()) {
@@ -151,13 +128,6 @@ public final class DriverPushDownExecuteExecutor {
return result;
}
- private boolean executeRaw(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
- int maxConnectionsSizePerQuery =
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext = new
RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
database.getRuleMetaData().getRules())
- .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(connection.getProcessId(), database.getName(),
grantee));
- return rawExecutor.execute(executionGroupContext,
executionContext.getQueryContext(), new
RawSQLExecutorCallback()).iterator().next() instanceof QueryResult;
- }
-
/**
* Get result set.
*
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteQueryExecutor.java
similarity index 73%
copy from
jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
copy to
jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteQueryExecutor.java
index 820d0c2bccb..91253deb40e 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteQueryExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteQueryExecutor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.driver.executor.engine.pushdown;
+package org.apache.shardingsphere.driver.executor.engine.pushdown.jdbc;
import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
import
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallbackFactory;
@@ -25,7 +25,6 @@ import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResult
import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetFactory;
import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
@@ -33,19 +32,14 @@ import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupRepor
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
-import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
import org.apache.shardingsphere.infra.session.query.QueryContext;
@@ -57,12 +51,11 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
/**
- * Driver push down execute query executor.
+ * Driver JDBC push down execute query executor.
*/
-public final class DriverPushDownExecuteQueryExecutor {
+public final class DriverJDBCPushDownExecuteQueryExecutor {
private final ConnectionContext connectionContext;
@@ -76,19 +69,15 @@ public final class DriverPushDownExecuteQueryExecutor {
private final JDBCExecutor jdbcExecutor;
- private final RawExecutor rawExecutor;
-
private final Collection<Statement> statements;
- public DriverPushDownExecuteQueryExecutor(final ShardingSphereConnection
connection, final ShardingSphereMetaData metaData, final Grantee grantee,
- final JDBCExecutor jdbcExecutor,
final RawExecutor rawExecutor) {
+ public DriverJDBCPushDownExecuteQueryExecutor(final
ShardingSphereConnection connection, final ShardingSphereMetaData metaData,
final Grantee grantee, final JDBCExecutor jdbcExecutor) {
connectionContext =
connection.getDatabaseConnectionManager().getConnectionContext();
processId = connection.getProcessId();
globalRuleMetaData = metaData.getGlobalRuleMetaData();
props = metaData.getProps();
this.grantee = grantee;
this.jdbcExecutor = jdbcExecutor;
- this.rawExecutor = rawExecutor;
statements = new LinkedList<>();
}
@@ -117,21 +106,11 @@ public final class DriverPushDownExecuteQueryExecutor {
.newInstance(database, queryContext, queryResults, statement,
columnLabelAndIndexMap, isContainsEnhancedTable);
}
- @SuppressWarnings("rawtypes")
+ @SuppressWarnings({"rawtypes", "unchecked"})
private List<QueryResult> getQueryResults(final ShardingSphereDatabase
database, final QueryContext queryContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final StatementAddCallback
addCallback, final StatementReplayCallback replayCallback) throws SQLException {
statements.clear();
ExecutionContext executionContext = new
KernelProcessor().generateExecutionContext(queryContext, database,
globalRuleMetaData, props, connectionContext);
- return
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
- ? getJDBCQueryResults(database, queryContext, prepareEngine,
addCallback, replayCallback, executionContext)
- : getRawQueryResults(database, queryContext, executionContext);
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private List<QueryResult> getJDBCQueryResults(final ShardingSphereDatabase
database, final QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final StatementAddCallback
addCallback, final StatementReplayCallback replayCallback,
- final ExecutionContext
executionContext) throws SQLException {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
new ExecutionGroupReportContext(processId, database.getName(),
grantee));
for (ExecutionGroup<JDBCExecutionUnit> each :
executionGroupContext.getInputGroups()) {
@@ -164,15 +143,4 @@ public final class DriverPushDownExecuteQueryExecutor {
}
return result;
}
-
- private List<QueryResult> getRawQueryResults(final ShardingSphereDatabase
database, final QueryContext queryContext, final ExecutionContext
executionContext) throws SQLException {
- return rawExecutor.execute(
- createRawExecutionGroupContext(database, executionContext),
queryContext, new
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
- }
-
- private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
- int maxConnectionsSizePerQuery =
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
database.getRuleMetaData().getRules()).prepare(
- executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(processId, database.getName(), grantee));
- }
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteUpdateExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java
similarity index 68%
copy from
jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteUpdateExecutor.java
copy to
jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java
index aac6d76848e..baf4600aa6d 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/DriverPushDownExecuteUpdateExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.driver.executor.engine.pushdown;
+package org.apache.shardingsphere.driver.executor.engine.pushdown.jdbc;
import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
import
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteUpdateCallbackFactory;
@@ -26,7 +26,6 @@ import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConne
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
@@ -34,21 +33,14 @@ import
org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import
org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
-import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
import java.sql.Connection;
@@ -61,9 +53,9 @@ import java.util.List;
import java.util.Optional;
/**
- * Driver push down execute update executor.
+ * Driver JDBC push down execute update executor.
*/
-public final class DriverPushDownExecuteUpdateExecutor {
+public final class DriverJDBCPushDownExecuteUpdateExecutor {
private final ShardingSphereConnection connection;
@@ -75,16 +67,12 @@ public final class DriverPushDownExecuteUpdateExecutor {
private final JDBCExecutor jdbcExecutor;
- private final RawExecutor rawExecutor;
-
- public DriverPushDownExecuteUpdateExecutor(final ShardingSphereConnection
connection, final ShardingSphereMetaData metaData, final Grantee grantee,
- final JDBCExecutor
jdbcExecutor, final RawExecutor rawExecutor) {
+ public DriverJDBCPushDownExecuteUpdateExecutor(final
ShardingSphereConnection connection, final ShardingSphereMetaData metaData,
final Grantee grantee, final JDBCExecutor jdbcExecutor) {
this.connection = connection;
processId = connection.getProcessId();
props = metaData.getProps();
this.grantee = grantee;
this.jdbcExecutor = jdbcExecutor;
- this.rawExecutor = rawExecutor;
}
/**
@@ -94,29 +82,21 @@ public final class DriverPushDownExecuteUpdateExecutor {
* @param executionContext execution context
* @param prepareEngine prepare engine
* @param updateCallback statement execute update callback
- * @param replayCallback statement replay callback
* @param addCallback statement add callback
- * @return updated row count
+ * @param replayCallback statement replay callback
+ * @return affected rows
* @throws SQLException SQL exception
*/
@SuppressWarnings("rawtypes")
public int executeUpdate(final ShardingSphereDatabase database, final
ExecutionContext executionContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final StatementExecuteUpdateCallback
updateCallback, final StatementAddCallback addCallback, final
StatementReplayCallback replayCallback) throws SQLException {
- return
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
- ? executeJDBCUpdate(database, executionContext, prepareEngine,
updateCallback, addCallback, replayCallback)
- : executeRawUpdate(database, executionContext);
- }
-
- @SuppressWarnings("rawtypes")
- private int executeJDBCUpdate(final ShardingSphereDatabase database, final
ExecutionContext executionContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final StatementExecuteUpdateCallback
updateCallback, final StatementAddCallback addCallback, final
StatementReplayCallback replayCallback) throws SQLException {
return new DriverTransactionalExecutor(connection).execute(
- database, executionContext, () ->
doExecuteJDBCUpdate(database, executionContext, prepareEngine, updateCallback,
addCallback, replayCallback));
+ database, executionContext, () -> doExecuteUpdate(database,
executionContext, prepareEngine, updateCallback, addCallback, replayCallback));
}
@SuppressWarnings({"rawtypes", "unchecked"})
- private int doExecuteJDBCUpdate(final ShardingSphereDatabase database,
final ExecutionContext executionContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final StatementExecuteUpdateCallback
updateCallback, final StatementAddCallback addCallback, final
StatementReplayCallback replayCallback) throws SQLException {
+ private int doExecuteUpdate(final ShardingSphereDatabase database, final
ExecutionContext executionContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+ final StatementExecuteUpdateCallback
updateCallback, final StatementAddCallback addCallback, final
StatementReplayCallback replayCallback) throws SQLException {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(
executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(processId, database.getName(), grantee));
for (ExecutionGroup<JDBCExecutionUnit> each :
executionGroupContext.getInputGroups()) {
@@ -173,22 +153,4 @@ public final class DriverPushDownExecuteUpdateExecutor {
}
return result;
}
-
- private int accumulate(final Collection<ExecuteResult> results) {
- int result = 0;
- for (ExecuteResult each : results) {
- result += ((UpdateResult) each).getUpdateCount();
- }
- return result;
- }
-
- private int executeRawUpdate(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
- return
accumulate(rawExecutor.execute(createRawExecutionGroupContext(database,
executionContext), executionContext.getQueryContext(), new
RawSQLExecutorCallback()));
- }
-
- private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
- int maxConnectionsSizePerQuery =
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
database.getRuleMetaData().getRules()).prepare(
- executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(processId, database.getName(), grantee));
- }
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteExecutor.java
new file mode 100644
index 00000000000..8f707b7e438
--- /dev/null
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteExecutor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor.engine.pushdown.raw;
+
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+
+import java.sql.SQLException;
+
+/**
+ * Driver raw push down execute executor.
+ */
+@RequiredArgsConstructor
+public final class DriverRawPushDownExecuteExecutor {
+
+ private final ShardingSphereConnection connection;
+
+ private final ShardingSphereMetaData metaData;
+
+ private final Grantee grantee;
+
+ private final RawExecutor rawExecutor;
+
+ /**
+ * Execute.
+ *
+ * @param database database
+ * @param executionContext execution context
+ * @return is query result
+ * @throws SQLException SQL exception
+ */
+ public boolean execute(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
+ int maxConnectionsSizePerQuery =
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+ ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext = new
RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
database.getRuleMetaData().getRules())
+ .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(connection.getProcessId(), database.getName(),
grantee));
+ return rawExecutor.execute(executionGroupContext,
executionContext.getQueryContext(), new
RawSQLExecutorCallback()).iterator().next() instanceof QueryResult;
+ }
+}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteQueryExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteQueryExecutor.java
new file mode 100644
index 00000000000..7f4f776f111
--- /dev/null
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteQueryExecutor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor.engine.pushdown.raw;
+
+import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
+import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetFactory;
+import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Driver raw push down execute query executor.
+ */
+public final class DriverRawPushDownExecuteQueryExecutor {
+
+ private final ConnectionContext connectionContext;
+
+ private final String processId;
+
+ private final RuleMetaData globalRuleMetaData;
+
+ private final ConfigurationProperties props;
+
+ private final Grantee grantee;
+
+ private final RawExecutor rawExecutor;
+
+ public DriverRawPushDownExecuteQueryExecutor(final
ShardingSphereConnection connection, final ShardingSphereMetaData metaData,
final Grantee grantee, final RawExecutor rawExecutor) {
+ connectionContext =
connection.getDatabaseConnectionManager().getConnectionContext();
+ processId = connection.getProcessId();
+ globalRuleMetaData = metaData.getGlobalRuleMetaData();
+ props = metaData.getProps();
+ this.grantee = grantee;
+ this.rawExecutor = rawExecutor;
+ }
+
+ /**
+ * Execute query.
+ *
+ * @param database database
+ * @param queryContext query context
+ * @param statement statement
+ * @param columnLabelAndIndexMap column label and index map
+ * @return result set
+ * @throws SQLException SQL exception
+ */
+ public ShardingSphereResultSet executeQuery(final ShardingSphereDatabase
database, final QueryContext queryContext, final Statement statement,
+ final Map<String, Integer>
columnLabelAndIndexMap) throws SQLException {
+ List<QueryResult> queryResults = getQueryResults(database,
queryContext);
+ boolean isContainsEnhancedTable =
queryContext.getSqlStatementContext() instanceof SelectStatementContext
+ && ((SelectStatementContext)
queryContext.getSqlStatementContext()).isContainsEnhancedTable();
+ return new ShardingSphereResultSetFactory(connectionContext,
globalRuleMetaData, props, Collections.emptyList())
+ .newInstance(database, queryContext, queryResults, statement,
columnLabelAndIndexMap, isContainsEnhancedTable);
+ }
+
+ private List<QueryResult> getQueryResults(final ShardingSphereDatabase
database, final QueryContext queryContext) throws SQLException {
+ ExecutionContext executionContext = new
KernelProcessor().generateExecutionContext(queryContext, database,
globalRuleMetaData, props, connectionContext);
+ return rawExecutor.execute(
+ createRawExecutionGroupContext(database, executionContext),
queryContext, new
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
+ }
+
+ private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
+ int maxConnectionsSizePerQuery =
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+ return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
database.getRuleMetaData().getRules()).prepare(
+ executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(processId, database.getName(), grantee));
+ }
+}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteUpdateExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteUpdateExecutor.java
new file mode 100644
index 00000000000..df84ae3f51a
--- /dev/null
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/raw/DriverRawPushDownExecuteUpdateExecutor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor.engine.pushdown.raw;
+
+import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+
+import java.sql.SQLException;
+import java.util.Collection;
+
+/**
+ * Driver raw push down execute update executor.
+ */
+public final class DriverRawPushDownExecuteUpdateExecutor {
+
+ private final String processId;
+
+ private final ConfigurationProperties props;
+
+ private final Grantee grantee;
+
+ private final RawExecutor rawExecutor;
+
+ public DriverRawPushDownExecuteUpdateExecutor(final
ShardingSphereConnection connection, final ShardingSphereMetaData metaData,
final Grantee grantee, final RawExecutor rawExecutor) {
+ processId = connection.getProcessId();
+ props = metaData.getProps();
+ this.grantee = grantee;
+ this.rawExecutor = rawExecutor;
+ }
+
+ /**
+ * Execute update.
+ *
+ * @param database database
+ * @param executionContext execution context
+ * @return updated row count
+ * @throws SQLException SQL exception
+ */
+ public int executeUpdate(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
+ return
accumulate(rawExecutor.execute(createRawExecutionGroupContext(database,
executionContext), executionContext.getQueryContext(), new
RawSQLExecutorCallback()));
+ }
+
+ private int accumulate(final Collection<ExecuteResult> results) {
+ int result = 0;
+ for (ExecuteResult each : results) {
+ result += ((UpdateResult) each).getUpdateCount();
+ }
+ return result;
+ }
+
+ private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ShardingSphereDatabase database, final
ExecutionContext executionContext) throws SQLException {
+ int maxConnectionsSizePerQuery =
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+ return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
database.getRuleMetaData().getRules()).prepare(
+ executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(processId, database.getName(), grantee));
+ }
+}