This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b5c8a8dbbb5 Merge DriverJDBCExecutor into DriverExecutorFacade (#31636)
b5c8a8dbbb5 is described below

commit b5c8a8dbbb59c2d2f0da6dfd044b90bf974ac4ef
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Jun 8 17:28:00 2024 +0800

    Merge DriverJDBCExecutor into DriverExecutorFacade (#31636)
---
 .../executor/engine/DriverExecuteExecutor.java     |  28 +++-
 .../engine/DriverExecuteQueryExecutor.java         |  17 ++-
 .../engine/DriverExecuteUpdateExecutor.java        |  68 +++++++--
 .../executor/engine/DriverExecutorFacade.java      |   7 +-
 .../driver/executor/engine/DriverJDBCExecutor.java | 157 ---------------------
 5 files changed, 102 insertions(+), 175 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
index d1e4f185f9a..b019c9b9067 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
@@ -37,6 +37,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
@@ -46,12 +47,15 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryRe
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.route.context.RouteUnit;
 import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
+import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
@@ -84,7 +88,7 @@ public final class DriverExecuteExecutor {
     
     private final ShardingSphereMetaData metaData;
     
-    private final DriverJDBCExecutor regularExecutor;
+    private final JDBCExecutor jdbcExecutor;
     
     private final RawExecutor rawExecutor;
     
@@ -213,7 +217,27 @@ public final class DriverExecuteExecutor {
         }
         replayCallback.replay();
         JDBCExecutorCallback<Boolean> jdbcExecutorCallback = 
createExecuteCallback(database, callback, 
executionContext.getSqlStatementContext().getSqlStatement(), 
prepareEngine.getType());
-        return regularExecutor.execute(executionGroupContext, 
executionContext.getQueryContext(), 
executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
+        return useDriverToExecute(database, executionGroupContext, 
executionContext.getQueryContext(), 
executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
+    }
+    
+    private boolean useDriverToExecute(final ShardingSphereDatabase database, 
final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final 
QueryContext queryContext,
+                                       final Collection<RouteUnit> routeUnits, 
final JDBCExecutorCallback<Boolean> callback) throws SQLException {
+        ProcessEngine processEngine = new ProcessEngine();
+        try {
+            processEngine.executeSQL(executionGroupContext, queryContext);
+            List<Boolean> results = doExecute(database, executionGroupContext, 
queryContext, routeUnits, callback);
+            return null != results && !results.isEmpty() && null != 
results.get(0) && results.get(0);
+        } finally {
+            
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
+        }
+    }
+    
+    private <T> List<T> doExecute(final ShardingSphereDatabase database, final 
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
+                                  final QueryContext queryContext, final 
Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<T> callback) 
throws SQLException {
+        List<T> results = jdbcExecutor.execute(executionGroupContext, 
callback);
+        new MetaDataRefreshEngine(
+                
connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
 database, metaData.getProps()).refresh(queryContext.getSqlStatementContext(), 
routeUnits);
+        return results;
     }
     
     private JDBCExecutorCallback<Boolean> createExecuteCallback(final 
ShardingSphereDatabase database,
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
index 46c0222055c..50ba479ab36 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
@@ -37,6 +37,7 @@ import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupRepor
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
@@ -44,6 +45,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryRe
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
@@ -82,7 +84,7 @@ public final class DriverExecuteQueryExecutor {
     
     private final ShardingSphereMetaData metaData;
     
-    private final DriverJDBCExecutor regularExecutor;
+    private final JDBCExecutor jdbcExecutor;
     
     private final RawExecutor rawExecutor;
     
@@ -159,7 +161,18 @@ public final class DriverExecuteQueryExecutor {
             addCallback.add(statements, 
JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType()) ? 
getParameterSets(each) : Collections.emptyList());
         }
         replayCallback.replay();
-        return regularExecutor.executeQuery(executionGroupContext, 
queryContext, getExecuteQueryCallback(database, queryContext, 
prepareEngine.getType()));
+        return executePushDownQuery(executionGroupContext, queryContext, 
getExecuteQueryCallback(database, queryContext, prepareEngine.getType()));
+    }
+    
+    private List<QueryResult> executePushDownQuery(final 
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
+                                                   final QueryContext 
queryContext, final ExecuteQueryCallback callback) throws SQLException {
+        ProcessEngine processEngine = new ProcessEngine();
+        try {
+            processEngine.executeSQL(executionGroupContext, queryContext);
+            return jdbcExecutor.execute(executionGroupContext, callback);
+        } finally {
+            
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
+        }
     }
     
     private boolean hasRawExecutionRule(final ShardingSphereDatabase database) 
{
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteUpdateExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteUpdateExecutor.java
index 6fbe92db58f..54f3332af9c 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteUpdateExecutor.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteUpdateExecutor.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConne
 import 
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -35,6 +36,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
@@ -44,11 +46,16 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.Update
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.route.context.RouteUnit;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import 
org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
 import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
+import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
@@ -78,7 +85,7 @@ public final class DriverExecuteUpdateExecutor {
     
     private final ShardingSphereMetaData metaData;
     
-    private final DriverJDBCExecutor regularExecutor;
+    private final JDBCExecutor jdbcExecutor;
     
     private final RawExecutor rawExecutor;
     
@@ -179,7 +186,56 @@ public final class DriverExecuteUpdateExecutor {
         }
         replayCallback.replay();
         JDBCExecutorCallback<Integer> callback = 
createExecuteUpdateCallback(database, updateCallback, sqlStatementContext, 
prepareEngine.getType());
-        return regularExecutor.executeUpdate(executionGroupContext, 
executionContext.getQueryContext(), 
executionContext.getRouteContext().getRouteUnits(), callback);
+        return useDriverToExecuteUpdate(database, executionGroupContext, 
executionContext.getQueryContext(), 
executionContext.getRouteContext().getRouteUnits(), callback);
+    }
+    
+    private int useDriverToExecuteUpdate(final ShardingSphereDatabase 
database, final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
+                                         final QueryContext queryContext, 
final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Integer> 
callback) throws SQLException {
+        ProcessEngine processEngine = new ProcessEngine();
+        try {
+            processEngine.executeSQL(executionGroupContext, queryContext);
+            List<Integer> results = doExecute(database, executionGroupContext, 
queryContext, routeUnits, callback);
+            return isNeedAccumulate(database.getRuleMetaData().getRules(), 
queryContext.getSqlStatementContext()) ? accumulate(results) : results.get(0);
+        } finally {
+            
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
+        }
+    }
+    
+    private boolean isNeedAccumulate(final Collection<ShardingSphereRule> 
rules, final SQLStatementContext sqlStatementContext) {
+        if (!(sqlStatementContext instanceof TableAvailable)) {
+            return false;
+        }
+        for (ShardingSphereRule each : rules) {
+            Optional<DataNodeRuleAttribute> ruleAttribute = 
each.getAttributes().findAttribute(DataNodeRuleAttribute.class);
+            if (ruleAttribute.isPresent() && 
ruleAttribute.get().isNeedAccumulate(((TableAvailable) 
sqlStatementContext).getTablesContext().getTableNames())) {
+                return true;
+            }
+        }
+        return false;
+    }
+    
+    private int accumulate(final List<Integer> updateResults) {
+        int result = 0;
+        for (Integer each : updateResults) {
+            result += null == each ? 0 : each;
+        }
+        return result;
+    }
+    
+    private int accumulate(final Collection<ExecuteResult> results) {
+        int result = 0;
+        for (ExecuteResult each : results) {
+            result += ((UpdateResult) each).getUpdateCount();
+        }
+        return result;
+    }
+    
+    private <T> List<T> doExecute(final ShardingSphereDatabase database, final 
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final 
QueryContext queryContext,
+                                  final Collection<RouteUnit> routeUnits, 
final JDBCExecutorCallback<T> callback) throws SQLException {
+        List<T> results = jdbcExecutor.execute(executionGroupContext, 
callback);
+        new MetaDataRefreshEngine(
+                
connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
 database, metaData.getProps()).refresh(queryContext.getSqlStatementContext(), 
routeUnits);
+        return results;
     }
     
     private ExecutionGroupContext<JDBCExecutionUnit> 
createExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext,
@@ -205,14 +261,6 @@ public final class DriverExecuteUpdateExecutor {
         };
     }
     
-    private int accumulate(final Collection<ExecuteResult> results) {
-        int result = 0;
-        for (ExecuteResult each : results) {
-            result += ((UpdateResult) each).getUpdateCount();
-        }
-        return result;
-    }
-    
     private boolean isNeedImplicitCommitTransaction(final 
ShardingSphereConnection connection, final SQLStatement sqlStatement, final 
boolean multiExecutionUnits) {
         if (!connection.getAutoCommit()) {
             return false;
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java
index 627ca9f1439..13f17d82c00 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecutorFacade.java
@@ -72,15 +72,14 @@ public final class DriverExecutorFacade implements 
AutoCloseable {
         this.statementOption = statementOption;
         this.statementManager = statementManager;
         this.jdbcDriverType = jdbcDriverType;
-        DriverJDBCExecutor regularExecutor = new 
DriverJDBCExecutor(connection.getDatabaseName(), 
connection.getContextManager(), jdbcExecutor);
         RawExecutor rawExecutor = new 
RawExecutor(connection.getContextManager().getExecutorEngine(), 
connection.getDatabaseConnectionManager().getConnectionContext());
         trafficExecutor = new TrafficExecutor();
         ShardingSphereMetaData metaData = 
connection.getContextManager().getMetaDataContexts().getMetaData();
         String schemaName = new 
DatabaseTypeRegistry(metaData.getDatabase(connection.getDatabaseName()).getProtocolType()).getDefaultSchemaName(connection.getDatabaseName());
         sqlFederationEngine = new 
SQLFederationEngine(connection.getDatabaseName(), schemaName, metaData, 
connection.getContextManager().getMetaDataContexts().getStatistics(), 
jdbcExecutor);
-        queryExecutor = new DriverExecuteQueryExecutor(connection, metaData, 
regularExecutor, rawExecutor, trafficExecutor, sqlFederationEngine);
-        updateExecutor = new DriverExecuteUpdateExecutor(connection, metaData, 
regularExecutor, rawExecutor, trafficExecutor);
-        executeExecutor = new DriverExecuteExecutor(connection, metaData, 
regularExecutor, rawExecutor, trafficExecutor, sqlFederationEngine);
+        queryExecutor = new DriverExecuteQueryExecutor(connection, metaData, 
jdbcExecutor, rawExecutor, trafficExecutor, sqlFederationEngine);
+        updateExecutor = new DriverExecuteUpdateExecutor(connection, metaData, 
jdbcExecutor, rawExecutor, trafficExecutor);
+        executeExecutor = new DriverExecuteExecutor(connection, metaData, 
jdbcExecutor, rawExecutor, trafficExecutor, sqlFederationEngine);
     }
     
     /**
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverJDBCExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverJDBCExecutor.java
deleted file mode 100644
index cf0e791d03a..00000000000
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverJDBCExecutor.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.driver.executor.engine;
-
-import 
org.apache.shardingsphere.driver.executor.callback.execute.ExecuteQueryCallback;
-import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
-import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
-import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import org.apache.shardingsphere.infra.route.context.RouteUnit;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import 
org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
-import org.apache.shardingsphere.infra.session.query.QueryContext;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.metadata.refresher.MetaDataRefreshEngine;
-import 
org.apache.shardingsphere.mode.service.persist.MetaDataManagerPersistService;
-
-import java.sql.SQLException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * Driver JDBC executor.
- */
-public final class DriverJDBCExecutor {
-    
-    private final String databaseName;
-    
-    private final ShardingSphereMetaData metaData;
-    
-    private final MetaDataManagerPersistService persistService;
-    
-    private final JDBCExecutor jdbcExecutor;
-    
-    private final ProcessEngine processEngine;
-    
-    public DriverJDBCExecutor(final String databaseName, final ContextManager 
contextManager, final JDBCExecutor jdbcExecutor) {
-        this.databaseName = databaseName;
-        metaData = contextManager.getMetaDataContexts().getMetaData();
-        persistService = 
contextManager.getPersistServiceFacade().getMetaDataManagerPersistService();
-        this.jdbcExecutor = jdbcExecutor;
-        processEngine = new ProcessEngine();
-    }
-    
-    /**
-     * Execute query.
-     *
-     * @param executionGroupContext execution group context
-     * @param queryContext query context
-     * @param callback execute query callback
-     * @return query results
-     * @throws SQLException SQL exception
-     */
-    public List<QueryResult> executeQuery(final 
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
-                                          final QueryContext queryContext, 
final ExecuteQueryCallback callback) throws SQLException {
-        try {
-            processEngine.executeSQL(executionGroupContext, queryContext);
-            return jdbcExecutor.execute(executionGroupContext, callback);
-        } finally {
-            
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
-        }
-    }
-    
-    /**
-     * Execute update.
-     *
-     * @param executionGroupContext execution group context
-     * @param queryContext query context
-     * @param routeUnits route units
-     * @param callback JDBC executor callback
-     * @return effected records count
-     * @throws SQLException SQL exception
-     */
-    public int executeUpdate(final ExecutionGroupContext<JDBCExecutionUnit> 
executionGroupContext,
-                             final QueryContext queryContext, final 
Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Integer> callback) 
throws SQLException {
-        try {
-            processEngine.executeSQL(executionGroupContext, queryContext);
-            List<Integer> results = doExecute(executionGroupContext, 
queryContext, routeUnits, callback);
-            return 
isNeedAccumulate(metaData.getDatabase(queryContext.getDatabaseNameFromSQLStatement().orElse(databaseName)).getRuleMetaData().getRules(),
 queryContext.getSqlStatementContext())
-                    ? accumulate(results)
-                    : results.get(0);
-        } finally {
-            
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
-        }
-    }
-    
-    private boolean isNeedAccumulate(final Collection<ShardingSphereRule> 
rules, final SQLStatementContext sqlStatementContext) {
-        if (!(sqlStatementContext instanceof TableAvailable)) {
-            return false;
-        }
-        for (ShardingSphereRule each : rules) {
-            Optional<DataNodeRuleAttribute> ruleAttribute = 
each.getAttributes().findAttribute(DataNodeRuleAttribute.class);
-            if (ruleAttribute.isPresent() && 
ruleAttribute.get().isNeedAccumulate(((TableAvailable) 
sqlStatementContext).getTablesContext().getTableNames())) {
-                return true;
-            }
-        }
-        return false;
-    }
-    
-    private int accumulate(final List<Integer> updateResults) {
-        int result = 0;
-        for (Integer each : updateResults) {
-            result += null == each ? 0 : each;
-        }
-        return result;
-    }
-    
-    /**
-     * Execute SQL.
-     *
-     * @param executionGroupContext execution group context
-     * @param queryContext query context
-     * @param routeUnits route units
-     * @param callback JDBC executor callback
-     * @return return true if is DQL, false if is DML
-     * @throws SQLException SQL exception
-     */
-    public boolean execute(final ExecutionGroupContext<JDBCExecutionUnit> 
executionGroupContext, final QueryContext queryContext,
-                           final Collection<RouteUnit> routeUnits, final 
JDBCExecutorCallback<Boolean> callback) throws SQLException {
-        try {
-            processEngine.executeSQL(executionGroupContext, queryContext);
-            List<Boolean> results = doExecute(executionGroupContext, 
queryContext, routeUnits, callback);
-            return null != results && !results.isEmpty() && null != 
results.get(0) && results.get(0);
-        } finally {
-            
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
-        }
-    }
-    
-    private <T> List<T> doExecute(final 
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final 
QueryContext queryContext, final Collection<RouteUnit> routeUnits,
-                                  final JDBCExecutorCallback<T> callback) 
throws SQLException {
-        List<T> results = jdbcExecutor.execute(executionGroupContext, 
callback);
-        new MetaDataRefreshEngine(persistService,
-                
metaData.getDatabase(queryContext.getDatabaseNameFromSQLStatement().orElse(databaseName)),
 metaData.getProps()).refresh(queryContext.getSqlStatementContext(), 
routeUnits);
-        return results;
-    }
-}

Reply via email to