This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1118bcde7c2 Move executeUpdate to DriverExecutor (#31452)
1118bcde7c2 is described below

commit 1118bcde7c2d111c3b6da6dc5a2a7c205da1a796
Author: Liang Zhang <[email protected]>
AuthorDate: Fri May 31 08:40:14 2024 +0800

    Move executeUpdate to DriverExecutor (#31452)
    
    * Move executeUpdate to DriverExecutor
    
    * Move executeUpdate to DriverExecutor
---
 .../driver/executor/DriverExecutor.java            | 101 ++++++++++++++++++++-
 .../statement/ShardingSpherePreparedStatement.java |  57 ++----------
 .../core/statement/ShardingSphereStatement.java    |  54 ++---------
 3 files changed, 110 insertions(+), 102 deletions(-)

diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
index 10878e41d36..0b464cd1da7 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.driver.executor;
 
 import lombok.Getter;
 import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
+import 
org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
@@ -29,20 +30,26 @@ import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementCont
 import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import 
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
 import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
@@ -55,11 +62,13 @@ import 
org.apache.shardingsphere.infra.metadata.user.Grantee;
 import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
 import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
 import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
 import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
 import org.apache.shardingsphere.traffic.rule.TrafficRule;
+import 
org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -249,17 +258,99 @@ public final class DriverExecutor implements 
AutoCloseable {
      * @param queryContext query context
      * @param prepareEngine prepare engine
      * @param trafficCallback traffic callback
+     * @param updateCallback update callback
+     * @param isNeedImplicitCommitTransaction is need implicit commit 
transaction
+     * @param statementReplayCallback statement replay callback
+     * @param executionContext execution context
      * @return updated row count
      * @throws SQLException SQL exception
      */
-    public Optional<Integer> executeAdvanceUpdate(final ShardingSphereMetaData 
metaData, final ShardingSphereDatabase database, final QueryContext 
queryContext,
-                                                  final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                                  final 
TrafficExecutorCallback<Integer> trafficCallback) throws SQLException {
+    @SuppressWarnings("rawtypes")
+    public int executeAdvanceUpdate(final ShardingSphereMetaData metaData, 
final ShardingSphereDatabase database, final QueryContext queryContext,
+                                    final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final TrafficExecutorCallback<Integer> trafficCallback,
+                                    final ExecuteUpdateCallback 
updateCallback, final boolean isNeedImplicitCommitTransaction,
+                                    final StatementReplayCallback 
statementReplayCallback, final ExecutionContext executionContext) throws 
SQLException {
         Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
 queryContext);
         if (trafficInstanceId.isPresent()) {
-            return 
Optional.of(trafficExecutor.execute(connection.getProcessId(), 
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, 
trafficCallback));
+            return trafficExecutor.execute(connection.getProcessId(), 
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, 
trafficCallback);
         }
-        return Optional.empty();
+        return 
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
+                ? executeUpdate(database, updateCallback, 
queryContext.getSqlStatementContext(), executionContext, prepareEngine, 
isNeedImplicitCommitTransaction, statementReplayCallback)
+                : 
accumulate(rawExecutor.execute(createRawExecutionGroupContext(metaData, 
database, executionContext), queryContext, new RawSQLExecutorCallback()));
+    }
+    
+    @SuppressWarnings("rawtypes")
+    private int executeUpdate(final ShardingSphereDatabase database, final 
ExecuteUpdateCallback updateCallback, final SQLStatementContext 
sqlStatementContext, final ExecutionContext executionContext,
+                              final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final boolean isNeedImplicitCommitTransaction,
+                              final StatementReplayCallback 
statementReplayCallback) throws SQLException {
+        return isNeedImplicitCommitTransaction
+                ? executeWithImplicitCommitTransaction(() -> 
useDriverToExecuteUpdate(
+                        database, updateCallback, sqlStatementContext, 
executionContext, prepareEngine, statementReplayCallback), connection, 
database.getProtocolType())
+                : useDriverToExecuteUpdate(database, updateCallback, 
sqlStatementContext, executionContext, prepareEngine, statementReplayCallback);
+    }
+    
+    private <T> T executeWithImplicitCommitTransaction(final 
ImplicitTransactionCallback<T> callback, final Connection connection, final 
DatabaseType databaseType) throws SQLException {
+        T result;
+        try {
+            connection.setAutoCommit(false);
+            result = callback.execute();
+            connection.commit();
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            connection.rollback();
+            throw SQLExceptionTransformEngine.toSQLException(ex, databaseType);
+        } finally {
+            connection.setAutoCommit(true);
+        }
+        return result;
+    }
+    
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private int useDriverToExecuteUpdate(final ShardingSphereDatabase 
database, final ExecuteUpdateCallback updateCallback, final SQLStatementContext 
sqlStatementContext,
+                                         final ExecutionContext 
executionContext, final DriverExecutionPrepareEngine<JDBCExecutionUnit, 
Connection> prepareEngine,
+                                         final StatementReplayCallback 
statementReplayCallback) throws SQLException {
+        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext(database, executionContext, prepareEngine);
+        for (ExecutionGroup<JDBCExecutionUnit> each : 
executionGroupContext.getInputGroups()) {
+            statements.addAll(getStatements(each));
+            if 
(JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType())) {
+                parameterSets.addAll(getParameterSets(each));
+            }
+        }
+        statementReplayCallback.replay(statements, parameterSets);
+        JDBCExecutorCallback<Integer> callback = 
createExecuteUpdateCallback(database, updateCallback, sqlStatementContext, 
prepareEngine.getType());
+        return regularExecutor.executeUpdate(executionGroupContext, 
executionContext.getQueryContext(), 
executionContext.getRouteContext().getRouteUnits(), callback);
+    }
+    
+    private ExecutionGroupContext<JDBCExecutionUnit> 
createExecutionGroupContext(final ShardingSphereDatabase database, final 
ExecutionContext executionContext,
+                                                                               
  final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine) throws SQLException {
+        return prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
+                new ExecutionGroupReportContext(connection.getProcessId(), 
database.getName(), new Grantee("", "")));
+    }
+    
+    private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final 
ShardingSphereDatabase database,
+                                                                      final 
ExecuteUpdateCallback updateCallback, final SQLStatementContext 
sqlStatementContext, final String jdbcDriverType) {
+        boolean isExceptionThrown = 
SQLExecutorExceptionHandler.isExceptionThrown();
+        return new JDBCExecutorCallback<Integer>(database.getProtocolType(), 
database.getResourceMetaData(), sqlStatementContext.getSqlStatement(), 
isExceptionThrown) {
+            
+            @Override
+            protected Integer executeSQL(final String sql, final Statement 
statement, final ConnectionMode connectionMode, final DatabaseType storageType) 
throws SQLException {
+                return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? 
updateCallback.executeUpdate(sql, statement) : ((PreparedStatement) 
statement).executeUpdate();
+            }
+            
+            @Override
+            protected Optional<Integer> getSaneResult(final SQLStatement 
sqlStatement, final SQLException ex) {
+                return Optional.empty();
+            }
+        };
+    }
+    
+    private int accumulate(final Collection<ExecuteResult> results) {
+        int result = 0;
+        for (ExecuteResult each : results) {
+            result += ((UpdateResult) each).getUpdateCount();
+        }
+        return result;
     }
     
     /**
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index f029ee03fd1..f185ba74bc9 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -59,7 +59,6 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -277,18 +276,16 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
             QueryContext queryContext = createQueryContext();
             
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
             ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
-            Optional<Integer> updatedCount = 
executor.executeAdvanceUpdate(metaDataContexts.getMetaData(), database, 
queryContext, createDriverExecutionPrepareEngine(database),
-                    (statement, sql) -> ((PreparedStatement) 
statement).executeUpdate());
-            if (updatedCount.isPresent()) {
-                return updatedCount.get();
-            }
             ExecutionContext executionContext = 
createExecutionContext(queryContext);
-            if (hasRawExecutionRule()) {
-                Collection<ExecuteResult> results =
-                        
executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext),
 executionContext.getQueryContext(), new RawSQLExecutorCallback());
-                return accumulate(results);
+            boolean isNeedImplicitCommitTransaction = 
isNeedImplicitCommitTransaction(connection, 
sqlStatementContext.getSqlStatement(), 
executionContext.getExecutionUnits().size() > 1);
+            final int result = 
executor.executeAdvanceUpdate(metaDataContexts.getMetaData(), database, 
queryContext, createDriverExecutionPrepareEngine(database),
+                    (statement, sql) -> ((PreparedStatement) 
statement).executeUpdate(), null, isNeedImplicitCommitTransaction, 
(StatementReplayCallback<PreparedStatement>) this::replay,
+                    executionContext);
+            for (Statement each : executor.getStatements()) {
+                statements.add((PreparedStatement) each);
             }
-            return executeUpdateWithExecutionContext(executionContext);
+            parameterSets.addAll(executor.getParameterSets());
+            return result;
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
@@ -299,38 +296,6 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         }
     }
     
-    private int useDriverToExecuteUpdate(final ExecutionContext 
executionContext) throws SQLException {
-        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext(executionContext);
-        cacheStatements(executionGroupContext.getInputGroups());
-        return 
executor.getRegularExecutor().executeUpdate(executionGroupContext,
-                executionContext.getQueryContext(), 
executionContext.getRouteContext().getRouteUnits(), 
createExecuteUpdateCallback());
-    }
-    
-    private int accumulate(final Collection<ExecuteResult> results) {
-        int result = 0;
-        for (ExecuteResult each : results) {
-            result += ((UpdateResult) each).getUpdateCount();
-        }
-        return result;
-    }
-    
-    private JDBCExecutorCallback<Integer> createExecuteUpdateCallback() {
-        boolean isExceptionThrown = 
SQLExecutorExceptionHandler.isExceptionThrown();
-        return new 
JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
-                
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), 
sqlStatement, isExceptionThrown) {
-            
-            @Override
-            protected Integer executeSQL(final String sql, final Statement 
statement, final ConnectionMode connectionMode, final DatabaseType storageType) 
throws SQLException {
-                return ((PreparedStatement) statement).executeUpdate();
-            }
-            
-            @Override
-            protected Optional<Integer> getSaneResult(final SQLStatement 
sqlStatement, final SQLException ex) {
-                return Optional.empty();
-            }
-        };
-    }
-    
     @Override
     public boolean execute() throws SQLException {
         try {
@@ -370,12 +335,6 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
                 : useDriverToExecute(executionContext);
     }
     
-    private int executeUpdateWithExecutionContext(final ExecutionContext 
executionContext) throws SQLException {
-        return isNeedImplicitCommitTransaction(connection, 
sqlStatementContext.getSqlStatement(), 
executionContext.getExecutionUnits().size() > 1)
-                ? executeWithImplicitCommitTransaction(() -> 
useDriverToExecuteUpdate(executionContext), connection, 
metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType())
-                : useDriverToExecuteUpdate(executionContext);
-    }
-    
     private boolean useDriverToExecute(final ExecutionContext 
executionContext) throws SQLException {
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext(executionContext);
         cacheStatements(executionGroupContext.getInputGroups());
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 1065c068557..3486275a6ef 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -54,7 +54,6 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -232,13 +231,6 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         }
     }
     
-    private int executeUpdate(final ExecuteUpdateCallback updateCallback, 
final SQLStatementContext sqlStatementContext, final ExecutionContext 
executionContext) throws SQLException {
-        return isNeedImplicitCommitTransaction(connection, 
sqlStatementContext.getSqlStatement(), 
executionContext.getExecutionUnits().size() > 1)
-                ? executeWithImplicitCommitTransaction(() -> 
useDriverToExecuteUpdate(updateCallback, sqlStatementContext, 
executionContext), connection,
-                        
metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType())
-                : useDriverToExecuteUpdate(updateCallback, 
sqlStatementContext, executionContext);
-    }
-    
     private int executeUpdate0(final String sql, final ExecuteUpdateCallback 
updateCallback, final TrafficExecutorCallback<Integer> trafficCallback) throws 
SQLException {
         QueryContext queryContext = createQueryContext(sql);
         
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
@@ -246,47 +238,13 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
         ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
         sqlStatementContext = queryContext.getSqlStatementContext();
-        Optional<Integer> updatedCount = 
executor.executeAdvanceUpdate(metaDataContexts.getMetaData(), database, 
queryContext, createDriverExecutionPrepareEngine(database), trafficCallback);
-        if (updatedCount.isPresent()) {
-            return updatedCount.get();
-        }
         ExecutionContext executionContext = 
createExecutionContext(queryContext);
-        if 
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
 {
-            Collection<ExecuteResult> results = 
executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext),
 queryContext, new RawSQLExecutorCallback());
-            return accumulate(results);
-        }
-        return executeUpdate(updateCallback, 
queryContext.getSqlStatementContext(), executionContext);
-    }
-    
-    private int useDriverToExecuteUpdate(final ExecuteUpdateCallback 
updateCallback, final SQLStatementContext sqlStatementContext, final 
ExecutionContext executionContext) throws SQLException {
-        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext(executionContext);
-        cacheStatements(executionGroupContext.getInputGroups());
-        JDBCExecutorCallback<Integer> callback = 
createExecuteUpdateCallback(updateCallback, sqlStatementContext);
-        return 
executor.getRegularExecutor().executeUpdate(executionGroupContext, 
executionContext.getQueryContext(), 
executionContext.getRouteContext().getRouteUnits(), callback);
-    }
-    
-    private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final 
ExecuteUpdateCallback updateCallback, final SQLStatementContext 
sqlStatementContext) {
-        boolean isExceptionThrown = 
SQLExecutorExceptionHandler.isExceptionThrown();
-        return new 
JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
-                
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), 
sqlStatementContext.getSqlStatement(), isExceptionThrown) {
-            
-            @Override
-            protected Integer executeSQL(final String sql, final Statement 
statement, final ConnectionMode connectionMode, final DatabaseType storageType) 
throws SQLException {
-                return updateCallback.executeUpdate(sql, statement);
-            }
-            
-            @Override
-            protected Optional<Integer> getSaneResult(final SQLStatement 
sqlStatement, final SQLException ex) {
-                return Optional.empty();
-            }
-        };
-    }
-    
-    private int accumulate(final Collection<ExecuteResult> results) {
-        int result = 0;
-        for (ExecuteResult each : results) {
-            result += ((UpdateResult) each).getUpdateCount();
-        }
+        boolean isNeedImplicitCommitTransaction = 
isNeedImplicitCommitTransaction(connection, 
sqlStatementContext.getSqlStatement(), 
executionContext.getExecutionUnits().size() > 1);
+        int result = executor.executeAdvanceUpdate(
+                metaDataContexts.getMetaData(), database, queryContext, 
createDriverExecutionPrepareEngine(database), trafficCallback, updateCallback, 
isNeedImplicitCommitTransaction,
+                (StatementReplayCallback<Statement>) (statements, 
parameterSets) -> replay(statements), executionContext);
+        statements.addAll(executor.getStatements());
+        replay(statements);
         return result;
     }
     

Reply via email to