This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b5c8a8dbbb5 Merge DriverJDBCExecutor into DriverExecutorFacade (#31636)
b5c8a8dbbb5 is described below
commit b5c8a8dbbb59c2d2f0da6dfd044b90bf974ac4ef
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Jun 8 17:28:00 2024 +0800
Merge DriverJDBCExecutor into DriverExecutorFacade (#31636)
---
.../executor/engine/DriverExecuteExecutor.java | 28 +++-
.../engine/DriverExecuteQueryExecutor.java | 17 ++-
.../engine/DriverExecuteUpdateExecutor.java | 68 +++++++--
.../executor/engine/DriverExecutorFacade.java | 7 +-
.../driver/executor/engine/DriverJDBCExecutor.java | 157 ---------------------
5 files changed, 102 insertions(+), 175 deletions(-)
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
index d1e4f185f9a..b019c9b9067 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
@@ -37,6 +37,7 @@ import
org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
@@ -46,12 +47,15 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryRe
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.route.context.RouteUnit;
import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import org.apache.shardingsphere.infra.session.query.QueryContext;
+import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
@@ -84,7 +88,7 @@ public final class DriverExecuteExecutor {
private final ShardingSphereMetaData metaData;
- private final DriverJDBCExecutor regularExecutor;
+ private final JDBCExecutor jdbcExecutor;
private final RawExecutor rawExecutor;
@@ -213,7 +217,27 @@ public final class DriverExecuteExecutor {
}
replayCallback.replay();
JDBCExecutorCallback<Boolean> jdbcExecutorCallback =
createExecuteCallback(database, callback,
executionContext.getSqlStatementContext().getSqlStatement(),
prepareEngine.getType());
- return regularExecutor.execute(executionGroupContext,
executionContext.getQueryContext(),
executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
+ return useDriverToExecute(database, executionGroupContext,
executionContext.getQueryContext(),
executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
+ }
+
+ private boolean useDriverToExecute(final ShardingSphereDatabase database,
final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final
QueryContext queryContext,
+ final Collection<RouteUnit> routeUnits,
final JDBCExecutorCallback<Boolean> callback) throws SQLException {
+ ProcessEngine processEngine = new ProcessEngine();
+ try {
+ processEngine.executeSQL(executionGroupContext, queryContext);
+ List<Boolean> results = doExecute(database, executionGroupContext,
queryContext, routeUnits, callback);
+ return null != results && !results.isEmpty() && null !=
results.get(0) && results.get(0);
+ } finally {
+
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
+ }
+ }
+
+ private <T> List<T> doExecute(final ShardingSphereDatabase database, final
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
+ final QueryContext queryContext, final
Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<T> callback)
throws SQLException {
+ List<T> results = jdbcExecutor.execute(executionGroupContext,
callback);
+ new MetaDataRefreshEngine(
+
connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
database, metaData.getProps()).refresh(queryContext.getSqlStatementContext(),
routeUnits);
+ return results;
}
private JDBCExecutorCallback<Boolean> createExecuteCallback(final
ShardingSphereDatabase database,
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
index 46c0222055c..50ba479ab36 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
@@ -37,6 +37,7 @@ import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupRepor
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
@@ -44,6 +45,7 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryRe
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
@@ -82,7 +84,7 @@ public final class DriverExecuteQueryExecutor {
private final ShardingSphereMetaData metaData;
- private final DriverJDBCExecutor regularExecutor;
+ private final JDBCExecutor jdbcExecutor;
private final RawExecutor rawExecutor;
@@ -159,7 +161,18 @@ public final class DriverExecuteQueryExecutor {
addCallback.add(statements,
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ?
getParameterSets(each) : Collections.emptyList());
}
replayCallback.replay();
- return regularExecutor.executeQuery(executionGroupContext,
queryContext, getExecuteQueryCallback(database, queryContext,
prepareEngine.getType()));
+ return executePushDownQuery(executionGroupContext, queryContext,
getExecuteQueryCallback(database, queryContext, prepareEngine.getType()));
+ }
+
+ private List<QueryResult> executePushDownQuery(final
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
+ final QueryContext
queryContext, final ExecuteQueryCallback callback) throws SQLException {
+ ProcessEngine processEngine = new ProcessEngine();
+ try {
+ processEngine.executeSQL(executionGroupContext, queryContext);
+ return jdbcExecutor.execute(executionGroupContext, callback);
+ } finally {
+
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
+ }
}
private boolean hasRawExecutionRule(final ShardingSphereDatabase database)
{
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteUpdateExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteUpdateExecutor.java
index 6fbe92db58f..54f3332af9c 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteUpdateExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteUpdateExecutor.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConne
import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
import
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -35,6 +36,7 @@ import
org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
@@ -44,11 +46,16 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.Update
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.route.context.RouteUnit;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import
org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import org.apache.shardingsphere.infra.session.query.QueryContext;
+import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
@@ -78,7 +85,7 @@ public final class DriverExecuteUpdateExecutor {
private final ShardingSphereMetaData metaData;
- private final DriverJDBCExecutor regularExecutor;
+ private final JDBCExecutor jdbcExecutor;
private final RawExecutor rawExecutor;
@@ -179,7 +186,56 @@ public final class DriverExecuteUpdateExecutor {
}
replayCallback.replay();
JDBCExecutorCallback<Integer> callback =
createExecuteUpdateCallback(database, updateCallback, sqlStatementContext,
prepareEngine.getType());
- return regularExecutor.executeUpdate(executionGroupContext,
executionContext.getQueryContext(),
executionContext.getRouteContext().getRouteUnits(), callback);
+ return useDriverToExecuteUpdate(database, executionGroupContext,
executionContext.getQueryContext(),
executionContext.getRouteContext().getRouteUnits(), callback);
+ }
+
+ private int useDriverToExecuteUpdate(final ShardingSphereDatabase
database, final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
+ final QueryContext queryContext,
final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Integer>
callback) throws SQLException {
+ ProcessEngine processEngine = new ProcessEngine();
+ try {
+ processEngine.executeSQL(executionGroupContext, queryContext);
+ List<Integer> results = doExecute(database, executionGroupContext,
queryContext, routeUnits, callback);
+ return isNeedAccumulate(database.getRuleMetaData().getRules(),
queryContext.getSqlStatementContext()) ? accumulate(results) : results.get(0);
+ } finally {
+
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
+ }
+ }
+
+ private boolean isNeedAccumulate(final Collection<ShardingSphereRule>
rules, final SQLStatementContext sqlStatementContext) {
+ if (!(sqlStatementContext instanceof TableAvailable)) {
+ return false;
+ }
+ for (ShardingSphereRule each : rules) {
+ Optional<DataNodeRuleAttribute> ruleAttribute =
each.getAttributes().findAttribute(DataNodeRuleAttribute.class);
+ if (ruleAttribute.isPresent() &&
ruleAttribute.get().isNeedAccumulate(((TableAvailable)
sqlStatementContext).getTablesContext().getTableNames())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private int accumulate(final List<Integer> updateResults) {
+ int result = 0;
+ for (Integer each : updateResults) {
+ result += null == each ? 0 : each;
+ }
+ return result;
+ }
+
+ private int accumulate(final Collection<ExecuteResult> results) {
+ int result = 0;
+ for (ExecuteResult each : results) {
+ result += ((UpdateResult) each).getUpdateCount();
+ }
+ return result;
+ }
+
+ private <T> List<T> doExecute(final ShardingSphereDatabase database, final
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final
QueryContext queryContext,
+ final Collection<RouteUnit> routeUnits,
final JDBCExecutorCallback<T> callback) throws SQLException {
+ List<T> results = jdbcExecutor.execute(executionGroupContext,
callback);
+ new MetaDataRefreshEngine(
+
connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
database, metaData.getProps()).refresh(queryContext.getSqlStatementContext(),
routeUnits);
+ return results;
}
private ExecutionGroupContext<JDBCExecutionUnit>
createExecutionGroupContext(final ShardingSphereDatabase database, final
ExecutionContext executionContext,
@@ -205,14 +261,6 @@ public final class DriverExecuteUpdateExecutor {
};
}
- private int accumulate(final Collection<ExecuteResult> results) {
- int result = 0;
- for (ExecuteResult each : results) {
- result += ((UpdateResult) each).getUpdateCount();
- }
- return result;
- }
-
private boolean isNeedImplicitCommitTransaction(final
ShardingSphereConnection connection, final SQLStatement sqlStatement, final
boolean multiExecutionUnits) {
if (!connection.getAutoCommit()) {
return false;
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java
index 627ca9f1439..13f17d82c00 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java
@@ -72,15 +72,14 @@ public final class DriverExecutorFacade implements
AutoCloseable {
this.statementOption = statementOption;
this.statementManager = statementManager;
this.jdbcDriverType = jdbcDriverType;
- DriverJDBCExecutor regularExecutor = new
DriverJDBCExecutor(connection.getDatabaseName(),
connection.getContextManager(), jdbcExecutor);
RawExecutor rawExecutor = new
RawExecutor(connection.getContextManager().getExecutorEngine(),
connection.getDatabaseConnectionManager().getConnectionContext());
trafficExecutor = new TrafficExecutor();
ShardingSphereMetaData metaData =
connection.getContextManager().getMetaDataContexts().getMetaData();
String schemaName = new
DatabaseTypeRegistry(metaData.getDatabase(connection.getDatabaseName()).getProtocolType()).getDefaultSchemaName(connection.getDatabaseName());
sqlFederationEngine = new
SQLFederationEngine(connection.getDatabaseName(), schemaName, metaData,
connection.getContextManager().getMetaDataContexts().getStatistics(),
jdbcExecutor);
- queryExecutor = new DriverExecuteQueryExecutor(connection, metaData,
regularExecutor, rawExecutor, trafficExecutor, sqlFederationEngine);
- updateExecutor = new DriverExecuteUpdateExecutor(connection, metaData,
regularExecutor, rawExecutor, trafficExecutor);
- executeExecutor = new DriverExecuteExecutor(connection, metaData,
regularExecutor, rawExecutor, trafficExecutor, sqlFederationEngine);
+ queryExecutor = new DriverExecuteQueryExecutor(connection, metaData,
jdbcExecutor, rawExecutor, trafficExecutor, sqlFederationEngine);
+ updateExecutor = new DriverExecuteUpdateExecutor(connection, metaData,
jdbcExecutor, rawExecutor, trafficExecutor);
+ executeExecutor = new DriverExecuteExecutor(connection, metaData,
jdbcExecutor, rawExecutor, trafficExecutor, sqlFederationEngine);
}
/**
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverJDBCExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverJDBCExecutor.java
deleted file mode 100644
index cf0e791d03a..00000000000
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverJDBCExecutor.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.driver.executor.engine;
-
-import
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallback;
-import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
-import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import org.apache.shardingsphere.infra.route.context.RouteUnit;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import
org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
-import org.apache.shardingsphere.infra.session.query.QueryContext;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
-import
org.apache.shardingsphere.mode.service.persist.MetaDataManagerPersistService;
-
-import java.sql.SQLException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * Driver JDBC executor.
- */
-public final class DriverJDBCExecutor {
-
- private final String databaseName;
-
- private final ShardingSphereMetaData metaData;
-
- private final MetaDataManagerPersistService persistService;
-
- private final JDBCExecutor jdbcExecutor;
-
- private final ProcessEngine processEngine;
-
- public DriverJDBCExecutor(final String databaseName, final ContextManager
contextManager, final JDBCExecutor jdbcExecutor) {
- this.databaseName = databaseName;
- metaData = contextManager.getMetaDataContexts().getMetaData();
- persistService =
contextManager.getPersistServiceFacade().getMetaDataManagerPersistService();
- this.jdbcExecutor = jdbcExecutor;
- processEngine = new ProcessEngine();
- }
-
- /**
- * Execute query.
- *
- * @param executionGroupContext execution group context
- * @param queryContext query context
- * @param callback execute query callback
- * @return query results
- * @throws SQLException SQL exception
- */
- public List<QueryResult> executeQuery(final
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
- final QueryContext queryContext,
final ExecuteQueryCallback callback) throws SQLException {
- try {
- processEngine.executeSQL(executionGroupContext, queryContext);
- return jdbcExecutor.execute(executionGroupContext, callback);
- } finally {
-
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
- }
- }
-
- /**
- * Execute update.
- *
- * @param executionGroupContext execution group context
- * @param queryContext query context
- * @param routeUnits route units
- * @param callback JDBC executor callback
- * @return effected records count
- * @throws SQLException SQL exception
- */
- public int executeUpdate(final ExecutionGroupContext<JDBCExecutionUnit>
executionGroupContext,
- final QueryContext queryContext, final
Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Integer> callback)
throws SQLException {
- try {
- processEngine.executeSQL(executionGroupContext, queryContext);
- List<Integer> results = doExecute(executionGroupContext,
queryContext, routeUnits, callback);
- return
isNeedAccumulate(metaData.getDatabase(queryContext.getDatabaseNameFromSQLStatement().orElse(databaseName)).getRuleMetaData().getRules(),
queryContext.getSqlStatementContext())
- ? accumulate(results)
- : results.get(0);
- } finally {
-
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
- }
- }
-
- private boolean isNeedAccumulate(final Collection<ShardingSphereRule>
rules, final SQLStatementContext sqlStatementContext) {
- if (!(sqlStatementContext instanceof TableAvailable)) {
- return false;
- }
- for (ShardingSphereRule each : rules) {
- Optional<DataNodeRuleAttribute> ruleAttribute =
each.getAttributes().findAttribute(DataNodeRuleAttribute.class);
- if (ruleAttribute.isPresent() &&
ruleAttribute.get().isNeedAccumulate(((TableAvailable)
sqlStatementContext).getTablesContext().getTableNames())) {
- return true;
- }
- }
- return false;
- }
-
- private int accumulate(final List<Integer> updateResults) {
- int result = 0;
- for (Integer each : updateResults) {
- result += null == each ? 0 : each;
- }
- return result;
- }
-
- /**
- * Execute SQL.
- *
- * @param executionGroupContext execution group context
- * @param queryContext query context
- * @param routeUnits route units
- * @param callback JDBC executor callback
- * @return return true if is DQL, false if is DML
- * @throws SQLException SQL exception
- */
- public boolean execute(final ExecutionGroupContext<JDBCExecutionUnit>
executionGroupContext, final QueryContext queryContext,
- final Collection<RouteUnit> routeUnits, final
JDBCExecutorCallback<Boolean> callback) throws SQLException {
- try {
- processEngine.executeSQL(executionGroupContext, queryContext);
- List<Boolean> results = doExecute(executionGroupContext,
queryContext, routeUnits, callback);
- return null != results && !results.isEmpty() && null !=
results.get(0) && results.get(0);
- } finally {
-
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
- }
- }
-
- private <T> List<T> doExecute(final
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final
QueryContext queryContext, final Collection<RouteUnit> routeUnits,
- final JDBCExecutorCallback<T> callback)
throws SQLException {
- List<T> results = jdbcExecutor.execute(executionGroupContext,
callback);
- new MetaDataRefreshEngine(persistService,
-
metaData.getDatabase(queryContext.getDatabaseNameFromSQLStatement().orElse(databaseName)),
metaData.getProps()).refresh(queryContext.getSqlStatementContext(),
routeUnits);
- return results;
- }
-}