This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5e58dd5107a Refactor PipelineDataSourceSink (#29523)
5e58dd5107a is described below
commit 5e58dd5107ac280f34a02c8022f14257d4950443
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 24 12:36:51 2023 +0800
Refactor PipelineDataSourceSink (#29523)
---
.../importer/sink/type/PipelineDataSourceSink.java | 23 ++++++++--------
.../core/importer/PipelineDataSourceSinkTest.java | 31 +++++++++++-----------
2 files changed, 26 insertions(+), 28 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
index d1785edc690..0297d18edd8 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
@@ -54,7 +54,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
private final ImporterConfiguration importerConfig;
- private final PipelineDataSourceManager dataSourceManager;
+ private final DataSource dataSource;
private final PipelineImportSQLBuilder importSQLBuilder;
@@ -68,7 +68,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
public PipelineDataSourceSink(final ImporterConfiguration importerConfig,
final PipelineDataSourceManager dataSourceManager) {
this.importerConfig = importerConfig;
- this.dataSourceManager = dataSourceManager;
+ dataSource =
dataSourceManager.getDataSource(importerConfig.getDataSourceConfig());
importSQLBuilder = new
PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
groupEngine = new DataRecordGroupEngine();
runningBatchInsertStatement = new AtomicReference<>();
@@ -78,29 +78,28 @@ public final class PipelineDataSourceSink implements
PipelineSink {
@Override
public PipelineJobProgressUpdatedParameter write(final String ackId, final
Collection<Record> records) {
- DataSource dataSource =
dataSourceManager.getDataSource(importerConfig.getDataSourceConfig());
Collection<DataRecord> dataRecords =
records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
if (dataRecords.isEmpty()) {
return new PipelineJobProgressUpdatedParameter(0);
}
for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
- batchWrite(dataSource, each.getBatchDeleteDataRecords());
- batchWrite(dataSource, each.getBatchInsertDataRecords());
- batchWrite(dataSource, each.getBatchUpdateDataRecords());
- sequentialWrite(dataSource, each.getNonBatchRecords());
+ batchWrite(each.getBatchDeleteDataRecords());
+ batchWrite(each.getBatchInsertDataRecords());
+ batchWrite(each.getBatchUpdateDataRecords());
+ sequentialWrite(each.getNonBatchRecords());
}
return new PipelineJobProgressUpdatedParameter((int)
dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT ==
each.getType()).count());
}
@SuppressWarnings("BusyWait")
@SneakyThrows(InterruptedException.class)
- private void batchWrite(final DataSource dataSource, final
Collection<DataRecord> records) {
+ private void batchWrite(final Collection<DataRecord> records) {
if (records.isEmpty()) {
return;
}
for (int i = 0; !Thread.interrupted() && i <=
importerConfig.getRetryTimes(); i++) {
try {
- doWrite(dataSource, records);
+ doWrite(records);
break;
} catch (final SQLException ex) {
log.error("Flush failed {}/{} times.", i,
importerConfig.getRetryTimes(), ex);
@@ -112,18 +111,18 @@ public final class PipelineDataSourceSink implements
PipelineSink {
}
}
- private void sequentialWrite(final DataSource dataSource, final
Collection<DataRecord> records) {
+ private void sequentialWrite(final Collection<DataRecord> records) {
// TODO it's better use transaction, but execute delete maybe not
effect when open transaction of PostgreSQL sometimes
try {
for (DataRecord each : records) {
- doWrite(dataSource, Collections.singleton(each));
+ doWrite(Collections.singleton(each));
}
} catch (final SQLException ex) {
throw new PipelineImporterJobWriteException(ex);
}
}
- private void doWrite(final DataSource dataSource, final
Collection<DataRecord> records) throws SQLException {
+ private void doWrite(final Collection<DataRecord> records) throws
SQLException {
try (Connection connection = dataSource.getConnection()) {
boolean enableTransaction = records.size() > 1;
if (enableTransaction) {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index 3e61f66f537..75aebe0fe1f 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -21,11 +21,10 @@ import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfigurati
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
-import
org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
+import
org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
@@ -56,7 +55,9 @@ import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -65,18 +66,12 @@ class PipelineDataSourceSinkTest {
private static final String TABLE_NAME = "test_table";
- @Mock
- private PipelineDataSourceManager dataSourceManager;
-
private final PipelineDataSourceConfiguration dataSourceConfig = new
StandardPipelineDataSourceConfiguration(
"jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL;USER=root;PASSWORD=root",
"root", "root");
@Mock
private PipelineChannel channel;
- @Mock
- private PipelineDataSourceWrapper dataSource;
-
@Mock
private Connection connection;
@@ -87,10 +82,19 @@ class PipelineDataSourceSinkTest {
@BeforeEach
void setUp() throws SQLException {
- PipelineSink pipelineSink = new
PipelineDataSourceSink(mockImporterConfiguration(), dataSourceManager);
+ PipelineSink pipelineSink = new
PipelineDataSourceSink(mockImporterConfiguration(),
mockPipelineDataSourceManager());
importer = new SingleChannelConsumerImporter(channel, 100, 1,
TimeUnit.SECONDS, pipelineSink, new FixtureTransmissionJobItemContext());
-
when(dataSourceManager.getDataSource(dataSourceConfig)).thenReturn(dataSource);
- when(dataSource.getConnection()).thenReturn(connection);
+ }
+
+ private ImporterConfiguration mockImporterConfiguration() {
+ Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap =
Collections.singletonMap(new CaseInsensitiveIdentifier("test_table"),
Collections.singleton("user"));
+ return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap,
new TableAndSchemaNameMapper(Collections.emptyMap()), 1000, null, 3, 3);
+ }
+
+ private PipelineDataSourceManager mockPipelineDataSourceManager() throws
SQLException {
+ PipelineDataSourceManager result =
mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS);
+
when(result.getDataSource(dataSourceConfig).getConnection()).thenReturn(connection);
+ return result;
}
@Test
@@ -190,9 +194,4 @@ class PipelineDataSourceSinkTest {
result.addColumn(new Column("status", statusOldValue, statusValue,
true, false));
return result;
}
-
- private ImporterConfiguration mockImporterConfiguration() {
- Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap =
Collections.singletonMap(new CaseInsensitiveIdentifier("test_table"),
Collections.singleton("user"));
- return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap,
new TableAndSchemaNameMapper(Collections.emptyMap()), 1000, null, 3, 3);
- }
}