This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2dc21aec93f Rename PipelineDataSourceWrapper to PipelineDataSource (#32772)
2dc21aec93f is described below
commit 2dc21aec93f21f257f2abc21f72937d03367919e
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Sep 2 20:58:51 2024 +0800
Rename PipelineDataSourceWrapper to PipelineDataSource (#32772)
---
.../table/TableInventoryCheckParameter.java | 6 +++---
.../calculator/SingleTableInventoryCalculateParameter.java | 6 +++---
...elineDataSourceWrapper.java => PipelineDataSource.java} | 4 ++--
.../core/datasource/PipelineDataSourceManager.java | 10 +++++-----
.../pipeline/core/metadata/loader/PipelineSchemaUtils.java | 4 ++--
.../loader/StandardPipelineTableMetaDataLoader.java | 4 ++--
.../incremental/IncrementalTaskPositionManager.java | 6 +++---
.../calculator/InventoryRecordsCountCalculator.java | 4 ++--
.../inventory/splitter/InventoryDumperContextSplitter.java | 4 ++--
.../preparer/inventory/splitter/InventoryTaskSplitter.java | 4 ++--
.../CRC32SingleTableInventoryCalculatorTest.java | 4 ++--
...aSourceWrapperTest.java => PipelineDataSourceTest.java} | 8 ++++----
.../incremental/dumper/MySQLIncrementalDumperTest.java | 4 ++--
.../ingest/incremental/wal/WALEventConverterTest.java | 4 ++--
.../data/pipeline/cdc/context/CDCJobItemContext.java | 8 ++++----
.../pipeline/scenario/migration/api/MigrationJobAPI.java | 4 ++--
.../check/consistency/MigrationDataConsistencyChecker.java | 6 +++---
.../migration/context/MigrationJobItemContext.java | 8 ++++----
.../scenario/migration/preparer/MigrationJobPreparer.java | 4 ++--
.../e2e/data/pipeline/cases/PipelineContainerComposer.java | 4 ++--
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 14 +++++++-------
.../data/pipeline/cases/cdc/DataSourceRecordConsumer.java | 4 ++--
.../pipeline/common/PipelineDataSourceManagerTest.java | 4 ++--
.../loader/StandardPipelineTableMetaDataLoaderTest.java | 6 +++---
.../RecordSingleTableInventoryCalculatorTest.java | 10 +++++-----
.../pipeline/core/prepare/InventoryTaskSplitterTest.java | 6 +++---
.../test/it/data/pipeline/core/task/InventoryTaskTest.java | 4 ++--
.../data/pipeline/core/util/JobConfigurationBuilder.java | 4 ++--
.../scenario/migration/api/impl/MigrationJobAPITest.java | 4 ++--
.../consistency/MigrationDataConsistencyCheckerTest.java | 4 ++--
30 files changed, 83 insertions(+), 83 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
index 76300d15588..8c5d674eeb2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
@@ -36,9 +36,9 @@ public final class TableInventoryCheckParameter {
 
     private final String jobId;
 
-    private final PipelineDataSourceWrapper sourceDataSource;
+    private final PipelineDataSource sourceDataSource;
 
-    private final PipelineDataSourceWrapper targetDataSource;
+    private final PipelineDataSource targetDataSource;
 
     private final CaseInsensitiveQualifiedTable sourceTable;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
index 9614b971b16..3b40cf9a76a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryRange;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
@@ -43,7 +43,7 @@ public final class SingleTableInventoryCalculateParameter {
     /**
      * Data source of source side or target side.
      * Do not close it, it will be reused later.
      */
-    private final PipelineDataSourceWrapper dataSource;
+    private final PipelineDataSource dataSource;
 
     private final CaseInsensitiveQualifiedTable table;
@@ -67,7 +67,7 @@ public final class SingleTableInventoryCalculateParameter {
     private final QueryType queryType;
 
-    public SingleTableInventoryCalculateParameter(final PipelineDataSourceWrapper dataSource, final CaseInsensitiveQualifiedTable table, final List<String> columnNames,
+    public SingleTableInventoryCalculateParameter(final PipelineDataSource dataSource, final CaseInsensitiveQualifiedTable table, final List<String> columnNames,
                                                   final List<PipelineColumnMetaData> uniqueKeys, final Object tableCheckPosition) {
         this.dataSource = dataSource;
         this.table = table;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSource.java
similarity index 95%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSource.java
index 6de0f2d2e81..9c8fdfb96a2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSource.java
@@ -40,7 +40,7 @@ import java.util.logging.Logger;
  */
 @RequiredArgsConstructor
 @Slf4j
-public final class PipelineDataSourceWrapper implements DataSource, AutoCloseable {
+public final class PipelineDataSource implements DataSource, AutoCloseable {
 
     private final DataSource dataSource;
@@ -50,7 +50,7 @@ public final class PipelineDataSourceWrapper implements DataSource, AutoCloseabl
 
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
     @SneakyThrows(SQLException.class)
-    public PipelineDataSourceWrapper(final PipelineDataSourceConfiguration pipelineDataSourceConfig) {
+    public PipelineDataSource(final PipelineDataSourceConfiguration pipelineDataSourceConfig) {
         dataSource = TypedSPILoader.getService(PipelineDataSourceCreator.class, pipelineDataSourceConfig.getType()).create(pipelineDataSourceConfig.getDataSourceConfiguration());
         databaseType = pipelineDataSourceConfig.getDatabaseType();
     }
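For context, a minimal usage sketch (not part of this commit): the renamed class keeps the old wrapper's contract, implementing both DataSource and AutoCloseable, and the one-argument constructor shown above builds the underlying pool via SPI, so call sites only change the type name. The helper class and method here are illustrative, not from the repository:

    import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
    import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;

    import java.sql.Connection;
    import java.sql.SQLException;

    public final class PipelineDataSourceUsageSketch {

        // try-with-resources closes the underlying pooled DataSource when done.
        public static String loadCatalog(final PipelineDataSourceConfiguration dataSourceConfig) throws SQLException {
            try (
                    PipelineDataSource dataSource = new PipelineDataSource(dataSourceConfig);
                    Connection connection = dataSource.getConnection()) {
                return connection.getCatalog();
            }
        }
    }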
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
index ad731b33cbc..d02dc6c3013 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
@@ -30,7 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 @Slf4j
 public final class PipelineDataSourceManager implements AutoCloseable {
 
-    private final Map<PipelineDataSourceConfiguration, PipelineDataSourceWrapper> cachedDataSources = new ConcurrentHashMap<>();
+    private final Map<PipelineDataSourceConfiguration, PipelineDataSource> cachedDataSources = new ConcurrentHashMap<>();
 
     /**
      * Get cached data source.
@@ -38,8 +38,8 @@ public final class PipelineDataSourceManager implements AutoCloseable {
      * @param dataSourceConfig data source configuration
      * @return data source
      */
-    public PipelineDataSourceWrapper getDataSource(final PipelineDataSourceConfiguration dataSourceConfig) {
-        PipelineDataSourceWrapper result = cachedDataSources.get(dataSourceConfig);
+    public PipelineDataSource getDataSource(final PipelineDataSourceConfiguration dataSourceConfig) {
+        PipelineDataSource result = cachedDataSources.get(dataSourceConfig);
         if (null != result) {
             return result;
         }
@@ -51,7 +51,7 @@ public final class PipelineDataSourceManager implements AutoCloseable {
             }
             log.warn("{} is already closed, create again.", result);
         }
-        result = new PipelineDataSourceWrapper(dataSourceConfig);
+        result = new PipelineDataSource(dataSourceConfig);
         cachedDataSources.put(dataSourceConfig, result);
         return result;
     }
@@ -59,7 +59,7 @@ public final class PipelineDataSourceManager implements AutoCloseable {
     @Override
     public void close() {
-        for (PipelineDataSourceWrapper each : cachedDataSources.values()) {
+        for (PipelineDataSource each : cachedDataSources.values()) {
             if (each.isClosed()) {
                 continue;
             }
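The hunks above preserve the manager's get-or-create contract: one cached PipelineDataSource per configuration, recreated if an entry was already closed, and every still-open entry closed with the manager. A brief sketch of that contract, assuming an existing PipelineDataSourceConfiguration; the class and method names are illustrative:

    import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
    import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
    import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;

    public final class PipelineDataSourceManagerSketch {

        public static void useManagedDataSource(final PipelineDataSourceConfiguration dataSourceConfig) {
            try (PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager()) {
                // Equal configurations resolve to the same cached instance.
                PipelineDataSource first = dataSourceManager.getDataSource(dataSourceConfig);
                PipelineDataSource second = dataSourceManager.getDataSource(dataSourceConfig);
                assert first == second;
            }
            // Leaving the block invokes close(), which closes each cached data source that is still open.
        }
    }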
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
index f1e78d620f8..b136fdd94d5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
@@ -22,7 +22,7 @@ import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 
 import java.sql.Connection;
 import java.sql.SQLException;
@@ -42,7 +42,7 @@ public final class PipelineSchemaUtils {
      */
     @SneakyThrows(SQLException.class)
     public static String getDefaultSchema(final PipelineDataSourceConfiguration dataSourceConfig) {
-        try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(dataSourceConfig)) {
+        try (PipelineDataSource dataSource = new PipelineDataSource(dataSourceConfig)) {
            try (Connection connection = dataSource.getConnection()) {
                String result = connection.getSchema();
                log.info("get default schema {}", result);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
index aee15385a8b..2cde14d35b8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.loader;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCheckUtils;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
@@ -49,7 +49,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class StandardPipelineTableMetaDataLoader implements PipelineTableMetaDataLoader {
 
-    private final PipelineDataSourceWrapper dataSource;
+    private final PipelineDataSource dataSource;
 
     private final Map<CaseInsensitiveIdentifier, PipelineTableMetaData> tableMetaDataMap = new ConcurrentHashMap<>();
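Call sites later in this diff (MigrationDataConsistencyChecker, CDCE2EIT) drive this loader the same way; roughly as below, under the assumption that the schema name may be null for schema-less databases. The wrapper class and parameter names are illustrative:

    import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
    import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
    import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;

    public final class MetaDataLoaderSketch {

        // Builds a loader over a pipeline data source and looks up one table,
        // mirroring the call pattern used by the consistency checkers below.
        public static PipelineTableMetaData loadTableMetaData(final PipelineDataSource dataSource, final String schemaName, final String tableName) {
            StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
            return metaDataLoader.getTableMetaData(schemaName, tableName);
        }
    }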
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
index 154dbadc69d..8367dd211bf 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfigurati
 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
@@ -92,7 +92,7 @@ public final class IncrementalTaskPositionManager {
     private void destroyPosition(final String jobId, final ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig, final DialectIncrementalPositionManager positionInitializer) throws SQLException {
         for (DataSourcePoolProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(pipelineDataSourceConfig.getRootConfig()).values()) {
-            try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
+            try (PipelineDataSource dataSource = new PipelineDataSource(DataSourcePoolCreator.create(each), databaseType)) {
                positionInitializer.destroy(dataSource, jobId);
            }
        }
@@ -101,7 +101,7 @@ public final class IncrementalTaskPositionManager {
     private void destroyPosition(final String jobId, final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig,
                                  final DialectIncrementalPositionManager positionInitializer) throws SQLException {
         try (
-                PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(
+                PipelineDataSource dataSource = new PipelineDataSource(
                        DataSourcePoolCreator.create((DataSourcePoolProperties) pipelineDataSourceConfig.getDataSourceConfiguration()), databaseType)) {
            positionInitializer.destroy(dataSource, jobId);
        }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java
index 51f1809ca48..4d99e401c58 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java
@@ -21,7 +21,7 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
@@ -52,7 +52,7 @@ public final class InventoryRecordsCountCalculator {
      * @return table records count
      * @throws SplitPipelineJobByUniqueKeyException if there's exception from database
      */
-    public static long getTableRecordsCount(final InventoryDumperContext dumperContext, final PipelineDataSourceWrapper dataSource) {
+    public static long getTableRecordsCount(final InventoryDumperContext dumperContext, final PipelineDataSource dataSource) {
         String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         String actualTableName = dumperContext.getActualTableName();
         PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(dataSource.getDatabaseType());
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 6be5240a655..25d17e637be 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -21,7 +21,7 @@ import lombok.RequiredArgsConstructor;
 import org.apache.commons.lang3.Range;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
@@ -54,7 +54,7 @@ import java.util.stream.Collectors;
 @RequiredArgsConstructor
 public final class InventoryDumperContextSplitter {
 
-    private final PipelineDataSourceWrapper sourceDataSource;
+    private final PipelineDataSource sourceDataSource;
 
     private final InventoryDumperContext dumperContext;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
index b646d01ae8d..a3f1e3ff6f5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.data.pipeline.core.channel.InventoryChannelCrea
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
@@ -48,7 +48,7 @@ import java.util.concurrent.atomic.AtomicReference;
 @Slf4j
 public final class InventoryTaskSplitter {
 
-    private final PipelineDataSourceWrapper sourceDataSource;
+    private final PipelineDataSource sourceDataSource;
 
     private final InventoryDumperContext dumperContext;
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
index cf5b661cb60..61f6c800bd0 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
 import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -56,7 +56,7 @@ class CRC32SingleTableInventoryCalculatorTest {
     private SingleTableInventoryCalculateParameter parameter;
 
     @Mock
-    private PipelineDataSourceWrapper pipelineDataSource;
+    private PipelineDataSource pipelineDataSource;
 
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private Connection connection;
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapperTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceTest.java
similarity index 90%
rename from kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapperTest.java
rename to kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceTest.java
index d53460be6fe..72d6985a241 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapperTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceTest.java
@@ -42,7 +42,7 @@ import static org.mockito.Mockito.when;
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
-class PipelineDataSourceWrapperTest {
+class PipelineDataSourceTest {
 
     private static final String CLIENT_USERNAME = "username";
@@ -74,7 +74,7 @@ class PipelineDataSourceWrapperTest {
 
     @Test
     void assertGetConnection() throws SQLException {
-        PipelineDataSourceWrapper dataSourceWrapper = new PipelineDataSourceWrapper(dataSource, TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
+        PipelineDataSource dataSourceWrapper = new PipelineDataSource(dataSource, TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
         assertThat(dataSourceWrapper.getConnection(), is(connection));
         assertThat(dataSourceWrapper.getConnection(CLIENT_USERNAME, CLIENT_PASSWORD), is(connection));
         assertGetLogWriter(dataSourceWrapper.getLogWriter());
@@ -102,12 +102,12 @@ class PipelineDataSourceWrapperTest {
     @Test
     void assertSetLoginTimeoutFailure() throws SQLException {
         doThrow(new SQLException("")).when(dataSource).setLoginTimeout(LOGIN_TIMEOUT);
-        assertThrows(SQLException.class, () -> new PipelineDataSourceWrapper(dataSource, TypedSPILoader.getService(DatabaseType.class, "FIXTURE")).setLoginTimeout(LOGIN_TIMEOUT));
+        assertThrows(SQLException.class, () -> new PipelineDataSource(dataSource, TypedSPILoader.getService(DatabaseType.class, "FIXTURE")).setLoginTimeout(LOGIN_TIMEOUT));
     }
 
     @Test
     void assertSetLogWriterFailure() throws SQLException {
         doThrow(new SQLException("")).when(dataSource).setLogWriter(printWriter);
-        assertThrows(SQLException.class, () -> new PipelineDataSourceWrapper(dataSource, TypedSPILoader.getService(DatabaseType.class, "FIXTURE")).setLogWriter(printWriter));
+        assertThrows(SQLException.class, () -> new PipelineDataSource(dataSource, TypedSPILoader.getService(DatabaseType.class, "FIXTURE")).setLogWriter(printWriter));
     }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
index 36395dcc41f..ac1469b6e66 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
@@ -21,7 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSour
 import org.apache.shardingsphere.data.pipeline.core.channel.memory.MemoryPipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
@@ -111,7 +111,7 @@ class MySQLIncrementalDumperTest {
     private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
         try (
                 PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
-                PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
+                PipelineDataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverterTest.java
index 90b424b4805..b4294fe387e 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverterTest.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wa
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
@@ -101,7 +101,7 @@ class WALEventConverterTest {
     private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
         try (
                 PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
-                PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
+                PipelineDataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
index a3bd6cbb0c5..f1d189aa5af 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.config.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
@@ -75,10 +75,10 @@ public final class CDCJobItemContext implements TransmissionJobItemContext {
 
     private final AtomicLong inventoryRecordsCount = new AtomicLong(0L);
 
-    private final LazyInitializer<PipelineDataSourceWrapper> sourceDataSourceLazyInitializer = new LazyInitializer<PipelineDataSourceWrapper>() {
+    private final LazyInitializer<PipelineDataSource> sourceDataSourceLazyInitializer = new LazyInitializer<PipelineDataSource>() {
 
         @Override
-        protected PipelineDataSourceWrapper initialize() {
+        protected PipelineDataSource initialize() {
             return dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig());
         }
     };
@@ -128,7 +128,7 @@ public final class CDCJobItemContext implements TransmissionJobItemContext {
      * @return source data source
      */
     @SneakyThrows(ConcurrentException.class)
-    public PipelineDataSourceWrapper getSourceDataSource() {
+    public PipelineDataSource getSourceDataSource() {
         return sourceDataSourceLazyInitializer.get();
     }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 236381a9cca..0e7c241cd8d 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
@@ -321,7 +321,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
         PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
         TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         try (
-                PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(jobConfig.getTarget());
+                PipelineDataSource dataSource = new PipelineDataSource(jobConfig.getTarget());
                 Connection connection = dataSource.getConnection()) {
             for (String each : jobConfig.getTargetTableNames()) {
                 String targetSchemaName = mapping.getSchemaName(each);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 2ed17f99198..1f5d8137572 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
@@ -128,8 +128,8 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
                                     final TableDataConsistencyChecker tableChecker, final PipelineDataSourceManager dataSourceManager) {
         CaseInsensitiveQualifiedTable sourceTable = new CaseInsensitiveQualifiedTable(dataNode.getSchemaName(), dataNode.getTableName());
         CaseInsensitiveQualifiedTable targetTable = new CaseInsensitiveQualifiedTable(dataNode.getSchemaName(), targetTableName);
-        PipelineDataSourceWrapper sourceDataSource = dataSourceManager.getDataSource(jobConfig.getSources().get(dataNode.getDataSourceName()));
-        PipelineDataSourceWrapper targetDataSource = dataSourceManager.getDataSource(jobConfig.getTarget());
+        PipelineDataSource sourceDataSource = dataSourceManager.getDataSource(jobConfig.getSources().get(dataNode.getDataSourceName()));
+        PipelineDataSource targetDataSource = dataSourceManager.getDataSource(jobConfig.getTarget());
         PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
         PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dataNode.getSchemaName(), dataNode.getTableName());
         ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(dataNode.getSchemaName(), dataNode.getTableName()));
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index 743ca8f47e4..f2d5c695e72 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -25,7 +25,7 @@ import org.apache.commons.lang3.concurrent.LazyInitializer;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
@@ -77,10 +77,10 @@ public final class MigrationJobItemContext implements TransmissionJobItemContext
 
     private final PipelineDataSourceManager dataSourceManager;
 
-    private final LazyInitializer<PipelineDataSourceWrapper> sourceDataSourceLazyInitializer = new LazyInitializer<PipelineDataSourceWrapper>() {
+    private final LazyInitializer<PipelineDataSource> sourceDataSourceLazyInitializer = new LazyInitializer<PipelineDataSource>() {
 
         @Override
-        protected PipelineDataSourceWrapper initialize() {
+        protected PipelineDataSource initialize() {
             return dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig());
         }
     };
@@ -115,7 +115,7 @@ public final class MigrationJobItemContext implements TransmissionJobItemContext
      * @return source data source
      */
     @SneakyThrows(ConcurrentException.class)
-    public PipelineDataSourceWrapper getSourceDataSource() {
+    public PipelineDataSource getSourceDataSource() {
         return sourceDataSourceLazyInitializer.get();
     }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 2414c46dbc2..981473f86ba 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCh
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
@@ -147,7 +147,7 @@ public final class MigrationJobPreparer implements PipelineJobPreparer<Migration
             prepareTarget(jobItemContext, targetDatabaseType, contextManager);
         }
         if (null == jobItemContext.getInitProgress()) {
-            PipelineDataSourceWrapper targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
+            PipelineDataSource targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
             new PipelineDataSourceCheckEngine(targetDatabaseType).checkTargetDataSources(Collections.singleton(targetDataSource), jobItemContext.getTaskConfig().getImporterConfig());
         }
     }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 5d5ca34916f..f6624e35d22 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -24,7 +24,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrlAppender;
@@ -612,7 +612,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
         YamlSingleRuleConfiguration singleRuleConfig = new YamlSingleRuleConfiguration();
         singleRuleConfig.setTables(Collections.singletonList("*.*"));
         rootConfig.getRules().add(singleRuleConfig);
-        return new PipelineDataSourceWrapper(new ShardingSpherePipelineDataSourceConfiguration(rootConfig));
+        return new PipelineDataSource(new ShardingSpherePipelineDataSourceConfiguration(rootConfig));
     }
 
     private YamlRootConfiguration getYamlRootConfig() {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index cb5bc9c27ca..291f7dea30d 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -31,7 +31,7 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.Consistency
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
@@ -107,7 +107,7 @@ class CDCE2EIT {
         try (Connection connection = containerComposer.getProxyDataSource().getConnection()) {
             initSchemaAndTable(containerComposer, connection, 3);
         }
-        PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(containerComposer.generateShardingSphereDataSourceFromProxy(), containerComposer.getDatabaseType());
+        PipelineDataSource sourceDataSource = new PipelineDataSource(containerComposer.generateShardingSphereDataSourceFromProxy(), containerComposer.getDatabaseType());
         Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
         log.info("init data begin: {}", LocalDateTime.now());
         DataSourceExecuteUtils.execute(sourceDataSource, containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), dataPair.getLeft());
@@ -119,7 +119,7 @@ class CDCE2EIT {
                 containerComposer.getUsername(), containerComposer.getPassword())) {
             initSchemaAndTable(containerComposer, connection, 0);
         }
-        PipelineDataSourceWrapper targetDataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
+        PipelineDataSource targetDataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
         final CDCClient cdcClient = buildCDCClientAndStart(targetDataSource, containerComposer);
         Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
         String jobId = containerComposer.queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
@@ -168,15 +168,15 @@ class CDCE2EIT {
         }
     }
 
-    private PipelineDataSourceWrapper createStandardDataSource(final PipelineContainerComposer containerComposer, final String storageUnitName) {
+    private PipelineDataSource createStandardDataSource(final PipelineContainerComposer containerComposer, final String storageUnitName) {
         Map<String, Object> poolProps = new HashMap<>(3, 1F);
         poolProps.put("url", containerComposer.getActualJdbcUrlTemplate(storageUnitName, false));
         poolProps.put("username", containerComposer.getUsername());
         poolProps.put("password", containerComposer.getPassword());
-        return new PipelineDataSourceWrapper(new StandardPipelineDataSourceConfiguration(poolProps));
+        return new PipelineDataSource(new StandardPipelineDataSourceConfiguration(poolProps));
     }
 
-    private CDCClient buildCDCClientAndStart(final PipelineDataSourceWrapper dataSource, final PipelineContainerComposer containerComposer) {
+    private CDCClient buildCDCClientAndStart(final PipelineDataSource dataSource, final PipelineContainerComposer containerComposer) {
         DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
         CDCClient result = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
         result.connect(recordConsumer, new RetryStreamingExceptionHandler(result, 5, 5000), (ctx, serverErrorResult) -> log.error("Server error: {}", serverErrorResult.getErrorMessage()));
@@ -186,7 +186,7 @@ class CDCE2EIT {
         return result;
     }
 
-    private void assertDataMatched(final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final CaseInsensitiveQualifiedTable schemaTableName) {
+    private void assertDataMatched(final PipelineDataSource sourceDataSource, final PipelineDataSource targetDataSource, final CaseInsensitiveQualifiedTable schemaTableName) {
         StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(targetDataSource);
         PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().toString(), schemaTableName.getTableName().toString());
         List<PipelineColumnMetaData> uniqueKeys = Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0)));
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
index f7a43258d83..83ff14b84c7 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordR
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.DataChangeType;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.MetaData;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.TableColumn;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
@@ -59,7 +59,7 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
     public DataSourceRecordConsumer(final DataSource dataSource, final DatabaseType databaseType) {
         this.dataSource = dataSource;
-        loader = new StandardPipelineTableMetaDataLoader(new PipelineDataSourceWrapper(dataSource, databaseType));
+        loader = new StandardPipelineTableMetaDataLoader(new PipelineDataSource(dataSource, databaseType));
     }
 
     @Override
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/PipelineDataSourceManagerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/PipelineDataSourceManagerTest.java
index 17aac2f0997..c6805cb1328 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/PipelineDataSourceManagerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/PipelineDataSourceManagerTest.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.common;
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
@@ -56,7 +56,7 @@ class PipelineDataSourceManagerTest {
         try (PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager()) {
             PipelineDataSourceConfiguration source = jobConfig.getSources().values().iterator().next();
             DataSource actual = dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(source.getType(), source.getParameter()));
-            assertThat(actual, instanceOf(PipelineDataSourceWrapper.class));
+            assertThat(actual, instanceOf(PipelineDataSource.class));
         }
     }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
index cae26d5244d..641c6ac2bf8 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.common.metadata.loader;
 import com.zaxxer.hikari.HikariDataSource;
 import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
@@ -42,11 +42,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 class StandardPipelineTableMetaDataLoaderTest {
 
-    private PipelineDataSourceWrapper dataSource;
+    private PipelineDataSource dataSource;
 
     @BeforeEach
     void setUp() {
-        dataSource = new PipelineDataSourceWrapper(createHikariDataSource(), TypedSPILoader.getService(DatabaseType.class, "H2"));
+        dataSource = new PipelineDataSource(createHikariDataSource(), TypedSPILoader.getService(DatabaseType.class, "H2"));
     }
 
     private HikariDataSource createHikariDataSource() {
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
index 532822549ee..c36790e51e0 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Reco
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.RecordSingleTableInventoryCalculator;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryRange;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
@@ -51,11 +51,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 class RecordSingleTableInventoryCalculatorTest {
 
-    private static PipelineDataSourceWrapper dataSource;
+    private static PipelineDataSource dataSource;
 
     @BeforeAll
     static void setUp() throws Exception {
-        dataSource = new PipelineDataSourceWrapper(createHikariDataSource("calc_" + RandomStringUtils.randomAlphanumeric(9)), TypedSPILoader.getService(DatabaseType.class, "H2"));
+        dataSource = new PipelineDataSource(createHikariDataSource("calc_" + RandomStringUtils.randomAlphanumeric(9)), TypedSPILoader.getService(DatabaseType.class, "H2"));
         createTableAndInitData(dataSource);
     }
@@ -76,7 +76,7 @@ class RecordSingleTableInventoryCalculatorTest {
         return result;
     }
 
-    private static void createTableAndInitData(final PipelineDataSourceWrapper dataSource) throws SQLException {
+    private static void createTableAndInitData(final PipelineDataSource dataSource) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
             String sql = "CREATE TABLE t_order (user_id INT NOT NULL, order_id INT, status VARCHAR(12), PRIMARY KEY (user_id, order_id))";
             connection.createStatement().execute(sql);
@@ -123,7 +123,7 @@ class RecordSingleTableInventoryCalculatorTest {
         assertThat(actual.getMaxUniqueKeyValue().get(), is(9));
     }
 
-    private SingleTableInventoryCalculateParameter generateParameter(final PipelineDataSourceWrapper dataSource, final Object dataCheckPosition) {
+    private SingleTableInventoryCalculateParameter generateParameter(final PipelineDataSource dataSource, final Object dataCheckPosition) {
         List<PipelineColumnMetaData> uniqueKeys = Collections.singletonList(new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "integer", false, true, true));
         return new SingleTableInventoryCalculateParameter(dataSource, new CaseInsensitiveQualifiedTable(null, "t_order"), Collections.emptyList(), uniqueKeys, dataCheckPosition);
     }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 8b3aef32347..2fd7282016d 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -21,7 +21,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonCo
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
@@ -124,7 +124,7 @@ class InventoryTaskSplitterTest {
     @Test
     void assertSplitWithMultipleColumnsKey() throws SQLException {
         initUnionPrimaryEnvironment(dumperContext.getCommonContext());
-        try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())) {
+        try (PipelineDataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())) {
             List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(null, "t_order", new StandardPipelineTableMetaDataLoader(dataSource));
             dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
             List<InventoryTask> actual = inventoryTaskSplitter.split(jobItemContext);
@@ -135,7 +135,7 @@ class InventoryTaskSplitterTest {
     @Test
     void assertSplitWithoutPrimaryAndUniqueIndex() throws SQLException {
         initNoPrimaryEnvironment(dumperContext.getCommonContext());
-        try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())) {
+        try (PipelineDataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())) {
             List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(null, "t_order", new StandardPipelineTableMetaDataLoader(dataSource));
             assertTrue(uniqueKeyColumns.isEmpty());
             List<InventoryTask> inventoryTasks = inventoryTaskSplitter.split(jobItemContext);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
index a2c28dc9134..f96b1141e90 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.test.it.data.pipeline.core.task;
 
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
@@ -86,7 +86,7 @@ class InventoryTaskTest {
     private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
         PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
         try (
-                PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
+                PipelineDataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
index 6e88c183f8b..394da9ff2c9 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfigurati
 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
@@ -82,7 +82,7 @@ public final class JobConfigurationBuilder {
         PipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(
                 ConfigurationFileUtils.readFile("migration_standard_jdbc_source.yaml").replace("${databaseNameSuffix}", databaseNameSuffix));
         try (
-                PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(sourceDataSourceConfig);
+                PipelineDataSource dataSource = new PipelineDataSource(sourceDataSourceConfig);
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute(PipelineContextUtils.getCreateOrderTableSchema());
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index c638c6b71d2..a7c22224d65 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessC
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.core.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlTransmissionJobItemProgress;
@@ -293,7 +293,7 @@ class MigrationJobAPITest {
         Map<String, DataSourcePoolProperties> metaDataDataSource = new PipelineDataSourcePersistService().load(PipelineContextUtils.getContextKey(), "MIGRATION");
         DataSourcePoolProperties props = metaDataDataSource.get("ds_0");
         try (
-                PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(props), databaseType);
+                PipelineDataSource dataSource = new PipelineDataSource(DataSourcePoolCreator.create(props), databaseType);
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 6a4336e8058..ae5953b44f6 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
@@ -89,7 +89,7 @@ class MigrationDataConsistencyCheckerTest {
     private void initTableData(final PipelineDataSourceConfiguration dataSourceConfig) throws SQLException {
         try (
                 PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
-                PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dataSourceConfig);
+                PipelineDataSource dataSource = dataSourceManager.getDataSource(dataSourceConfig);
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");