This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new feb9ae48679 Refactor DumperCommonContext (#28946)
feb9ae48679 is described below

commit feb9ae4867964187b1f49e5812437f6ff329e0bc
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Nov 5 15:55:13 2023 +0800

    Refactor DumperCommonContext (#28946)
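    
    The change replaces inheritance with composition: DumperCommonContext becomes a final class instead of an abstract base, and IncrementalDumperContext / InventoryDumperContext now hold it as a commonContext field, so call sites reach shared state (data source, table name mappers, ingest position) through getCommonContext(). A minimal sketch of the new wiring, using only constructors and setters that appear in the diff below ("ds_0" is an illustrative value):
    
        DumperCommonContext commonContext = new DumperCommonContext();
        commonContext.setDataSourceName("ds_0");
        IncrementalDumperContext dumperContext = new IncrementalDumperContext(commonContext);
        dumperContext.setJobId("0101123456");
        // Shared state is reached through the composed object rather than inherited getters.
        String dataSourceName = dumperContext.getCommonContext().getDataSourceName();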
---
 .../ingest/dumper/context/DumperCommonContext.java   |  2 +-
 .../dumper/context/IncrementalDumperContext.java     |  8 ++++++--
 .../dumper/context/InventoryDumperContext.java       | 17 ++++++++++-------
 .../data/pipeline/core/dumper/InventoryDumper.java   | 17 +++++++++--------
 .../preparer/InventoryRecordsCountCalculator.java    |  2 +-
 .../core/preparer/InventoryTaskSplitter.java         | 16 ++++++++--------
 .../core/preparer/PipelineJobPreparerUtils.java      |  4 ++--
 .../data/pipeline/core/task/PipelineTaskUtils.java   |  2 +-
 .../mysql/ingest/MySQLIncrementalDumper.java         | 13 +++++++------
 .../mysql/ingest/MySQLIncrementalDumperTest.java     | 15 ++++++++-------
 .../opengauss/ingest/OpenGaussWALDumper.java         |  4 ++--
 .../postgresql/ingest/PostgreSQLWALDumper.java       |  4 ++--
 .../postgresql/ingest/wal/WALEventConverter.java     |  8 ++++----
 .../postgresql/ingest/PostgreSQLWALDumperTest.java   | 15 +++++++++------
 .../postgresql/ingest/wal/WALEventConverterTest.java | 15 ++++++++-------
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java        | 14 ++++++++------
 .../data/pipeline/cdc/context/CDCJobItemContext.java |  4 ++--
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java    | 20 +++++++++++---------
 .../scenario/migration/api/impl/MigrationJobAPI.java |  8 +++++---
 .../MigrationIncrementalDumperContextCreator.java    | 12 +++++++-----
 .../migration/context/MigrationJobItemContext.java   |  2 +-
 .../migration/prepare/MigrationJobPreparer.java      | 16 +++++++++-------
 .../job/service/GovernanceRepositoryAPIImplTest.java |  4 ++--
 .../core/prepare/InventoryTaskSplitterTest.java      | 18 +++++++++---------
 .../data/pipeline/core/task/IncrementalTaskTest.java |  2 +-
 .../data/pipeline/core/task/InventoryTaskTest.java   |  9 +++++----
 .../MigrationDataConsistencyCheckerTest.java         |  2 +-
 27 files changed, 139 insertions(+), 114 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
index 9166f609df9..cdcbf6b0f3f 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
@@ -31,7 +31,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
 @Getter
 @Setter
 @ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
-public abstract class DumperCommonContext {
+public final class DumperCommonContext {
     
     private String dataSourceName;
     
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java
index 4ea3cb2c5da..4b7697c7dfa 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java
@@ -18,16 +18,20 @@
 package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 
 /**
  * Incremental dumper context.
  */
+@RequiredArgsConstructor
 @Getter
 @Setter
-@ToString(callSuper = true)
-public final class IncrementalDumperContext extends DumperCommonContext {
+@ToString
+public final class IncrementalDumperContext {
+    
+    private final DumperCommonContext commonContext;
     
     private String jobId;
     
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
index e96375d8c6e..2e1d44438d7 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
@@ -30,8 +30,10 @@ import java.util.List;
  */
 @Getter
 @Setter
-@ToString(callSuper = true)
-public final class InventoryDumperContext extends DumperCommonContext {
+@ToString
+public final class InventoryDumperContext {
+    
+    private final DumperCommonContext commonContext;
     
     private String actualTableName;
     
@@ -51,11 +53,12 @@ public final class InventoryDumperContext extends DumperCommonContext {
     
     private JobRateLimitAlgorithm rateLimitAlgorithm;
     
-    public InventoryDumperContext(final DumperCommonContext dumperContext) {
-        setDataSourceName(dumperContext.getDataSourceName());
-        setDataSourceConfig(dumperContext.getDataSourceConfig());
-        setTableNameMapper(dumperContext.getTableNameMapper());
-        setTableAndSchemaNameMapper(dumperContext.getTableAndSchemaNameMapper());
+    public InventoryDumperContext(final DumperCommonContext commonContext) {
+        this.commonContext = new DumperCommonContext();
+        this.commonContext.setDataSourceName(commonContext.getDataSourceName());
+        this.commonContext.setDataSourceConfig(commonContext.getDataSourceConfig());
+        this.commonContext.setTableNameMapper(commonContext.getTableNameMapper());
+        this.commonContext.setTableAndSchemaNameMapper(commonContext.getTableAndSchemaNameMapper());
     }
     
     /**
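
The constructor above copies the incoming DumperCommonContext field by field into a fresh instance rather than storing the reference, so each split inventory context can later receive its own ingest position without mutating the shared source context. A hedged sketch of the splitting pattern this enables (the same calls appear in InventoryTaskSplitter below):

    // Each copy gets an independent position; the source commonContext is untouched.
    InventoryDumperContext split = new InventoryDumperContext(dumperContext.getCommonContext());
    split.getCommonContext().setPosition(new PlaceholderPosition());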
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index 36943d542bd..ebd0f8183ee 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -88,7 +88,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
         this.dumperContext = dumperContext;
         this.channel = channel;
         this.dataSource = dataSource;
-        DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
+        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
         columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
         this.metaDataLoader = metaDataLoader;
@@ -96,13 +96,13 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
     
     @Override
     protected void runBlocking() {
-        IngestPosition position = dumperContext.getPosition();
+        IngestPosition position = dumperContext.getCommonContext().getPosition();
         if (position instanceof FinishedPosition) {
             log.info("Ignored because of already finished.");
             return;
         }
         PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(
-                dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName());
+                dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName());
         try (Connection connection = dataSource.getConnection()) {
             dump(tableMetaData, connection);
         } catch (final SQLException ex) {
@@ -114,7 +114,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
     @SuppressWarnings("MagicConstant")
     private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException {
         int batchSize = dumperContext.getBatchSize();
-        DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
+        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         if (null != dumperContext.getTransactionIsolation()) {
             connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
         }
@@ -156,11 +156,11 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
         if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
             return dumperContext.getQuerySQL();
         }
-        String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         if (!dumperContext.hasUniqueKey()) {
             return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName());
         }
-        PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperContext.getPosition();
+        PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition();
         PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
         Collection<String> columnNames = Collections.singleton("*");
         if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
@@ -179,7 +179,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
             return;
         }
         PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
-        PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperContext.getPosition();
+        PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition();
         if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) && null != position.getBeginValue() && null != position.getEndValue()) {
             preparedStatement.setObject(1, position.getBeginValue());
             preparedStatement.setObject(2, position.getEndValue());
@@ -212,7 +212,8 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
     
     private IngestPosition newPosition(final ResultSet resultSet) throws SQLException {
         return dumperContext.hasUniqueKey()
-                ? PrimaryKeyPositionFactory.newInstance(resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition<?>) dumperContext.getPosition()).getEndValue())
+                ? PrimaryKeyPositionFactory.newInstance(
+                        resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition()).getEndValue())
                 : new PlaceholderPosition();
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
index f1e46de7bee..5610c5bf0d7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
@@ -51,7 +51,7 @@ public final class InventoryRecordsCountCalculator {
      * @throws SplitPipelineJobByUniqueKeyException if there's exception from database
      */
     public static long getTableRecordsCount(final InventoryDumperContext dumperContext, final PipelineDataSourceWrapper dataSource) {
-        String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         String actualTableName = dumperContext.getActualTableName();
         PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(dataSource.getDatabaseType());
         Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
index 97159392918..f497583c9f8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
@@ -83,7 +83,7 @@ public final class InventoryTaskSplitter {
         long startTimeMillis = System.currentTimeMillis();
         InventoryIncrementalProcessContext processContext = jobItemContext.getJobProcessContext();
         for (InventoryDumperContext each : splitInventoryDumperContext(jobItemContext)) {
-            AtomicReference<IngestPosition> position = new AtomicReference<>(each.getPosition());
+            AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
             PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
             Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader());
             Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(), jobItemContext);
@@ -110,12 +110,12 @@ public final class InventoryTaskSplitter {
     
     private Collection<InventoryDumperContext> splitByTable(final InventoryDumperContext dumperContext) {
         Collection<InventoryDumperContext> result = new LinkedList<>();
-        dumperContext.getTableNameMapper().getTableNameMap().forEach((key, value) -> {
-            InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext);
+        dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().forEach((key, value) -> {
+            InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
             // use original table name, for metadata loader, since some database table name case-sensitive
             inventoryDumperContext.setActualTableName(key.getOriginal());
             inventoryDumperContext.setLogicTableName(value.getOriginal());
-            inventoryDumperContext.setPosition(new PlaceholderPosition());
+            inventoryDumperContext.getCommonContext().setPosition(new PlaceholderPosition());
             inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
             inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
             result.add(inventoryDumperContext);
@@ -126,7 +126,7 @@ public final class InventoryTaskSplitter {
     private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext jobItemContext,
                                                                  final PipelineDataSourceWrapper dataSource) {
         if (null == dumperContext.getUniqueKeyColumns()) {
-            String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+            String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
             String actualTableName = dumperContext.getActualTableName();
             List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader());
             dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
@@ -139,8 +139,8 @@ public final class InventoryTaskSplitter {
         Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource);
         int i = 0;
         for (IngestPosition each : inventoryPositions) {
-            InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext);
-            splitDumperContext.setPosition(each);
+            InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
+            splitDumperContext.getCommonContext().setPosition(each);
             splitDumperContext.setShardingItem(i++);
             splitDumperContext.setActualTableName(dumperContext.getActualTableName());
             splitDumperContext.setLogicTableName(dumperContext.getLogicTableName());
@@ -204,7 +204,7 @@ public final class InventoryTaskSplitter {
         String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName();
         PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
         String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
-                dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName(), uniqueKey);
+                dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName(), uniqueKey);
         try (
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
index 2f38ea4c72c..e40fa297fe0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
@@ -131,8 +131,8 @@ public final class PipelineJobPreparerUtils {
                 return position.get();
             }
         }
-        DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
-        DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
+        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
+        DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
         return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperContext.getJobId());
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
index fedb6911f05..3522cc82bba 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -43,7 +43,7 @@ public final class PipelineTaskUtils {
      * @return inventory task id
      */
     public static String generateInventoryTaskId(final InventoryDumperContext inventoryDumperContext) {
-        String result = String.format("%s.%s", inventoryDumperContext.getDataSourceName(), inventoryDumperContext.getActualTableName());
+        String result = String.format("%s.%s", inventoryDumperContext.getCommonContext().getDataSourceName(), inventoryDumperContext.getActualTableName());
         return result + "#" + inventoryDumperContext.getShardingItem();
     }
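
The generated inventory task id keeps its previous shape, <dataSourceName>.<actualTableName>#<shardingItem> (for example "ds_0.t_order#0", illustrative values); only the data source name lookup moved behind getCommonContext().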
     
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index ed64a865cf7..705a02200a8 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -80,12 +80,13 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
     
     public MySQLIncrementalDumper(final IncrementalDumperContext dumperContext, final IngestPosition binlogPosition,
                                   final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        Preconditions.checkArgument(dumperContext.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
+        Preconditions.checkArgument(dumperContext.getCommonContext().getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration,
+                "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
         this.dumperContext = dumperContext;
         this.binlogPosition = (BinlogPosition) binlogPosition;
         this.channel = channel;
         this.metaDataLoader = metaDataLoader;
-        YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig()).getJdbcConfig();
+        YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()).getJdbcConfig();
         ConnectionPropertiesParser parser = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, TypedSPILoader.getService(DatabaseType.class, "MySQL"));
         ConnectionProperties connectionProps = parser.parse(jdbcConfig.getUrl(), null, null);
         ConnectInfo connectInfo = new ConnectInfo(generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword());
@@ -132,7 +133,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
             return Collections.singletonList(createPlaceholderRecord(event));
         }
         AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event;
-        if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.getTableNameMapper().containsTable(rowsEvent.getTableName())) {
+        if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.getCommonContext().getTableNameMapper().containsTable(rowsEvent.getTableName())) {
             return Collections.singletonList(createPlaceholderRecord(event));
         }
         PipelineTableMetaData tableMetaData = getPipelineTableMetaData(rowsEvent.getTableName());
@@ -155,8 +156,8 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
     }
     
     private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
-        LogicTableName logicTableName = dumperContext.getTableNameMapper().getLogicTableName(actualTableName);
-        return metaDataLoader.getTableMetaData(dumperContext.getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
+        LogicTableName logicTableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
+        return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
     }
     
     private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
@@ -217,7 +218,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
     }
     
     private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) {
-        String tableName = dumperContext.getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
+        String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
         IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId());
         DataRecord result = new DataRecord(type, tableName, position, columnCount);
         result.setActualTableName(rowsEvent.getTableName());
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 7d6ee11ad2f..5ed2365d713 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.mysql.ingest;
 
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
@@ -81,7 +82,7 @@ class MySQLIncrementalDumperTest {
     void setUp() throws SQLException {
         IncrementalDumperContext dumperContext = createDumperContext();
         initTableData(dumperContext);
-        dumperContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root"));
+        dumperContext.getCommonContext().setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root"));
         PipelineTableMetaDataLoader metaDataLoader = mock(PipelineTableMetaDataLoader.class);
         SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(10000, new EmptyAckCallback());
         incrementalDumper = new MySQLIncrementalDumper(dumperContext, new BinlogPosition("binlog-000001", 4L, 0L), channel, metaDataLoader);
@@ -90,17 +91,17 @@ class MySQLIncrementalDumperTest {
     }
     
     private IncrementalDumperContext createDumperContext() {
-        IncrementalDumperContext result = new IncrementalDumperContext();
-        result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
-        result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))));
-        result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
-        return result;
+        DumperCommonContext commonContext = new DumperCommonContext();
+        commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
+        commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))));
+        commonContext.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
+        return new IncrementalDumperContext(commonContext);
     }
     
     private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
         try (
                 PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
-                PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
+                PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index dbc2a573ba6..dfa912c4a0e 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -74,7 +74,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
     
     public OpenGaussWALDumper(final IncrementalDumperContext dumperContext, final IngestPosition position,
                               final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getDataSourceConfig().getClass()),
+        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
         this.dumperContext = dumperContext;
         walPosition = new AtomicReference<>((WALPosition) position);
@@ -137,7 +137,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
     }
     
     private PgConnection getReplicationConnectionUnwrap() throws SQLException {
-        return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig()).unwrap(PgConnection.class);
+        return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()).unwrap(PgConnection.class);
     }
     
     private void processEventWithTX(final AbstractWALEvent event) {
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 23a0d808b08..b23686cbe4e 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -76,7 +76,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
     
     public PostgreSQLWALDumper(final IncrementalDumperContext dumperContext, final IngestPosition position,
                                final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getDataSourceConfig().getClass()),
+        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
         this.dumperContext = dumperContext;
         walPosition = new AtomicReference<>((WALPosition) position);
@@ -111,7 +111,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
     private void dump() throws SQLException {
         // TODO use unified PgConnection
         try (
-                Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig());
+                Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig());
                 PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLPositionInitializer.getUniqueSlotName(connection, dumperContext.getJobId()),
                         walPosition.get().getLogSequenceNumber())) {
             PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils());
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index cc2dca00d86..389fb0e4593 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -80,7 +80,7 @@ public final class WALEventConverter {
     private boolean filter(final AbstractWALEvent event) {
         if (event instanceof AbstractRowEvent) {
             AbstractRowEvent rowEvent = (AbstractRowEvent) event;
-            return !dumperContext.getTableNameMapper().containsTable(rowEvent.getTableName());
+            return !dumperContext.getCommonContext().getTableNameMapper().containsTable(rowEvent.getTableName());
         }
         return false;
     }
@@ -90,8 +90,8 @@ public final class WALEventConverter {
     }
     
     private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
-        LogicTableName logicTableName = dumperContext.getTableNameMapper().getLogicTableName(actualTableName);
-        return metaDataLoader.getTableMetaData(dumperContext.getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
+        LogicTableName logicTableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
+        return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
     }
     
     private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) {
@@ -118,7 +118,7 @@ public final class WALEventConverter {
     }
     
     private DataRecord createDataRecord(final String type, final AbstractRowEvent rowsEvent, final int columnCount) {
-        String tableName = dumperContext.getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
+        String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
         DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(), tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
         result.setActualTableName(rowsEvent.getTableName());
         result.setCsn(rowsEvent.getCsn());
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 6cbb3233bac..4a033b76e63 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
@@ -90,7 +91,8 @@ class PostgreSQLWALDumperTest {
         String password = "root";
         createTable(jdbcUrl, username, password);
         dumperContext = createDumperContext(jdbcUrl, username, password);
-        walDumper = new PostgreSQLWALDumper(dumperContext, position, channel, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())));
+        walDumper = new PostgreSQLWALDumper(dumperContext, position, channel,
+                new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())));
     }
     
     private void createTable(final String jdbcUrl, final String username, final String password) {
@@ -105,11 +107,12 @@ class PostgreSQLWALDumperTest {
     }
     
     private IncrementalDumperContext createDumperContext(final String jdbcUrl, final String username, final String password) {
-        IncrementalDumperContext result = new IncrementalDumperContext();
+        DumperCommonContext commonContext = new DumperCommonContext();
+        commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password));
+        commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))));
+        commonContext.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
+        IncrementalDumperContext result = new IncrementalDumperContext(commonContext);
         result.setJobId("0101123456");
-        result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password));
-        result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))));
-        result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
         return result;
     }
     
@@ -120,7 +123,7 @@ class PostgreSQLWALDumperTest {
     
     @Test
     void assertStart() throws SQLException, ReflectiveOperationException {
-        StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig();
+        StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig();
         try {
             Plugins.getMemberAccessor().set(PostgreSQLWALDumper.class.getDeclaredField("logicalReplication"), walDumper, logicalReplication);
             when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection);
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 15895cd370b..29c625de0a7 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
 
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
@@ -79,23 +80,23 @@ class WALEventConverterTest {
     void setUp() throws SQLException {
         IncrementalDumperContext dumperContext = mockDumperContext();
         PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
-        walEventConverter = new WALEventConverter(dumperContext, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())));
+        walEventConverter = new WALEventConverter(dumperContext, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())));
        initTableData(dumperContext);
         pipelineTableMetaData = new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList());
     }
     
     private IncrementalDumperContext mockDumperContext() {
-        IncrementalDumperContext result = new IncrementalDumperContext();
-        result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
-        result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))));
-        result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
-        return result;
+        DumperCommonContext commonContext = new DumperCommonContext();
+        commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
+        commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))));
+        commonContext.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
+        return new IncrementalDumperContext(commonContext);
     }
     
     private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
         try (
                 PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
-                PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
+                PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 85f2aea4206..d25feef3a3b 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
@@ -193,7 +194,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
                                                                                               final IncrementalDumperContext incrementalDumperContext) throws SQLException {
         InventoryIncrementalJobItemProgress result = new InventoryIncrementalJobItemProgress();
         result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
-        result.setDataSourceName(incrementalDumperContext.getDataSourceName());
+        result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
         IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, incrementalDumperContext, dataSourceManager));
         result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
         return result;
@@ -282,12 +283,13 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName()))));
         String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
         StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
-        IncrementalDumperContext result = new IncrementalDumperContext();
+        DumperCommonContext commonContext = new DumperCommonContext();
+        commonContext.setDataSourceName(dataSourceName);
+        commonContext.setDataSourceConfig(actualDataSourceConfig);
+        commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap));
+        commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
+        IncrementalDumperContext result = new IncrementalDumperContext(commonContext);
         result.setJobId(jobConfig.getJobId());
-        result.setDataSourceName(dataSourceName);
-        result.setDataSourceConfig(actualDataSourceConfig);
-        result.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap));
-        result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
         result.setDecodeWithTX(jobConfig.isDecodeWithTX());
         return result;
     }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
index 23bc51e1f51..47e6acba40c 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
@@ -80,7 +80,7 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte
         
         @Override
         protected PipelineDataSourceWrapper initialize() {
-            return dataSourceManager.getDataSource(taskConfig.getDumperContext().getDataSourceConfig());
+            return dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig());
         }
     };
     
@@ -99,7 +99,7 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte
     
     @Override
     public String getDataSourceName() {
-        return taskConfig.getDumperContext().getDataSourceName();
+        return taskConfig.getDumperContext().getCommonContext().getDataSourceName();
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 0cf3ea7d135..3d0f982de59 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -104,7 +104,8 @@ public final class CDCJobPreparer {
         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
         try {
-            taskConfig.getDumperContext().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
+            taskConfig.getDumperContext().getCommonContext().setPosition(
+                    PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
         } catch (final SQLException ex) {
             throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
         }
@@ -115,19 +116,19 @@ public final class CDCJobPreparer {
         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
         CDCProcessContext processContext = jobItemContext.getJobProcessContext();
-        for (InventoryDumperContext each : new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext()), importerConfig)
+        for (InventoryDumperContext each : new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()), importerConfig)
                 .splitInventoryDumperContext(jobItemContext)) {
-            AtomicReference<IngestPosition> position = new AtomicReference<>(each.getPosition());
+            AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
             PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
             channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
             Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
             Importer importer = importerUsed.get() ? null
                     : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(),
-                            needSorting(ImporterType.INVENTORY, hasGlobalCSN(taskConfig.getDumperContext().getDataSourceConfig().getDatabaseType())),
+                            needSorting(ImporterType.INVENTORY, hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())),
                             importerConfig.getRateLimitAlgorithm());
             jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
                     processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
-            if (!(each.getPosition() instanceof FinishedPosition)) {
+            if (!(each.getCommonContext().getPosition() instanceof FinishedPosition)) {
                 importerUsed.set(true);
             }
         }
@@ -146,16 +147,17 @@ public final class CDCJobPreparer {
         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
         ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
-        IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getPosition(), jobItemContext.getInitProgress());
+        IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
         PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getPipelineChannelCreator(), taskProgress);
         channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
-        Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperContext.getDataSourceConfig().getDatabaseType())
-                .createIncrementalDumper(dumperContext, dumperContext.getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
+        Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
+                .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
         boolean needSorting = needSorting(ImporterType.INCREMENTAL, hasGlobalCSN(importerConfig.getDataSourceConfig().getDatabaseType()));
         Importer importer = importerUsed.get() ? null
                 : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 300, TimeUnit.MILLISECONDS,
                         jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm());
-        PipelineTask incrementalTask = new CDCIncrementalTask(dumperContext.getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress);
+        PipelineTask incrementalTask = new CDCIncrementalTask(
+                dumperContext.getCommonContext().getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);
         importerUsed.set(true);
     }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 0be975696ed..3a6447fa4fa 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -264,12 +264,14 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
         MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
         IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(
                 jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
-        CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, incrementalDumperContext.getTableAndSchemaNameMapper());
+        CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
         Set<LogicTableName> targetTableNames = jobConfig.getTargetTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
         Map<LogicTableName, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
                 ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
-        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getTableAndSchemaNameMapper());
-        MigrationTaskConfiguration result = new MigrationTaskConfiguration(incrementalDumperContext.getDataSourceName(), createTableConfig, incrementalDumperContext, importerConfig);
+        ImporterConfiguration importerConfig = buildImporterConfiguration(
+                jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+        MigrationTaskConfiguration result = new MigrationTaskConfiguration(
+                incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfig, incrementalDumperContext, importerConfig);
         log.info("buildTaskConfiguration, result={}", result);
         return result;
     }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
index 659b15995eb..764b22dec92 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest;
 
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
@@ -49,12 +50,13 @@ public final class MigrationIncrementalDumperContextCreator implements Increment
     
     private IncrementalDumperContext buildDumperContext(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource,
                                                         final Map<ActualTableName, LogicTableName> tableNameMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        IncrementalDumperContext result = new IncrementalDumperContext();
+        DumperCommonContext commonContext = new DumperCommonContext();
+        commonContext.setDataSourceName(dataSourceName);
+        commonContext.setDataSourceConfig(sourceDataSource);
+        commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap));
+        commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
+        IncrementalDumperContext result = new IncrementalDumperContext(commonContext);
         result.setJobId(jobId);
-        result.setDataSourceName(dataSourceName);
-        result.setDataSourceConfig(sourceDataSource);
-        result.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap));
-        result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
         return result;
     }
 }
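
The hunk above is the core of this refactor: the attributes shared by all dumpers now live on a standalone DumperCommonContext that is built first and then handed to IncrementalDumperContext, replacing inheritance from an abstract base class with composition. A condensed sketch of the shape this implies, inferred from the setters and constructor calls in this patch (the real classes carry additional members and annotations, so treat the field list and Lombok usage here as an illustration, not the verbatim sources):

    import lombok.Getter;
    import lombok.RequiredArgsConstructor;
    import lombok.Setter;
    
    // Shared dumper state: data source, table name mappings, ingest position.
    // Field types are the pipeline API types imported in the hunks above.
    @Getter
    @Setter
    final class DumperCommonContext {
        
        private String dataSourceName;
        
        private PipelineDataSourceConfiguration dataSourceConfig;
        
        private ActualAndLogicTableNameMapper tableNameMapper;
        
        private TableAndSchemaNameMapper tableAndSchemaNameMapper;
        
        private IngestPosition position;
    }
    
    // The incremental context holds the common context by composition;
    // callers reach shared state through getCommonContext().
    @RequiredArgsConstructor
    @Getter
    final class IncrementalDumperContext {
        
        private final DumperCommonContext commonContext;
        
        @Setter
        private String jobId;
    }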
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index 1fe9a79b3d9..a4eaa8330c7 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -80,7 +80,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
         
         @Override
         protected PipelineDataSourceWrapper initialize() {
-            return dataSourceManager.getDataSource(taskConfig.getDumperContext().getDataSourceConfig());
+            return dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig());
         }
     };
     
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index c882dd924a1..a017a3559d2 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -88,7 +88,8 @@ public final class MigrationJobPreparer {
      * @throws SQLException SQL exception
      */
     public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
-        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(jobItemContext.getTaskConfig().getDumperContext().getDataSourceConfig().getClass()),
+        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(
+                jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration"));
         PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(), Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
@@ -169,14 +170,15 @@ public final class MigrationJobPreparer {
         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
         try {
-            taskConfig.getDumperContext().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
+            taskConfig.getDumperContext().getCommonContext().setPosition(
+                    PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
         } catch (final SQLException ex) {
             throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
         }
     }
     
     private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
-        InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext());
+        InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext().getCommonContext());
         InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperContext, jobItemContext.getTaskConfig().getImporterConfig());
         jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
     }
@@ -188,12 +190,12 @@ public final class MigrationJobPreparer {
         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
         ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
         ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
-        IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getPosition(), jobItemContext.getInitProgress());
+        IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
         PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), pipelineChannelCreator, taskProgress);
-        Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperContext.getDataSourceConfig().getDatabaseType())
-                .createIncrementalDumper(dumperContext, dumperContext.getPosition(), channel, sourceMetaDataLoader);
+        Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
+                .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader);
         Collection<Importer> importers = createImporters(importerConfig, jobItemContext.getSink(), channel, jobItemContext);
-        PipelineTask incrementalTask = new IncrementalTask(dumperContext.getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
+        PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);
     }
     
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
index 74ddfef8e14..7229363eb7e 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
@@ -177,8 +177,8 @@ class GovernanceRepositoryAPIImplTest {
     }
     
     private InventoryTask mockInventoryTask(final MigrationTaskConfiguration taskConfig) {
-        InventoryDumperContext dumperContext = new InventoryDumperContext(taskConfig.getDumperContext());
-        dumperContext.setPosition(new PlaceholderPosition());
+        InventoryDumperContext dumperContext = new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext());
+        dumperContext.getCommonContext().setPosition(new PlaceholderPosition());
         dumperContext.setActualTableName("t_order");
         dumperContext.setLogicTableName("t_order");
         dumperContext.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData()));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 6a0105b71f3..9c1f051695a 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -66,7 +66,7 @@ class InventoryTaskSplitterTest {
     @BeforeEach
     void setUp() {
         initJobItemContext();
-        dumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext());
+        dumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext().getCommonContext());
         PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "int", false, true, true);
         dumperContext.setUniqueKeyColumns(Collections.singletonList(columnMetaData));
         inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), dumperContext, jobItemContext.getTaskConfig().getImporterConfig());
@@ -85,7 +85,7 @@ class InventoryTaskSplitterTest {
     
     @Test
     void assertSplitInventoryDataWithEmptyTable() throws SQLException {
-        initEmptyTablePrimaryEnvironment(dumperContext);
+        initEmptyTablePrimaryEnvironment(dumperContext.getCommonContext());
         List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext);
         assertThat(actual.size(), is(1));
         InventoryTask task = actual.get(0);
@@ -95,7 +95,7 @@ class InventoryTaskSplitterTest {
     
     @Test
     void assertSplitInventoryDataWithIntPrimary() throws SQLException {
-        initIntPrimaryEnvironment(dumperContext);
+        initIntPrimaryEnvironment(dumperContext.getCommonContext());
         List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext);
         assertThat(actual.size(), is(10));
         InventoryTask task = actual.get(9);
@@ -105,7 +105,7 @@ class InventoryTaskSplitterTest {
     
     @Test
     void assertSplitInventoryDataWithCharPrimary() throws SQLException {
-        initCharPrimaryEnvironment(dumperContext);
+        initCharPrimaryEnvironment(dumperContext.getCommonContext());
         List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext);
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0).getTaskId(), is("ds_0.t_order#0"));
@@ -116,15 +116,15 @@ class InventoryTaskSplitterTest {
     
     @Test
     void assertSplitInventoryDataWithoutPrimaryButWithUniqueIndex() throws SQLException {
-        initUniqueIndexOnNotNullColumnEnvironment(dumperContext);
+        initUniqueIndexOnNotNullColumnEnvironment(dumperContext.getCommonContext());
         List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext);
         assertThat(actual.size(), is(1));
     }
     
     @Test
     void assertSplitInventoryDataWithMultipleColumnsKey() throws SQLException {
-        initUnionPrimaryEnvironment(dumperContext);
-        try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())) {
+        initUnionPrimaryEnvironment(dumperContext.getCommonContext());
+        try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())) {
             List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(null, "t_order", new StandardPipelineTableMetaDataLoader(dataSource));
             dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
             List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext);
@@ -134,8 +134,8 @@ class InventoryTaskSplitterTest {
     
     @Test
     void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException {
-        initNoPrimaryEnvironment(dumperContext);
-        try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())) {
+        initNoPrimaryEnvironment(dumperContext.getCommonContext());
+        try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())) {
             List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(null, "t_order", new StandardPipelineTableMetaDataLoader(dataSource));
             assertTrue(uniqueKeyColumns.isEmpty());
             List<InventoryTask> inventoryTasks = inventoryTaskSplitter.splitInventoryData(jobItemContext);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
index 0e70960365a..eb42b7d5b8e 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
@@ -54,7 +54,7 @@ class IncrementalTaskTest {
     @BeforeEach
     void setUp() {
         MigrationTaskConfiguration taskConfig = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
-        taskConfig.getDumperContext().setPosition(new PlaceholderPosition());
+        taskConfig.getDumperContext().getCommonContext().setPosition(new PlaceholderPosition());
         incrementalTask = new IncrementalTask("ds_0", PipelineContextUtils.getExecuteEngine(), mock(Dumper.class),
                 Collections.singletonList(mock(Importer.class)), new IncrementalTaskProgress(new PlaceholderPosition()));
     }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
index e075cd57c08..7291ad65752 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
@@ -76,7 +76,7 @@ class InventoryTaskTest {
         initTableData(taskConfig.getDumperContext());
         // TODO use t_order_0, and also others
         InventoryDumperContext inventoryDumperContext = createInventoryDumperContext("t_order", "t_order");
-        AtomicReference<IngestPosition> position = new AtomicReference<>(inventoryDumperContext.getPosition());
+        AtomicReference<IngestPosition> position = new AtomicReference<>(inventoryDumperContext.getCommonContext().getPosition());
         InventoryTask inventoryTask = new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperContext),
                 PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), mock(Dumper.class), mock(Importer.class), position);
         CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS);
@@ -87,7 +87,7 @@ class InventoryTaskTest {
     private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
         PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
         try (
-                PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
+                PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
@@ -98,11 +98,12 @@ class InventoryTaskTest {
     }
     
     private InventoryDumperContext createInventoryDumperContext(final String logicTableName, final String actualTableName) {
-        InventoryDumperContext result = new InventoryDumperContext(taskConfig.getDumperContext());
+        InventoryDumperContext result = new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext());
         result.setLogicTableName(logicTableName);
         result.setActualTableName(actualTableName);
         result.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData()));
-        result.setPosition(null == taskConfig.getDumperContext().getPosition() ? new IntegerPrimaryKeyPosition(0, 1000) : taskConfig.getDumperContext().getPosition());
+        result.getCommonContext().setPosition(
+                null == taskConfig.getDumperContext().getCommonContext().getPosition() ? new IntegerPrimaryKeyPosition(0, 1000) : taskConfig.getDumperContext().getCommonContext().getPosition());
         return result;
     }
 }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index c500b0e370e..23f8210a468 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -74,7 +74,7 @@ class MigrationDataConsistencyCheckerTest {
     
     private MigrationJobConfiguration createJobConfiguration() throws SQLException {
         MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
-        initTableData(jobItemContext.getTaskConfig().getDumperContext().getDataSourceConfig());
+        initTableData(jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig());
         initTableData(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
         return jobItemContext.getJobConfig();
     }
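
Taken together, every call site touched in this patch migrates the same way: dumper-specific state (job ID, logic and actual table names, unique key columns) stays on the concrete context, while the shared attributes gain one hop through getCommonContext(). A minimal before/after sketch, using the position accessor as the example:

    // Before this change: shared state was read directly off the dumper context.
    IngestPosition position = dumperContext.getPosition();
    
    // After: the same state is reached through the composed common context.
    IngestPosition position = dumperContext.getCommonContext().getPosition();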
