This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2dc21aec93f Rename PipelineDataSourceWrapper to PipelineDataSource 
(#32772)
2dc21aec93f is described below

commit 2dc21aec93f21f257f2abc21f72937d03367919e
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Sep 2 20:58:51 2024 +0800

    Rename PipelineDataSourceWrapper to PipelineDataSource (#32772)
---
 .../table/TableInventoryCheckParameter.java                |  6 +++---
 .../calculator/SingleTableInventoryCalculateParameter.java |  6 +++---
 ...elineDataSourceWrapper.java => PipelineDataSource.java} |  4 ++--
 .../core/datasource/PipelineDataSourceManager.java         | 10 +++++-----
 .../pipeline/core/metadata/loader/PipelineSchemaUtils.java |  4 ++--
 .../loader/StandardPipelineTableMetaDataLoader.java        |  4 ++--
 .../incremental/IncrementalTaskPositionManager.java        |  6 +++---
 .../calculator/InventoryRecordsCountCalculator.java        |  4 ++--
 .../inventory/splitter/InventoryDumperContextSplitter.java |  4 ++--
 .../preparer/inventory/splitter/InventoryTaskSplitter.java |  4 ++--
 .../CRC32SingleTableInventoryCalculatorTest.java           |  4 ++--
 ...aSourceWrapperTest.java => PipelineDataSourceTest.java} |  8 ++++----
 .../incremental/dumper/MySQLIncrementalDumperTest.java     |  4 ++--
 .../ingest/incremental/wal/WALEventConverterTest.java      |  4 ++--
 .../data/pipeline/cdc/context/CDCJobItemContext.java       |  8 ++++----
 .../pipeline/scenario/migration/api/MigrationJobAPI.java   |  4 ++--
 .../check/consistency/MigrationDataConsistencyChecker.java |  6 +++---
 .../migration/context/MigrationJobItemContext.java         |  8 ++++----
 .../scenario/migration/preparer/MigrationJobPreparer.java  |  4 ++--
 .../e2e/data/pipeline/cases/PipelineContainerComposer.java |  4 ++--
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java         | 14 +++++++-------
 .../data/pipeline/cases/cdc/DataSourceRecordConsumer.java  |  4 ++--
 .../pipeline/common/PipelineDataSourceManagerTest.java     |  4 ++--
 .../loader/StandardPipelineTableMetaDataLoaderTest.java    |  6 +++---
 .../RecordSingleTableInventoryCalculatorTest.java          | 10 +++++-----
 .../pipeline/core/prepare/InventoryTaskSplitterTest.java   |  6 +++---
 .../test/it/data/pipeline/core/task/InventoryTaskTest.java |  4 ++--
 .../data/pipeline/core/util/JobConfigurationBuilder.java   |  4 ++--
 .../scenario/migration/api/impl/MigrationJobAPITest.java   |  4 ++--
 .../consistency/MigrationDataConsistencyCheckerTest.java   |  4 ++--
 30 files changed, 83 insertions(+), 83 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
