This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 320308ac2b8 Merge PipelineDataSourceSink.runningStatement (#29525)
320308ac2b8 is described below

commit 320308ac2b88db993480278613d870c855c3ffa3
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 24 13:09:37 2023 +0800

    Merge PipelineDataSourceSink.runningStatement (#29525)
---
 .../importer/sink/type/PipelineDataSourceSink.java | 26 ++++++++--------------
 .../core/ingest/dumper/InventoryDumper.java        | 15 +++++++------
 .../data/pipeline/core/util/PipelineJdbcUtils.java |  5 ++---
 3 files changed, 19 insertions(+), 27 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
index 0297d18edd8..f75363e4180 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
@@ -60,20 +60,14 @@ public final class PipelineDataSourceSink implements PipelineSink {
     
     private final DataRecordGroupEngine groupEngine;
     
-    private final AtomicReference<PreparedStatement> 
runningBatchInsertStatement;
-    
-    private final AtomicReference<PreparedStatement> runningUpdateStatement;
-    
-    private final AtomicReference<PreparedStatement> 
runningBatchDeleteStatement;
+    private final AtomicReference<PreparedStatement> runningStatement;
     
     public PipelineDataSourceSink(final ImporterConfiguration importerConfig, 
final PipelineDataSourceManager dataSourceManager) {
         this.importerConfig = importerConfig;
         dataSource = 
dataSourceManager.getDataSource(importerConfig.getDataSourceConfig());
         importSQLBuilder = new 
PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
         groupEngine = new DataRecordGroupEngine();
-        runningBatchInsertStatement = new AtomicReference<>();
-        runningUpdateStatement = new AtomicReference<>();
-        runningBatchDeleteStatement = new AtomicReference<>();
+        runningStatement = new AtomicReference<>();
     }
     
     @Override
@@ -154,7 +148,7 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
         DataRecord dataRecord = dataRecords.iterator().next();
         String sql = 
importSQLBuilder.buildInsertSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null),
 dataRecord);
         try (PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
-            runningBatchInsertStatement.set(preparedStatement);
+            runningStatement.set(preparedStatement);
             preparedStatement.setQueryTimeout(30);
             for (DataRecord each : dataRecords) {
                 for (int i = 0; i < each.getColumnCount(); i++) {
@@ -164,7 +158,7 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
             }
             preparedStatement.executeBatch();
         } finally {
-            runningBatchInsertStatement.set(null);
+            runningStatement.set(null);
         }
     }
     
@@ -180,7 +174,7 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
         List<Column> setColumns = 
dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
         String sql = 
importSQLBuilder.buildUpdateSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null),
 dataRecord, conditionColumns);
         try (PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
-            runningUpdateStatement.set(preparedStatement);
+            runningStatement.set(preparedStatement);
             for (int i = 0; i < setColumns.size(); i++) {
                 preparedStatement.setObject(i + 1, 
setColumns.get(i).getValue());
             }
@@ -199,7 +193,7 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
                 log.warn("executeUpdate failed, updateCount={}, updateSql={}, 
updatedColumns={}, conditionColumns={}", updateCount, sql, setColumns, 
conditionColumns);
             }
         } finally {
-            runningUpdateStatement.set(null);
+            runningStatement.set(null);
         }
     }
     
@@ -208,7 +202,7 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
         String sql = 
importSQLBuilder.buildDeleteSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null),
 dataRecord,
                 RecordUtils.extractConditionColumns(dataRecord, 
importerConfig.getShardingColumns(dataRecord.getTableName())));
         try (PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
-            runningBatchDeleteStatement.set(preparedStatement);
+            runningStatement.set(preparedStatement);
             preparedStatement.setQueryTimeout(30);
             for (DataRecord each : dataRecords) {
                 List<Column> conditionColumns = 
RecordUtils.extractConditionColumns(each, 
importerConfig.getShardingColumns(dataRecord.getTableName()));
@@ -223,14 +217,12 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
             }
             preparedStatement.executeBatch();
         } finally {
-            runningBatchDeleteStatement.set(null);
+            runningStatement.set(null);
         }
     }
     
     @Override
     public void close() {
-        PipelineJdbcUtils.cancelStatement(runningBatchInsertStatement.get());
-        PipelineJdbcUtils.cancelStatement(runningUpdateStatement.get());
-        PipelineJdbcUtils.cancelStatement(runningBatchDeleteStatement.get());
+        
Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 5b4e4b68ca5..6366244928e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -74,22 +74,22 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
     
     private final DataSource dataSource;
     
+    private final PipelineTableMetaDataLoader metaDataLoader;
+    
     private final PipelineInventoryDumpSQLBuilder inventoryDumpSQLBuilder;
     
     private final ColumnValueReaderEngine columnValueReaderEngine;
     
-    private final PipelineTableMetaDataLoader metaDataLoader;
-    
-    private final AtomicReference<Statement> dumpStatement = new 
AtomicReference<>();
+    private final AtomicReference<Statement> runningStatement = new 
AtomicReference<>();
     
     public InventoryDumper(final InventoryDumperContext dumperContext, final 
PipelineChannel channel, final DataSource dataSource, final 
PipelineTableMetaDataLoader metaDataLoader) {
         this.dumperContext = dumperContext;
         this.channel = channel;
         this.dataSource = dataSource;
+        this.metaDataLoader = metaDataLoader;
         DatabaseType databaseType = 
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         inventoryDumpSQLBuilder = new 
PipelineInventoryDumpSQLBuilder(databaseType);
         columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
-        this.metaDataLoader = metaDataLoader;
     }
     
     @Override
@@ -117,7 +117,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
             
connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
         }
         try (PreparedStatement preparedStatement = 
JDBCStreamQueryBuilder.build(databaseType, connection, 
buildInventoryDumpSQL())) {
-            dumpStatement.set(preparedStatement);
+            runningStatement.set(preparedStatement);
             if (!(databaseType instanceof MySQLDatabaseType)) {
                 preparedStatement.setFetchSize(batchSize);
             }
@@ -144,8 +144,9 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
                 }
                 dataRecords.add(new FinishedRecord(new 
IngestFinishedPosition()));
                 channel.pushRecords(dataRecords);
-                dumpStatement.set(null);
                 log.info("Inventory dump done, rowCount={}, dataSource={}, 
actualTable={}", rowCount, 
dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
+            } finally {
+                runningStatement.set(null);
             }
         }
     }
@@ -217,6 +218,6 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
     
     @Override
     protected void doStop() {
-        PipelineJdbcUtils.cancelStatement(dumpStatement.get());
+        
Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
index 43290b84894..4ffd6621cf0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
@@ -87,10 +87,9 @@ public final class PipelineJdbcUtils {
      */
     public static void cancelStatement(final Statement statement) throws 
SQLWrapperException {
         try {
-            if (null == statement || statement.isClosed()) {
-                return;
+            if (!statement.isClosed()) {
+                statement.cancel();
             }
-            statement.cancel();
         } catch (final SQLException ex) {
             throw new SQLWrapperException(ex);
         }

Reply via email to