This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e58dd5107a Refactor PipelineDataSourceSink (#29523)
5e58dd5107a is described below

commit 5e58dd5107ac280f34a02c8022f14257d4950443
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 24 12:36:51 2023 +0800

    Refactor PipelineDataSourceSink (#29523)
---
 .../importer/sink/type/PipelineDataSourceSink.java | 23 ++++++++--------
 .../core/importer/PipelineDataSourceSinkTest.java  | 31 +++++++++++-----------
 2 files changed, 26 insertions(+), 28 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
index d1785edc690..0297d18edd8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
@@ -54,7 +54,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
     
     private final ImporterConfiguration importerConfig;
     
-    private final PipelineDataSourceManager dataSourceManager;
+    private final DataSource dataSource;
     
     private final PipelineImportSQLBuilder importSQLBuilder;
     
@@ -68,7 +68,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
     
     public PipelineDataSourceSink(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager) {
         this.importerConfig = importerConfig;
-        this.dataSourceManager = dataSourceManager;
+        dataSource = dataSourceManager.getDataSource(importerConfig.getDataSourceConfig());
         importSQLBuilder = new PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
         groupEngine = new DataRecordGroupEngine();
         runningBatchInsertStatement = new AtomicReference<>();
@@ -78,29 +78,28 @@ public final class PipelineDataSourceSink implements PipelineSink {
     
     @Override
     public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection<Record> records) {
-        DataSource dataSource = dataSourceManager.getDataSource(importerConfig.getDataSourceConfig());
         Collection<DataRecord> dataRecords = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
         if (dataRecords.isEmpty()) {
             return new PipelineJobProgressUpdatedParameter(0);
         }
         for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
-            batchWrite(dataSource, each.getBatchDeleteDataRecords());
-            batchWrite(dataSource, each.getBatchInsertDataRecords());
-            batchWrite(dataSource, each.getBatchUpdateDataRecords());
-            sequentialWrite(dataSource, each.getNonBatchRecords());
+            batchWrite(each.getBatchDeleteDataRecords());
+            batchWrite(each.getBatchInsertDataRecords());
+            batchWrite(each.getBatchUpdateDataRecords());
+            sequentialWrite(each.getNonBatchRecords());
         }
         return new PipelineJobProgressUpdatedParameter((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
     }
     
     @SuppressWarnings("BusyWait")
     @SneakyThrows(InterruptedException.class)
-    private void batchWrite(final DataSource dataSource, final Collection<DataRecord> records) {
+    private void batchWrite(final Collection<DataRecord> records) {
         if (records.isEmpty()) {
             return;
         }
         for (int i = 0; !Thread.interrupted() && i <= importerConfig.getRetryTimes(); i++) {
             try {
-                doWrite(dataSource, records);
+                doWrite(records);
                 break;
             } catch (final SQLException ex) {
                 log.error("Flush failed {}/{} times.", i, 
importerConfig.getRetryTimes(), ex);
@@ -112,18 +111,18 @@ public final class PipelineDataSourceSink implements PipelineSink {
         }
     }
     
-    private void sequentialWrite(final DataSource dataSource, final Collection<DataRecord> records) {
+    private void sequentialWrite(final Collection<DataRecord> records) {
         // TODO Using a transaction would be better, but executing delete sometimes takes no effect inside an open PostgreSQL transaction
         try {
             for (DataRecord each : records) {
-                doWrite(dataSource, Collections.singleton(each));
+                doWrite(Collections.singleton(each));
             }
         } catch (final SQLException ex) {
             throw new PipelineImporterJobWriteException(ex);
         }
     }
     
-    private void doWrite(final DataSource dataSource, final Collection<DataRecord> records) throws SQLException {
+    private void doWrite(final Collection<DataRecord> records) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
             boolean enableTransaction = records.size() > 1;
             if (enableTransaction) {
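
The change in this file is a constructor-injection cleanup: instead of keeping the PipelineDataSourceManager as a field and re-resolving the DataSource inside every write() call, the sink now resolves it once in the constructor, which lets the dataSource parameter disappear from batchWrite, sequentialWrite, and doWrite. A minimal sketch of the pattern, using hypothetical SimpleSink, SourceManager, and Config stand-ins rather than the real ShardingSphere types:

    import java.sql.Connection;
    import java.sql.SQLException;
    import javax.sql.DataSource;
    
    // Hypothetical stand-ins for the manager and configuration types.
    interface SourceManager {
        
        DataSource getDataSource(Config config);
    }
    
    final class Config {
    }
    
    final class SimpleSink {
        
        private final DataSource dataSource;
        
        SimpleSink(final Config config, final SourceManager sourceManager) {
            // Resolve once here; the write path no longer needs the manager
            // or a DataSource parameter at all.
            dataSource = sourceManager.getDataSource(config);
        }
        
        void write(final Iterable<String> records) throws SQLException {
            try (Connection connection = dataSource.getConnection()) {
                // ... build and execute batched statements for the records ...
            }
        }
    }

This is equivalent to the old per-call lookup as long as the manager hands back the same cached pooled instance for an unchanged configuration, which the previous per-write code already relied on.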
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index 3e61f66f537..75aebe0fe1f 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -21,11 +21,10 @@ import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
-import org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
@@ -56,7 +55,9 @@ import java.util.concurrent.TimeUnit;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -65,18 +66,12 @@ class PipelineDataSourceSinkTest {
     
     private static final String TABLE_NAME = "test_table";
     
-    @Mock
-    private PipelineDataSourceManager dataSourceManager;
-    
     private final PipelineDataSourceConfiguration dataSourceConfig = new StandardPipelineDataSourceConfiguration(
             "jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL;USER=root;PASSWORD=root", "root", "root");
     
     @Mock
     private PipelineChannel channel;
     
-    @Mock
-    private PipelineDataSourceWrapper dataSource;
-    
     @Mock
     private Connection connection;
     
@@ -87,10 +82,19 @@ class PipelineDataSourceSinkTest {
     
     @BeforeEach
     void setUp() throws SQLException {
-        PipelineSink pipelineSink = new PipelineDataSourceSink(mockImporterConfiguration(), dataSourceManager);
+        PipelineSink pipelineSink = new PipelineDataSourceSink(mockImporterConfiguration(), mockPipelineDataSourceManager());
         importer = new SingleChannelConsumerImporter(channel, 100, 1, TimeUnit.SECONDS, pipelineSink, new FixtureTransmissionJobItemContext());
-        when(dataSourceManager.getDataSource(dataSourceConfig)).thenReturn(dataSource);
-        when(dataSource.getConnection()).thenReturn(connection);
+    }
+    
+    private ImporterConfiguration mockImporterConfiguration() {
+        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = Collections.singletonMap(new CaseInsensitiveIdentifier("test_table"), Collections.singleton("user"));
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableAndSchemaNameMapper(Collections.emptyMap()), 1000, null, 3, 3);
+    }
+    
+    private PipelineDataSourceManager mockPipelineDataSourceManager() throws SQLException {
+        PipelineDataSourceManager result = mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS);
+        when(result.getDataSource(dataSourceConfig).getConnection()).thenReturn(connection);
+        return result;
     }
     
     @Test
@@ -190,9 +194,4 @@ class PipelineDataSourceSinkTest {
         result.addColumn(new Column("status", statusOldValue, statusValue, 
true, false));
         return result;
     }
-    
-    private ImporterConfiguration mockImporterConfiguration() {
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = Collections.singletonMap(new CaseInsensitiveIdentifier("test_table"), Collections.singleton("user"));
-        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableAndSchemaNameMapper(Collections.emptyMap()), 1000, null, 3, 3);
-    }
 }
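
On the test side, the two separate @Mock fields (the manager and the intermediate PipelineDataSourceWrapper) collapse into a single mock built with RETURNS_DEEP_STUBS, which auto-mocks intermediate return values so the whole getDataSource(...).getConnection() chain can be stubbed in one when(...) call. A minimal self-contained sketch of that Mockito idiom, assuming a hypothetical Manager interface in place of PipelineDataSourceManager:

    import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;
    
    import java.sql.Connection;
    import java.sql.SQLException;
    import javax.sql.DataSource;
    
    final class DeepStubSketch {
        
        // Hypothetical stand-in for PipelineDataSourceManager.
        interface Manager {
            
            DataSource getDataSource(String config);
        }
        
        static Manager mockManager(final Connection connection) throws SQLException {
            // RETURNS_DEEP_STUBS makes getDataSource(...) return an auto-created
            // mock DataSource, so the chain is stubbed in one statement instead
            // of wiring a separate @Mock for the intermediate type.
            Manager result = mock(Manager.class, RETURNS_DEEP_STUBS);
            when(result.getDataSource("config").getConnection()).thenReturn(connection);
            return result;
        }
    }

One caveat: deep stubs record the arguments used during stubbing, so the code under test must call getDataSource with an equal argument for the stubbed connection to be returned; the test above uses the same dataSourceConfig for both stubbing and execution.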