index 76300d15588..8c5d674eeb2 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
@@ -36,9 +36,9 @@ public final class TableInventoryCheckParameter {
     
     private final String jobId;
     
-    private final PipelineDataSourceWrapper sourceDataSource;
+    private final PipelineDataSource sourceDataSource;
     
-    private final PipelineDataSourceWrapper targetDataSource;
+    private final PipelineDataSource targetDataSource;
     
     private final CaseInsensitiveQualifiedTable sourceTable;
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
index 9614b971b16..3b40cf9a76a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryRange;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
@@ -43,7 +43,7 @@ public final class SingleTableInventoryCalculateParameter {
      * Data source of source side or target side.
      * Do not close it, it will be reused later.
      */
-    private final PipelineDataSourceWrapper dataSource;
+    private final PipelineDataSource dataSource;
     
     private final CaseInsensitiveQualifiedTable table;
     
@@ -67,7 +67,7 @@ public final class SingleTableInventoryCalculateParameter {
     
     private final QueryType queryType;
     
-    public SingleTableInventoryCalculateParameter(final 
PipelineDataSourceWrapper dataSource, final CaseInsensitiveQualifiedTable 
table, final List<String> columnNames,
+    public SingleTableInventoryCalculateParameter(final PipelineDataSource 
dataSource, final CaseInsensitiveQualifiedTable table, final List<String> 
columnNames,
                                                   final 
List<PipelineColumnMetaData> uniqueKeys, final Object tableCheckPosition) {
         this.dataSource = dataSource;
         this.table = table;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSource.java
similarity index 95%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSource.java
index 6de0f2d2e81..9c8fdfb96a2 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSource.java
@@ -40,7 +40,7 @@ import java.util.logging.Logger;
  */
 @RequiredArgsConstructor
 @Slf4j
-public final class PipelineDataSourceWrapper implements DataSource, 
AutoCloseable {
+public final class PipelineDataSource implements DataSource, AutoCloseable {
     
     private final DataSource dataSource;
     
@@ -50,7 +50,7 @@ public final class PipelineDataSourceWrapper implements 
DataSource, AutoCloseabl
     private final AtomicBoolean closed = new AtomicBoolean(false);
     
     @SneakyThrows(SQLException.class)
-    public PipelineDataSourceWrapper(final PipelineDataSourceConfiguration 
pipelineDataSourceConfig) {
+    public PipelineDataSource(final PipelineDataSourceConfiguration 
pipelineDataSourceConfig) {
         dataSource = 
TypedSPILoader.getService(PipelineDataSourceCreator.class, 
pipelineDataSourceConfig.getType()).create(pipelineDataSourceConfig.getDataSourceConfiguration());
         databaseType = pipelineDataSourceConfig.getDatabaseType();
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
index ad731b33cbc..d02dc6c3013 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
@@ -30,7 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 @Slf4j
 public final class PipelineDataSourceManager implements AutoCloseable {
     
-    private final Map<PipelineDataSourceConfiguration, 
PipelineDataSourceWrapper> cachedDataSources = new ConcurrentHashMap<>();
+    private final Map<PipelineDataSourceConfiguration, PipelineDataSource> 
cachedDataSources = new ConcurrentHashMap<>();
     
     /**
      * Get cached data source.
@@ -38,8 +38,8 @@ public final class PipelineDataSourceManager implements 
AutoCloseable {
      * @param dataSourceConfig data source configuration
      * @return data source
      */
-    public PipelineDataSourceWrapper getDataSource(final 
PipelineDataSourceConfiguration dataSourceConfig) {
-        PipelineDataSourceWrapper result = 
cachedDataSources.get(dataSourceConfig);
+    public PipelineDataSource getDataSource(final 
PipelineDataSourceConfiguration dataSourceConfig) {
+        PipelineDataSource result = cachedDataSources.get(dataSourceConfig);
         if (null != result) {
             return result;
         }
@@ -51,7 +51,7 @@ public final class PipelineDataSourceManager implements 
AutoCloseable {
                 }
                 log.warn("{} is already closed, create again.", result);
             }
-            result = new PipelineDataSourceWrapper(dataSourceConfig);
+            result = new PipelineDataSource(dataSourceConfig);
             cachedDataSources.put(dataSourceConfig, result);
             return result;
         }
@@ -59,7 +59,7 @@ public final class PipelineDataSourceManager implements 
AutoCloseable {
     
     @Override
     public void close() {
-        for (PipelineDataSourceWrapper each : cachedDataSources.values()) {
+        for (PipelineDataSource each : cachedDataSources.values()) {
             if (each.isClosed()) {
                 continue;
             }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
index f1e78d620f8..b136fdd94d5 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
@@ -22,7 +22,7 @@ import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 
 import java.sql.Connection;
 import java.sql.SQLException;
@@ -42,7 +42,7 @@ public final class PipelineSchemaUtils {
      */
     @SneakyThrows(SQLException.class)
     public static String getDefaultSchema(final 
PipelineDataSourceConfiguration dataSourceConfig) {
-        try (PipelineDataSourceWrapper dataSource = new 
PipelineDataSourceWrapper(dataSourceConfig)) {
+        try (PipelineDataSource dataSource = new 
PipelineDataSource(dataSourceConfig)) {
             try (Connection connection = dataSource.getConnection()) {
                 String result = connection.getSchema();
                 log.info("get default schema {}", result);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
index aee15385a8b..2cde14d35b8 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.metadata.loader;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCheckUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
@@ -49,7 +49,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class StandardPipelineTableMetaDataLoader implements 
PipelineTableMetaDataLoader {
     
-    private final PipelineDataSourceWrapper dataSource;
+    private final PipelineDataSource dataSource;
     
     private final Map<CaseInsensitiveIdentifier, PipelineTableMetaData> 
tableMetaDataMap = new ConcurrentHashMap<>();
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
index 154dbadc69d..8367dd211bf 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
@@ -22,7 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfigurati
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
@@ -92,7 +92,7 @@ public final class IncrementalTaskPositionManager {
     private void destroyPosition(final String jobId,
                                  final 
ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig, final 
DialectIncrementalPositionManager positionInitializer) throws SQLException {
         for (DataSourcePoolProperties each : new 
YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(pipelineDataSourceConfig.getRootConfig()).values())
 {
-            try (PipelineDataSourceWrapper dataSource = new 
PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
+            try (PipelineDataSource dataSource = new 
PipelineDataSource(DataSourcePoolCreator.create(each), databaseType)) {
                 positionInitializer.destroy(dataSource, jobId);
             }
         }
@@ -101,7 +101,7 @@ public final class IncrementalTaskPositionManager {
     private void destroyPosition(final String jobId, final 
StandardPipelineDataSourceConfiguration pipelineDataSourceConfig,
                                  final DialectIncrementalPositionManager 
positionInitializer) throws SQLException {
         try (
-                PipelineDataSourceWrapper dataSource = new 
PipelineDataSourceWrapper(
+                PipelineDataSource dataSource = new PipelineDataSource(
                         
DataSourcePoolCreator.create((DataSourcePoolProperties) 
pipelineDataSourceConfig.getDataSourceConfiguration()), databaseType)) {
             positionInitializer.destroy(dataSource, jobId);
         }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java
index 51f1809ca48..4d99e401c58 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java
@@ -21,7 +21,7 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
@@ -52,7 +52,7 @@ public final class InventoryRecordsCountCalculator {
      * @return table records count
      * @throws SplitPipelineJobByUniqueKeyException if there's exception from 
database
      */
-    public static long getTableRecordsCount(final InventoryDumperContext 
dumperContext, final PipelineDataSourceWrapper dataSource) {
+    public static long getTableRecordsCount(final InventoryDumperContext 
dumperContext, final PipelineDataSource dataSource) {
         String schemaName = 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         String actualTableName = dumperContext.getActualTableName();
         PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(dataSource.getDatabaseType());
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 6be5240a655..25d17e637be 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -21,7 +21,7 @@ import lombok.RequiredArgsConstructor;
 import org.apache.commons.lang3.Range;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
@@ -54,7 +54,7 @@ import java.util.stream.Collectors;
 @RequiredArgsConstructor
 public final class InventoryDumperContextSplitter {
     
-    private final PipelineDataSourceWrapper sourceDataSource;
+    private final PipelineDataSource sourceDataSource;
     
     private final InventoryDumperContext dumperContext;
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
index b646d01ae8d..a3f1e3ff6f5 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
@@ -23,7 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.channel.InventoryChannelCrea
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
@@ -48,7 +48,7 @@ import java.util.concurrent.atomic.AtomicReference;
 @Slf4j
 public final class InventoryTaskSplitter {
     
-    private final PipelineDataSourceWrapper sourceDataSource;
+    private final PipelineDataSource sourceDataSource;
     
     private final InventoryDumperContext dumperContext;
     
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
index cf5b661cb60..61f6c800bd0 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
 
 import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -56,7 +56,7 @@ class CRC32SingleTableInventoryCalculatorTest {
     private SingleTableInventoryCalculateParameter parameter;
     
     @Mock
-    private PipelineDataSourceWrapper pipelineDataSource;
+    private PipelineDataSource pipelineDataSource;
     
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private Connection connection;
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapperTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceTest.java
similarity index 90%
rename from 
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapperTest.java
rename to 
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceTest.java
index d53460be6fe..72d6985a241 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapperTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceTest.java
@@ -42,7 +42,7 @@ import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
-class PipelineDataSourceWrapperTest {
+class PipelineDataSourceTest {
     
     private static final String CLIENT_USERNAME = "username";
     
@@ -74,7 +74,7 @@ class PipelineDataSourceWrapperTest {
     
     @Test
     void assertGetConnection() throws SQLException {
-        PipelineDataSourceWrapper dataSourceWrapper = new 
PipelineDataSourceWrapper(dataSource, 
TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
+        PipelineDataSource dataSourceWrapper = new 
PipelineDataSource(dataSource, TypedSPILoader.getService(DatabaseType.class, 
"FIXTURE"));
         assertThat(dataSourceWrapper.getConnection(), is(connection));
         assertThat(dataSourceWrapper.getConnection(CLIENT_USERNAME, 
CLIENT_PASSWORD), is(connection));
         assertGetLogWriter(dataSourceWrapper.getLogWriter());
@@ -102,12 +102,12 @@ class PipelineDataSourceWrapperTest {
     @Test
     void assertSetLoginTimeoutFailure() throws SQLException {
         doThrow(new 
SQLException("")).when(dataSource).setLoginTimeout(LOGIN_TIMEOUT);
-        assertThrows(SQLException.class, () -> new 
PipelineDataSourceWrapper(dataSource, 
TypedSPILoader.getService(DatabaseType.class, 
"FIXTURE")).setLoginTimeout(LOGIN_TIMEOUT));
+        assertThrows(SQLException.class, () -> new 
PipelineDataSource(dataSource, TypedSPILoader.getService(DatabaseType.class, 
"FIXTURE")).setLoginTimeout(LOGIN_TIMEOUT));
     }
     
     @Test
     void assertSetLogWriterFailure() throws SQLException {
         doThrow(new 
SQLException("")).when(dataSource).setLogWriter(printWriter);
-        assertThrows(SQLException.class, () -> new 
PipelineDataSourceWrapper(dataSource, 
TypedSPILoader.getService(DatabaseType.class, 
"FIXTURE")).setLogWriter(printWriter));
+        assertThrows(SQLException.class, () -> new 
PipelineDataSource(dataSource, TypedSPILoader.getService(DatabaseType.class, 
"FIXTURE")).setLogWriter(printWriter));
     }
 }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
index 36395dcc41f..ac1469b6e66 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
@@ -21,7 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSour
 import 
org.apache.shardingsphere.data.pipeline.core.channel.memory.MemoryPipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
@@ -111,7 +111,7 @@ class MySQLIncrementalDumperTest {
     private void initTableData(final IncrementalDumperContext dumperContext) 
throws SQLException {
         try (
                 PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
-                PipelineDataSourceWrapper dataSource = 
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
+                PipelineDataSource dataSource = 
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverterTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverterTest.java
index 90b424b4805..b4294fe387e 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverterTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverterTest.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wa
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
@@ -101,7 +101,7 @@ class WALEventConverterTest {
     private void initTableData(final IncrementalDumperContext dumperContext) 
throws SQLException {
         try (
                 PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
-                PipelineDataSourceWrapper dataSource = 
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
+                PipelineDataSource dataSource = 
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
index a3bd6cbb0c5..f1d189aa5af 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
@@ -27,7 +27,7 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.config.CDCTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
@@ -75,10 +75,10 @@ public final class CDCJobItemContext implements 
TransmissionJobItemContext {
     
     private final AtomicLong inventoryRecordsCount = new AtomicLong(0L);
     
-    private final LazyInitializer<PipelineDataSourceWrapper> 
sourceDataSourceLazyInitializer = new 
LazyInitializer<PipelineDataSourceWrapper>() {
+    private final LazyInitializer<PipelineDataSource> 
sourceDataSourceLazyInitializer = new LazyInitializer<PipelineDataSource>() {
         
         @Override
-        protected PipelineDataSourceWrapper initialize() {
+        protected PipelineDataSource initialize() {
             return 
dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig());
         }
     };
@@ -128,7 +128,7 @@ public final class CDCJobItemContext implements 
TransmissionJobItemContext {
      * @return source data source
      */
     @SneakyThrows(ConcurrentException.class)
-    public PipelineDataSourceWrapper getSourceDataSource() {
+    public PipelineDataSource getSourceDataSource() {
         return sourceDataSourceLazyInitializer.get();
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 236381a9cca..0e7c241cd8d 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -27,7 +27,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
@@ -321,7 +321,7 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
         PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
         TableAndSchemaNameMapper mapping = new 
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         try (
-                PipelineDataSourceWrapper dataSource = new 
PipelineDataSourceWrapper(jobConfig.getTarget());
+                PipelineDataSource dataSource = new 
PipelineDataSource(jobConfig.getTarget());
                 Connection connection = dataSource.getConnection()) {
             for (String each : jobConfig.getTargetTableNames()) {
                 String targetSchemaName = mapping.getSchemaName(each);
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 2ed17f99198..1f5d8137572 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -30,7 +30,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
@@ -128,8 +128,8 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
                                                                           
final TableDataConsistencyChecker tableChecker, final PipelineDataSourceManager 
dataSourceManager) {
         CaseInsensitiveQualifiedTable sourceTable = new 
CaseInsensitiveQualifiedTable(dataNode.getSchemaName(), 
dataNode.getTableName());
         CaseInsensitiveQualifiedTable targetTable = new 
CaseInsensitiveQualifiedTable(dataNode.getSchemaName(), targetTableName);
-        PipelineDataSourceWrapper sourceDataSource = 
dataSourceManager.getDataSource(jobConfig.getSources().get(dataNode.getDataSourceName()));
-        PipelineDataSourceWrapper targetDataSource = 
dataSourceManager.getDataSource(jobConfig.getTarget());
+        PipelineDataSource sourceDataSource = 
dataSourceManager.getDataSource(jobConfig.getSources().get(dataNode.getDataSourceName()));
+        PipelineDataSource targetDataSource = 
dataSourceManager.getDataSource(jobConfig.getTarget());
         PipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(sourceDataSource);
         PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(dataNode.getSchemaName(), 
dataNode.getTableName());
         ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new 
PipelineTableDataConsistencyCheckLoadingFailedException(dataNode.getSchemaName(),
 dataNode.getTableName()));
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index 743ca8f47e4..f2d5c695e72 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -25,7 +25,7 @@ import org.apache.commons.lang3.concurrent.LazyInitializer;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
@@ -77,10 +77,10 @@ public final class MigrationJobItemContext implements 
TransmissionJobItemContext
     
     private final PipelineDataSourceManager dataSourceManager;
     
-    private final LazyInitializer<PipelineDataSourceWrapper> 
sourceDataSourceLazyInitializer = new 
LazyInitializer<PipelineDataSourceWrapper>() {
+    private final LazyInitializer<PipelineDataSource> 
sourceDataSourceLazyInitializer = new LazyInitializer<PipelineDataSource>() {
         
         @Override
-        protected PipelineDataSourceWrapper initialize() {
+        protected PipelineDataSource initialize() {
             return 
dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig());
         }
     };
@@ -115,7 +115,7 @@ public final class MigrationJobItemContext implements 
TransmissionJobItemContext
      * @return source data source
      */
     @SneakyThrows(ConcurrentException.class)
-    public PipelineDataSourceWrapper getSourceDataSource() {
+    public PipelineDataSource getSourceDataSource() {
         return sourceDataSourceLazyInitializer.get();
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 2414c46dbc2..981473f86ba 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCh
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
@@ -147,7 +147,7 @@ public final class MigrationJobPreparer implements 
PipelineJobPreparer<Migration
             prepareTarget(jobItemContext, targetDatabaseType, contextManager);
         }
         if (null == jobItemContext.getInitProgress()) {
-            PipelineDataSourceWrapper targetDataSource = 
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
+            PipelineDataSource targetDataSource = 
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
             new 
PipelineDataSourceCheckEngine(targetDatabaseType).checkTargetDataSources(Collections.singleton(targetDataSource),
 jobItemContext.getTaskConfig().getImporterConfig());
         }
     }
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 5d5ca34916f..f6624e35d22 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -24,7 +24,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrlAppender;
@@ -612,7 +612,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
         YamlSingleRuleConfiguration singleRuleConfig = new 
YamlSingleRuleConfiguration();
         singleRuleConfig.setTables(Collections.singletonList("*.*"));
         rootConfig.getRules().add(singleRuleConfig);
-        return new PipelineDataSourceWrapper(new 
ShardingSpherePipelineDataSourceConfiguration(rootConfig));
+        return new PipelineDataSource(new 
ShardingSpherePipelineDataSourceConfiguration(rootConfig));
     }
     
     private YamlRootConfiguration getYamlRootConfig() {
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index cb5bc9c27ca..291f7dea30d 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -31,7 +31,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.Consistency
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
@@ -107,7 +107,7 @@ class CDCE2EIT {
             try (Connection connection = 
containerComposer.getProxyDataSource().getConnection()) {
                 initSchemaAndTable(containerComposer, connection, 3);
             }
-            PipelineDataSourceWrapper sourceDataSource = new 
PipelineDataSourceWrapper(containerComposer.generateShardingSphereDataSourceFromProxy(),
 containerComposer.getDatabaseType());
+            PipelineDataSource sourceDataSource = new 
PipelineDataSource(containerComposer.generateShardingSphereDataSourceFromProxy(),
 containerComposer.getDatabaseType());
             Pair<List<Object[]>, List<Object[]>> dataPair = 
PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             log.info("init data begin: {}", LocalDateTime.now());
             DataSourceExecuteUtils.execute(sourceDataSource, 
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), 
dataPair.getLeft());
@@ -119,7 +119,7 @@ class CDCE2EIT {
                             containerComposer.getUsername(), 
containerComposer.getPassword())) {
                 initSchemaAndTable(containerComposer, connection, 0);
             }
-            PipelineDataSourceWrapper targetDataSource = 
createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
+            PipelineDataSource targetDataSource = 
createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
             final CDCClient cdcClient = 
buildCDCClientAndStart(targetDataSource, containerComposer);
             Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW 
STREAMING LIST").isEmpty());
             String jobId = containerComposer.queryForListWithLog("SHOW 
STREAMING LIST").get(0).get("id").toString();
@@ -168,15 +168,15 @@ class CDCE2EIT {
         }
     }
     
-    private PipelineDataSourceWrapper createStandardDataSource(final 
PipelineContainerComposer containerComposer, final String storageUnitName) {
+    private PipelineDataSource createStandardDataSource(final 
PipelineContainerComposer containerComposer, final String storageUnitName) {
         Map<String, Object> poolProps = new HashMap<>(3, 1F);
         poolProps.put("url", 
containerComposer.getActualJdbcUrlTemplate(storageUnitName, false));
         poolProps.put("username", containerComposer.getUsername());
         poolProps.put("password", containerComposer.getPassword());
-        return new PipelineDataSourceWrapper(new 
StandardPipelineDataSourceConfiguration(poolProps));
+        return new PipelineDataSource(new 
StandardPipelineDataSourceConfiguration(poolProps));
     }
     
-    private CDCClient buildCDCClientAndStart(final PipelineDataSourceWrapper 
dataSource, final PipelineContainerComposer containerComposer) {
+    private CDCClient buildCDCClientAndStart(final PipelineDataSource 
dataSource, final PipelineContainerComposer containerComposer) {
         DataSourceRecordConsumer recordConsumer = new 
DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
         CDCClient result = new CDCClient(new 
CDCClientConfiguration("localhost", 
containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
         result.connect(recordConsumer, new 
RetryStreamingExceptionHandler(result, 5, 5000), (ctx, serverErrorResult) -> 
log.error("Server error: {}", serverErrorResult.getErrorMessage()));
@@ -186,7 +186,7 @@ class CDCE2EIT {
         return result;
     }
     
-    private void assertDataMatched(final PipelineDataSourceWrapper 
sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final 
CaseInsensitiveQualifiedTable schemaTableName) {
+    private void assertDataMatched(final PipelineDataSource sourceDataSource, 
final PipelineDataSource targetDataSource, final CaseInsensitiveQualifiedTable 
schemaTableName) {
         StandardPipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(targetDataSource);
         PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().toString(), 
schemaTableName.getTableName().toString());
         List<PipelineColumnMetaData> uniqueKeys = 
Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0)));
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
index f7a43258d83..83ff14b84c7 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordR
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.DataChangeType;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.MetaData;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.TableColumn;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
@@ -59,7 +59,7 @@ public final class DataSourceRecordConsumer implements 
Consumer<List<Record>> {
     
     public DataSourceRecordConsumer(final DataSource dataSource, final 
DatabaseType databaseType) {
         this.dataSource = dataSource;
-        loader = new StandardPipelineTableMetaDataLoader(new 
PipelineDataSourceWrapper(dataSource, databaseType));
+        loader = new StandardPipelineTableMetaDataLoader(new 
PipelineDataSource(dataSource, databaseType));
     }
     
     @Override
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/PipelineDataSourceManagerTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/PipelineDataSourceManagerTest.java
index 17aac2f0997..c6805cb1328 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/PipelineDataSourceManagerTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/PipelineDataSourceManagerTest.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.test.it.data.pipeline.common;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.config.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
@@ -56,7 +56,7 @@ class PipelineDataSourceManagerTest {
         try (PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager()) {
             PipelineDataSourceConfiguration source = 
jobConfig.getSources().values().iterator().next();
             DataSource actual = 
dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(source.getType(),
 source.getParameter()));
-            assertThat(actual, instanceOf(PipelineDataSourceWrapper.class));
+            assertThat(actual, instanceOf(PipelineDataSource.class));
         }
     }
     
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
index cae26d5244d..641c6ac2bf8 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.test.it.data.pipeline.common.metadata.loader;
 
 import com.zaxxer.hikari.HikariDataSource;
 import lombok.SneakyThrows;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
@@ -42,11 +42,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class StandardPipelineTableMetaDataLoaderTest {
     
-    private PipelineDataSourceWrapper dataSource;
+    private PipelineDataSource dataSource;
     
     @BeforeEach
     void setUp() {
-        dataSource = new PipelineDataSourceWrapper(createHikariDataSource(), 
TypedSPILoader.getService(DatabaseType.class, "H2"));
+        dataSource = new PipelineDataSource(createHikariDataSource(), 
TypedSPILoader.getService(DatabaseType.class, "H2"));
     }
     
     private HikariDataSource createHikariDataSource() {
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
index 532822549ee..c36790e51e0 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
@@ -23,7 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Reco
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.RecordSingleTableInventoryCalculator;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryRange;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
@@ -51,11 +51,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class RecordSingleTableInventoryCalculatorTest {
     
-    private static PipelineDataSourceWrapper dataSource;
+    private static PipelineDataSource dataSource;
     
     @BeforeAll
     static void setUp() throws Exception {
-        dataSource = new 
PipelineDataSourceWrapper(createHikariDataSource("calc_" + 
RandomStringUtils.randomAlphanumeric(9)), 
TypedSPILoader.getService(DatabaseType.class, "H2"));
+        dataSource = new PipelineDataSource(createHikariDataSource("calc_" + 
RandomStringUtils.randomAlphanumeric(9)), 
TypedSPILoader.getService(DatabaseType.class, "H2"));
         createTableAndInitData(dataSource);
     }
     
@@ -76,7 +76,7 @@ class RecordSingleTableInventoryCalculatorTest {
         return result;
     }
     
-    private static void createTableAndInitData(final PipelineDataSourceWrapper 
dataSource) throws SQLException {
+    private static void createTableAndInitData(final PipelineDataSource 
dataSource) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
             String sql = "CREATE TABLE t_order (user_id INT NOT NULL, order_id 
INT, status VARCHAR(12), PRIMARY KEY (user_id, order_id))";
             connection.createStatement().execute(sql);
@@ -123,7 +123,7 @@ class RecordSingleTableInventoryCalculatorTest {
         assertThat(actual.getMaxUniqueKeyValue().get(), is(9));
     }
     
-    private SingleTableInventoryCalculateParameter generateParameter(final 
PipelineDataSourceWrapper dataSource, final Object dataCheckPosition) {
+    private SingleTableInventoryCalculateParameter generateParameter(final 
PipelineDataSource dataSource, final Object dataCheckPosition) {
         List<PipelineColumnMetaData> uniqueKeys = 
Collections.singletonList(new PipelineColumnMetaData(1, "order_id", 
Types.INTEGER, "integer", false, true, true));
         return new SingleTableInventoryCalculateParameter(dataSource, new 
CaseInsensitiveQualifiedTable(null, "t_order"), Collections.emptyList(), 
uniqueKeys, dataCheckPosition);
     }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 8b3aef32347..2fd7282016d 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -21,7 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonCo
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
@@ -124,7 +124,7 @@ class InventoryTaskSplitterTest {
     @Test
     void assertSplitWithMultipleColumnsKey() throws SQLException {
         initUnionPrimaryEnvironment(dumperContext.getCommonContext());
-        try (PipelineDataSourceWrapper dataSource = 
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()))
 {
+        try (PipelineDataSource dataSource = 
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()))
 {
             List<PipelineColumnMetaData> uniqueKeyColumns = 
PipelineTableMetaDataUtils.getUniqueKeyColumns(null, "t_order", new 
StandardPipelineTableMetaDataLoader(dataSource));
             dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
             List<InventoryTask> actual = 
inventoryTaskSplitter.split(jobItemContext);
@@ -135,7 +135,7 @@ class InventoryTaskSplitterTest {
     @Test
     void assertSplitWithoutPrimaryAndUniqueIndex() throws SQLException {
         initNoPrimaryEnvironment(dumperContext.getCommonContext());
-        try (PipelineDataSourceWrapper dataSource = 
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()))
 {
+        try (PipelineDataSource dataSource = 
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()))
 {
             List<PipelineColumnMetaData> uniqueKeyColumns = 
PipelineTableMetaDataUtils.getUniqueKeyColumns(null, "t_order", new 
StandardPipelineTableMetaDataLoader(dataSource));
             assertTrue(uniqueKeyColumns.isEmpty());
             List<InventoryTask> inventoryTasks = 
inventoryTaskSplitter.split(jobItemContext);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
index a2c28dc9134..f96b1141e90 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.test.it.data.pipeline.core.task;
 
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
@@ -86,7 +86,7 @@ class InventoryTaskTest {
     private void initTableData(final IncrementalDumperContext dumperContext) 
throws SQLException {
         PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
         try (
-                PipelineDataSourceWrapper dataSource = 
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
+                PipelineDataSource dataSource = 
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
index 6e88c183f8b..394da9ff2c9 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -24,7 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfigurati
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
@@ -82,7 +82,7 @@ public final class JobConfigurationBuilder {
         PipelineDataSourceConfiguration sourceDataSourceConfig = new 
StandardPipelineDataSourceConfiguration(
                 
ConfigurationFileUtils.readFile("migration_standard_jdbc_source.yaml").replace("${databaseNameSuffix}",
 databaseNameSuffix));
         try (
-                PipelineDataSourceWrapper dataSource = new 
PipelineDataSourceWrapper(sourceDataSourceConfig);
+                PipelineDataSource dataSource = new 
PipelineDataSource(sourceDataSourceConfig);
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             
statement.execute(PipelineContextUtils.getCreateOrderTableSchema());
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index c638c6b71d2..a7c22224d65 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -26,7 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessC
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.config.PipelineDataSourceConfigurationFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlTransmissionJobItemProgress;
@@ -293,7 +293,7 @@ class MigrationJobAPITest {
         Map<String, DataSourcePoolProperties> metaDataDataSource = new 
PipelineDataSourcePersistService().load(PipelineContextUtils.getContextKey(), 
"MIGRATION");
         DataSourcePoolProperties props = metaDataDataSource.get("ds_0");
         try (
-                PipelineDataSourceWrapper dataSource = new 
PipelineDataSourceWrapper(DataSourcePoolCreator.create(props), databaseType);
+                PipelineDataSource dataSource = new 
PipelineDataSource(DataSourcePoolCreator.create(props), databaseType);
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 6a4336e8058..ae5953b44f6 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -23,7 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
@@ -89,7 +89,7 @@ class MigrationDataConsistencyCheckerTest {
     private void initTableData(final PipelineDataSourceConfiguration 
dataSourceConfig) throws SQLException {
         try (
                 PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
-                PipelineDataSourceWrapper dataSource = 
dataSourceManager.getDataSource(dataSourceConfig);
+                PipelineDataSource dataSource = 
dataSourceManager.getDataSource(dataSourceConfig);
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");

Reply via email to