This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c17ec12959e Review and improve pipeline code (#28441)
c17ec12959e is described below

commit c17ec12959e2d9ef72ff4d401cfb957d8b101f16
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Sep 15 16:08:04 2023 +0800

    Review and improve pipeline code (#28441)
    
    * Rename progress columns of "show migration check status" DistSQL result
    
    * Add incremental_idle_seconds column in "show migration check status" 
DistSQL result
    
    * Add sourceDatabaseType for ConsistencyCheckJobItemProgress
    
    * Review
    
    * Impl AutoCloseable for TableDataConsistencyChecker
    
    * Improve PipelineDataSource Wrapper and Manager
    
    * Clean code
    
    * Update unit test
---
 .../StandardPipelineDataSourceConfiguration.java   |  4 +--
 .../DefaultPipelineDataSourceManager.java          | 10 ++++--
 .../datasource/PipelineDataSourceWrapper.java      | 38 ++++++++++++++--------
 .../progress/ConsistencyCheckJobItemProgress.java  |  2 ++
 .../yaml/YamlConsistencyCheckJobItemProgress.java  |  2 ++
 ...YamlConsistencyCheckJobItemProgressSwapper.java |  3 +-
 .../common/pojo/ConsistencyCheckJobItemInfo.java   |  6 ++--
 .../common/util/PipelineLazyInitializer.java       |  5 +++
 .../ConsistencyCheckJobItemProgressContext.java    |  2 ++
 .../CRC32MatchTableDataConsistencyChecker.java     |  4 +++
 .../DataMatchTableDataConsistencyChecker.java      |  4 +++
 .../table/MatchingTableInventoryChecker.java       |  2 +-
 .../table/TableDataConsistencyChecker.java         |  5 ++-
 .../pipeline/core/task/TaskExecuteCallback.java    |  2 +-
 .../datasource/PipelineDataSourceWrapperTest.java  | 12 -------
 .../mysql/ingest/MySQLIncrementalDumper.java       |  6 ++--
 .../query/ShowMigrationCheckStatusExecutor.java    |  6 ++--
 .../ShowMigrationCheckStatusExecutorTest.java      |  7 ++--
 .../api/impl/ConsistencyCheckJobAPI.java           | 13 ++++----
 .../context/ConsistencyCheckJobItemContext.java    |  2 +-
 .../ConsistencyCheckJobItemContextTest.java        |  4 +--
 .../MigrationDataConsistencyChecker.java           |  5 +--
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  2 +-
 .../cases/migration/AbstractMigrationE2EIT.java    |  4 +--
 .../FixtureTableDataConsistencyChecker.java        |  4 +++
 .../consistencycheck/ConsistencyCheckJobTest.java  |  2 ++
 .../migration/api/impl/MigrationJobAPITest.java    |  2 +-
 .../MigrationDataConsistencyCheckerTest.java       |  2 +-
 28 files changed, 100 insertions(+), 60 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
index 90bd9d1f84c..39b30d6bd26 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
@@ -64,8 +64,8 @@ public final class StandardPipelineDataSourceConfiguration 
implements PipelineDa
         this(param, YamlEngine.unmarshal(param, Map.class));
     }
     
-    public StandardPipelineDataSourceConfiguration(final Map<String, Object> 
yamlDataSourceConfig) {
-        this(YamlEngine.marshal(yamlDataSourceConfig), new 
HashMap<>(yamlDataSourceConfig));
+    public StandardPipelineDataSourceConfiguration(final Map<String, Object> 
poolProps) {
+        this(YamlEngine.marshal(poolProps), new HashMap<>(poolProps));
     }
     
     private StandardPipelineDataSourceConfiguration(final String param, final 
Map<String, Object> yamlConfig) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/DefaultPipelineDataSourceManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/DefaultPipelineDataSourceManager.java
index fee03ff0457..c637ca71d8f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/DefaultPipelineDataSourceManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/DefaultPipelineDataSourceManager.java
@@ -41,7 +41,11 @@ public final class DefaultPipelineDataSourceManager 
implements PipelineDataSourc
         synchronized (cachedDataSources) {
             result = cachedDataSources.get(dataSourceConfig);
             if (null != result) {
-                return result;
+                if (!result.isClosed()) {
+                    return result;
+                } else {
+                    log.warn("{} is already closed, create again", result);
+                }
             }
             result = PipelineDataSourceFactory.newInstance(dataSourceConfig);
             cachedDataSources.put(dataSourceConfig, result);
@@ -49,10 +53,12 @@ public final class DefaultPipelineDataSourceManager 
implements PipelineDataSourc
         }
     }
     
-    // TODO monitor each DataSource close
     @Override
     public void close() {
         for (PipelineDataSourceWrapper each : cachedDataSources.values()) {
+            if (each.isClosed()) {
+                continue;
+            }
             try {
                 each.close();
             } catch (final SQLException ex) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapper.java
index c5bbda09c0f..8fdac2e2d35 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapper.java
@@ -21,12 +21,14 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import 
org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
 
 import javax.sql.DataSource;
 import java.io.PrintWriter;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Logger;
 
 /**
@@ -41,6 +43,17 @@ public final class PipelineDataSourceWrapper implements 
DataSource, AutoCloseabl
     
     private final DatabaseType databaseType;
     
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    
+    /**
+     * Whether underlying data source is closed or not.
+     *
+     * @return true if closed
+     */
+    public boolean isClosed() {
+        return closed.get();
+    }
+    
     @Override
     public Connection getConnection() throws SQLException {
         return dataSource.getConnection();
@@ -88,21 +101,20 @@ public final class PipelineDataSourceWrapper implements 
DataSource, AutoCloseabl
     
     @Override
     public void close() throws SQLException {
-        if (null == dataSource) {
+        if (closed.get()) {
+            return;
+        }
+        if (!(dataSource instanceof AutoCloseable)) {
+            log.warn("Data source is not closed, it might cause connection 
leak, data source: {}", dataSource);
             return;
         }
-        if (dataSource instanceof AutoCloseable) {
-            try {
-                ((AutoCloseable) dataSource).close();
-            } catch (final SQLException ex) {
-                throw ex;
-                // CHECKSTYLE:OFF
-            } catch (final Exception ex) {
-                // CHECKSTYLE:ON
-                throw new SQLException("data source close failed.", ex);
-            }
-        } else {
-            log.warn("dataSource is not closed, it might cause connection 
leak, dataSource={}", dataSource);
+        try {
+            new DataSourcePoolDestroyer(dataSource).asyncDestroy();
+            closed.set(true);
+            // CHECKSTYLE:OFF
+        } catch (final RuntimeException ex) {
+            // CHECKSTYLE:ON
+            throw new SQLException("Data source close failed.", ex);
         }
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
index 3b36f63e16c..f43db99d70f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
@@ -52,4 +52,6 @@ public final class ConsistencyCheckJobItemProgress implements 
PipelineJobItemPro
     private final Map<String, Object> sourceTableCheckPositions;
     
     private final Map<String, Object> targetTableCheckPositions;
+    
+    private final String sourceDatabaseType;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
index c7984ca0c4e..2ed595408b3 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
@@ -48,4 +48,6 @@ public final class YamlConsistencyCheckJobItemProgress 
implements YamlConfigurat
     private Map<String, Object> sourceTableCheckPositions = new 
LinkedHashMap<>();
     
     private Map<String, Object> targetTableCheckPositions = new 
LinkedHashMap<>();
+    
+    private String sourceDatabaseType;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
index bece1b9b17c..c399505edf2 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
@@ -38,6 +38,7 @@ public final class YamlConsistencyCheckJobItemProgressSwapper 
implements YamlCon
         result.setCheckEndTimeMillis(data.getCheckEndTimeMillis());
         
result.setSourceTableCheckPositions(data.getSourceTableCheckPositions());
         
result.setTargetTableCheckPositions(data.getTargetTableCheckPositions());
+        result.setSourceDatabaseType(data.getSourceDatabaseType());
         return result;
     }
     
@@ -45,7 +46,7 @@ public final class YamlConsistencyCheckJobItemProgressSwapper 
implements YamlCon
     public ConsistencyCheckJobItemProgress swapToObject(final 
YamlConsistencyCheckJobItemProgress yamlConfig) {
         ConsistencyCheckJobItemProgress result = new 
ConsistencyCheckJobItemProgress(yamlConfig.getTableNames(), 
yamlConfig.getIgnoredTableNames(), yamlConfig.getCheckedRecordsCount(),
                 yamlConfig.getRecordsCount(), 
yamlConfig.getCheckBeginTimeMillis(), yamlConfig.getCheckEndTimeMillis(),
-                yamlConfig.getSourceTableCheckPositions(), 
yamlConfig.getTargetTableCheckPositions());
+                yamlConfig.getSourceTableCheckPositions(), 
yamlConfig.getTargetTableCheckPositions(), yamlConfig.getSourceDatabaseType());
         result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
         return result;
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/ConsistencyCheckJobItemInfo.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/ConsistencyCheckJobItemInfo.java
index 0fa4f6cb77f..db7b605b71e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/ConsistencyCheckJobItemInfo.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/ConsistencyCheckJobItemInfo.java
@@ -35,9 +35,11 @@ public final class ConsistencyCheckJobItemInfo {
     
     private String checkFailedTableNames;
     
-    private int finishedPercentage;
+    private int inventoryFinishedPercentage;
     
-    private long remainingSeconds;
+    private long inventoryRemainingSeconds;
+    
+    private String incrementalIdleSeconds = "";
     
     private String checkBeginTime;
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/PipelineLazyInitializer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/PipelineLazyInitializer.java
index f21d0adfd3e..df9927ecd62 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/PipelineLazyInitializer.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/PipelineLazyInitializer.java
@@ -38,6 +38,11 @@ public abstract class PipelineLazyInitializer<T> extends 
LazyInitializer<T> {
         return result;
     }
     
+    /**
+     * Is initialized.
+     *
+     * @return initialized or not
+     */
     public boolean isInitialized() {
         return initialized.get();
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
index 7822408a81c..ddd11644075 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
@@ -58,6 +58,8 @@ public final class ConsistencyCheckJobItemProgressContext 
implements PipelineJob
     
     private final Map<String, Object> targetTableCheckPositions = new 
ConcurrentHashMap<>();
     
+    private final String sourceDatabaseType;
+    
     @Override
     public void onProgressUpdated(final PipelineJobProgressUpdatedParameter 
param) {
         checkedRecordsCount.addAndGet(param.getProcessedRecordsCount());
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
index 42f7ff0824e..7f07d1fb81e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
@@ -47,6 +47,10 @@ public final class CRC32MatchTableDataConsistencyChecker 
implements TableDataCon
         return result;
     }
     
+    @Override
+    public void close() {
+    }
+    
     @Override
     public String getType() {
         return "CRC32_MATCH";
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
index 9ae4cc1cfb6..410c64c4af2 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
@@ -74,6 +74,10 @@ public final class DataMatchTableDataConsistencyChecker 
implements TableDataCons
         return 
ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class);
     }
     
+    @Override
+    public void close() {
+    }
+    
     @Override
     public String getType() {
         return "DATA_MATCH";
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index d941dbb0d26..3e485e97f52 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -59,7 +59,7 @@ public abstract class MatchingTableInventoryChecker 
implements TableInventoryChe
     
     @Override
     public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
-        ThreadFactory threadFactory = 
ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(param.getJobId()) + 
"-check-%d");
+        ThreadFactory threadFactory = 
ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(param.getJobId()) + 
"-matching-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, 
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         try {
             return checkSingleTableInventoryData(param, executor);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
index 2faa5cc0775..010e504f66d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
@@ -25,7 +25,7 @@ import java.util.Collection;
 /**
  * Table data consistency checker.
  */
-public interface TableDataConsistencyChecker extends ShardingSphereAlgorithm {
+public interface TableDataConsistencyChecker extends ShardingSphereAlgorithm, 
AutoCloseable {
     
     /**
      * Build table inventory checker.
@@ -50,4 +50,7 @@ public interface TableDataConsistencyChecker extends 
ShardingSphereAlgorithm {
      * @return supported database types
      */
     Collection<DatabaseType> getSupportedDatabaseTypes();
+    
+    @Override
+    void close();
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
index c4e88b5a79d..82b5ec4259a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
@@ -37,7 +37,7 @@ public final class TaskExecuteCallback implements 
ExecuteCallback {
     
     @Override
     public void onFailure(final Throwable throwable) {
-        log.error("onFailure, task ID={}", task.getTaskId());
+        log.error("onFailure, task ID={}", task.getTaskId(), throwable);
         task.stop();
         IOUtils.closeQuietly(task);
     }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapperTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapperTest.java
index 7a61e3bc6ea..455a2b0dc2d 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapperTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapperTest.java
@@ -110,16 +110,4 @@ class PipelineDataSourceWrapperTest {
         doThrow(new 
SQLException("")).when(dataSource).setLogWriter(printWriter);
         assertThrows(SQLException.class, () -> new 
PipelineDataSourceWrapper(dataSource, 
TypedSPILoader.getService(DatabaseType.class, 
"FIXTURE")).setLogWriter(printWriter));
     }
-    
-    @Test
-    void assertCloseExceptionFailure() throws Exception {
-        doThrow(new Exception("")).when((AutoCloseable) dataSource).close();
-        assertThrows(SQLException.class, () -> new 
PipelineDataSourceWrapper(dataSource, 
TypedSPILoader.getService(DatabaseType.class, "FIXTURE")).close());
-    }
-    
-    @Test
-    void assertCloseSQLExceptionFailure() throws Exception {
-        doThrow(new SQLException("")).when((AutoCloseable) dataSource).close();
-        assertThrows(SQLException.class, () -> new 
PipelineDataSourceWrapper(dataSource, 
TypedSPILoader.getService(DatabaseType.class, "FIXTURE")).close());
-    }
 }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 0a0eeb75081..6c5cde7993a 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -47,10 +47,10 @@ import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ConnectInfo;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.MySQLClient;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.MySQLDataTypeHandler;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.string.MySQLBinaryString;
-import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import 
org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
+import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 import java.io.Serializable;
@@ -88,10 +88,10 @@ public final class MySQLIncrementalDumper extends 
AbstractLifecycleExecutor impl
         this.channel = channel;
         this.metaDataLoader = metaDataLoader;
         YamlJdbcConfiguration jdbcConfig = 
((StandardPipelineDataSourceConfiguration) 
dumperConfig.getDataSourceConfig()).getJdbcConfig();
-        log.info("incremental dump, jdbcUrl={}", jdbcConfig.getUrl());
         ConnectionPropertiesParser parser = 
DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, 
TypedSPILoader.getService(DatabaseType.class, "MySQL"));
         ConnectionProperties connectionProps = 
parser.parse(jdbcConfig.getUrl(), null, null);
         ConnectInfo connectInfo = new ConnectInfo(generateServerId(), 
connectionProps.getHostname(), connectionProps.getPort(), 
jdbcConfig.getUsername(), jdbcConfig.getPassword());
+        log.info("incremental dump, jdbcUrl={}, serverId={}, hostname={}, 
port={}", jdbcConfig.getUrl(), connectInfo.getServerId(), 
connectInfo.getHost(), connectInfo.getPort());
         client = new MySQLClient(connectInfo, dumperConfig.isDecodeWithTX());
         catalog = connectionProps.getCatalog();
     }
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
index 98202a3c5b6..a1f95255599 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
@@ -49,15 +49,15 @@ public final class ShowMigrationCheckStatusExecutor 
implements QueryableRALExecu
     private LocalDataQueryResultRow convert(final ConsistencyCheckJobItemInfo 
info) {
         String checkResult = null == info.getCheckSuccess() ? "" : 
info.getCheckSuccess().toString();
         return new 
LocalDataQueryResultRow(Optional.ofNullable(info.getTableNames()).orElse(""), 
checkResult, Optional.ofNullable(info.getCheckFailedTableNames()).orElse(""),
-                String.valueOf(info.getFinishedPercentage()), 
info.getRemainingSeconds(),
+                String.valueOf(info.getInventoryFinishedPercentage()), 
info.getInventoryRemainingSeconds(), info.getIncrementalIdleSeconds(),
                 Optional.ofNullable(info.getCheckBeginTime()).orElse(""), 
Optional.ofNullable(info.getCheckEndTime()).orElse(""), 
info.getDurationSeconds(),
                 info.getAlgorithmType(), 
Optional.ofNullable(info.getAlgorithmProps()).orElse(""), 
Optional.ofNullable(info.getErrorMessage()).orElse(""));
     }
     
     @Override
     public Collection<String> getColumnNames() {
-        return Arrays.asList("tables", "result", "check_failed_tables", 
"finished_percentage", "remaining_seconds", "check_begin_time", 
"check_end_time", "duration_seconds",
-                "algorithm_type", "algorithm_props", "error_message");
+        return Arrays.asList("tables", "result", "check_failed_tables", 
"inventory_finished_percentage", "inventory_remaining_seconds", 
"incremental_idle_seconds",
+                "check_begin_time", "check_end_time", "duration_seconds", 
"algorithm_type", "algorithm_props", "error_message");
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
 
b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
index fa45d54fb3a..4ffef5d5e08 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
@@ -32,13 +32,14 @@ class ShowMigrationCheckStatusExecutorTest {
     @Test
     void assertGetColumnNames() {
         Collection<String> columns = executor.getColumnNames();
-        assertThat(columns.size(), is(11));
+        assertThat(columns.size(), is(12));
         Iterator<String> iterator = columns.iterator();
         assertThat(iterator.next(), is("tables"));
         assertThat(iterator.next(), is("result"));
         assertThat(iterator.next(), is("check_failed_tables"));
-        assertThat(iterator.next(), is("finished_percentage"));
-        assertThat(iterator.next(), is("remaining_seconds"));
+        assertThat(iterator.next(), is("inventory_finished_percentage"));
+        assertThat(iterator.next(), is("inventory_remaining_seconds"));
+        assertThat(iterator.next(), is("incremental_idle_seconds"));
         assertThat(iterator.next(), is("check_begin_time"));
         assertThat(iterator.next(), is("check_end_time"));
         assertThat(iterator.next(), is("duration_seconds"));
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 37acaab9719..4eedb50c8cd 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -122,6 +122,7 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
         yamlConfig.setParentJobId(parentJobId);
         yamlConfig.setAlgorithmTypeName(param.getAlgorithmTypeName());
         yamlConfig.setAlgorithmProps(param.getAlgorithmProps());
+        
yamlConfig.setSourceDatabaseType(param.getSourceDatabaseType().getType());
         start(new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(yamlConfig));
         return result;
     }
@@ -160,7 +161,7 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
         String ignoredTableNames = String.join(",", 
progressContext.getIgnoredTableNames());
         ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(tableNames, ignoredTableNames, 
progressContext.getCheckedRecordsCount().get(),
                 progressContext.getRecordsCount(), 
progressContext.getCheckBeginTimeMillis(), 
progressContext.getCheckEndTimeMillis(),
-                progressContext.getSourceTableCheckPositions(), 
progressContext.getTargetTableCheckPositions());
+                progressContext.getSourceTableCheckPositions(), 
progressContext.getTargetTableCheckPositions(), 
progressContext.getSourceDatabaseType());
         jobItemProgress.setStatus(context.getStatus());
         return 
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress));
     }
@@ -304,7 +305,7 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
         }
         ConsistencyCheckJobItemProgress jobItemProgress = progress.get();
         if (null == jobItemProgress.getRecordsCount() || null == 
jobItemProgress.getCheckedRecordsCount()) {
-            result.setFinishedPercentage(0);
+            result.setInventoryFinishedPercentage(0);
             result.setCheckSuccess(null);
             return result;
         }
@@ -312,14 +313,14 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
         long recordsCount = jobItemProgress.getRecordsCount();
         long checkedRecordsCount = 
Math.min(jobItemProgress.getCheckedRecordsCount(), recordsCount);
         if (JobStatus.FINISHED == jobItemProgress.getStatus()) {
-            result.setFinishedPercentage(100);
+            result.setInventoryFinishedPercentage(100);
             LocalDateTime checkEndTime = new 
Timestamp(jobItemProgress.getCheckEndTimeMillis()).toLocalDateTime();
             Duration duration = Duration.between(checkBeginTime, checkEndTime);
             result.setDurationSeconds(duration.getSeconds());
             result.setCheckEndTime(DATE_TIME_FORMATTER.format(checkEndTime));
-            result.setRemainingSeconds(0L);
+            result.setInventoryRemainingSeconds(0L);
         } else if (0 != recordsCount && 0 != checkedRecordsCount) {
-            result.setFinishedPercentage((int) (checkedRecordsCount * 100 / 
recordsCount));
+            result.setInventoryFinishedPercentage((int) (checkedRecordsCount * 
100 / recordsCount));
             JobConfigurationPOJO jobConfigPOJO = 
getElasticJobConfigPOJO(checkJobId);
             Long stopTimeMillis = jobConfigPOJO.isDisabled() ? 
Long.parseLong(jobConfigPOJO.getProps().getProperty("stop_time_millis")) : null;
             long durationMillis = (null != stopTimeMillis ? stopTimeMillis : 
System.currentTimeMillis()) - jobItemProgress.getCheckBeginTimeMillis();
@@ -328,7 +329,7 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
                 result.setCheckEndTime(DATE_TIME_FORMATTER.format(new 
Timestamp(stopTimeMillis).toLocalDateTime()));
             }
             long remainingMills = Math.max(0, (long) ((recordsCount - 
checkedRecordsCount) * 1.0D / checkedRecordsCount * durationMillis));
-            result.setRemainingSeconds(remainingMills / 1000);
+            result.setInventoryRemainingSeconds(remainingMills / 1000);
         }
         String tableNames = jobItemProgress.getTableNames();
         result.setTableNames(Optional.ofNullable(tableNames).orElse(""));
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
index 329d9d1e8cb..5dd7865398b 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
@@ -56,7 +56,7 @@ public final class ConsistencyCheckJobItemContext implements 
PipelineJobItemCont
         jobId = jobConfig.getJobId();
         this.shardingItem = shardingItem;
         this.status = status;
-        progressContext = new ConsistencyCheckJobItemProgressContext(jobId, 
shardingItem);
+        progressContext = new ConsistencyCheckJobItemProgressContext(jobId, 
shardingItem, jobConfig.getSourceDatabaseType().getType());
         if (null != jobItemProgress) {
             
progressContext.getCheckedRecordsCount().set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
             
Optional.ofNullable(jobItemProgress.getSourceTableCheckPositions()).ifPresent(progressContext.getSourceTableCheckPositions()::putAll);
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
index 753f5e9105f..b84f69bff9e 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
@@ -43,7 +43,7 @@ class ConsistencyCheckJobItemContextTest {
     void assertConstructWithoutTableCheckPositions() {
         Map<String, Object> sourceTableCheckPositions = Collections.emptyMap();
         Map<String, Object> targetTableCheckPositions = Collections.emptyMap();
-        ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null, 
sourceTableCheckPositions, targetTableCheckPositions);
+        ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null, 
sourceTableCheckPositions, targetTableCheckPositions, "H2");
         ConsistencyCheckJobItemContext actual = new 
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "", 
"DATA_MATCH", null, databaseType),
                 0, JobStatus.RUNNING, jobItemProgress);
         verifyProgressContext(actual.getProgressContext(), 0, 
sourceTableCheckPositions, targetTableCheckPositions);
@@ -53,7 +53,7 @@ class ConsistencyCheckJobItemContextTest {
     void assertConstructWithTableCheckPositions() {
         Map<String, Object> sourceTableCheckPositions = ImmutableMap.of(TABLE, 
6);
         Map<String, Object> targetTableCheckPositions = ImmutableMap.of(TABLE, 
5);
-        ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null, 
sourceTableCheckPositions, targetTableCheckPositions);
+        ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null, 
sourceTableCheckPositions, targetTableCheckPositions, "H2");
         ConsistencyCheckJobItemContext actual = new 
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "", 
"DATA_MATCH", null, databaseType),
                 0, JobStatus.RUNNING, jobItemProgress);
         verifyProgressContext(actual.getProgressContext(), 1, 
sourceTableCheckPositions, targetTableCheckPositions);
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 13dde2fb842..846f9b430e0 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -87,8 +87,9 @@ public final class MigrationDataConsistencyChecker implements 
PipelineDataConsis
         progressContext.getTableNames().addAll(sourceTableNames);
         progressContext.onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(0));
         Map<SchemaTableName, TableDataConsistencyCheckResult> result = new 
LinkedHashMap<>();
-        TableDataConsistencyChecker tableChecker = 
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps);
-        try (PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager()) {
+        try (
+                PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
+                TableDataConsistencyChecker tableChecker = 
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps)) {
             for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
                 checkTableInventoryData(each, tableChecker, result, 
dataSourceManager);
             }
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index ecf7e569595..a54c866caea 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -201,7 +201,7 @@ class CDCE2EIT {
         StandardPipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(targetDataSource);
         PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().getOriginal(), 
schemaTableName.getTableName().getOriginal());
         List<PipelineColumnMetaData> uniqueKeys = 
Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0)));
-        ConsistencyCheckJobItemProgressContext progressContext = new 
ConsistencyCheckJobItemProgressContext("", 0);
+        ConsistencyCheckJobItemProgressContext progressContext = new 
ConsistencyCheckJobItemProgressContext("", 0, 
sourceDataSource.getDatabaseType().getType());
         TableInventoryCheckParameter param = new 
TableInventoryCheckParameter("", sourceDataSource, targetDataSource, 
schemaTableName, schemaTableName,
                 tableMetaData.getColumnNames(), uniqueKeys, null, 
progressContext);
         TableDataConsistencyChecker tableChecker = 
TypedSPILoader.getService(TableDataConsistencyChecker.class, "DATA_MATCH", new 
Properties());
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index ffd99fd94c6..470f46434a1 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -165,7 +165,7 @@ public abstract class AbstractMigrationE2EIT {
                 continue;
             }
             List<String> checkEndTimeList = resultList.stream().map(map -> 
map.get("check_end_time").toString()).filter(each -> 
!Strings.isNullOrEmpty(each)).collect(Collectors.toList());
-            Set<String> finishedPercentages = resultList.stream().map(map -> 
map.get("finished_percentage").toString()).collect(Collectors.toSet());
+            Set<String> finishedPercentages = resultList.stream().map(map -> 
map.get("inventory_finished_percentage").toString()).collect(Collectors.toSet());
             if (checkEndTimeList.size() == resultList.size() && 1 == 
finishedPercentages.size() && finishedPercentages.contains("100")) {
                 break;
             } else {
@@ -176,7 +176,7 @@ public abstract class AbstractMigrationE2EIT {
         assertFalse(resultList.isEmpty());
         for (Map<String, Object> each : resultList) {
             assertTrue(Boolean.parseBoolean(each.get("result").toString()), 
String.format("%s check result is false", each.get("tables")));
-            assertThat("finished_percentage is not 100", 
each.get("finished_percentage").toString(), is("100"));
+            assertThat("inventory_finished_percentage is not 100", 
each.get("inventory_finished_percentage").toString(), is("100"));
         }
     }
     
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableDataConsistencyChecker.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableDataConsistencyChecker.java
index a3dd3e28c4a..fdcc39f5120 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableDataConsistencyChecker.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableDataConsistencyChecker.java
@@ -39,6 +39,10 @@ public final class FixtureTableDataConsistencyChecker 
implements TableDataConsis
         return 
ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class);
     }
     
+    @Override
+    public void close() {
+    }
+    
     @Override
     public String getType() {
         return "FIXTURE";
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 613a74c9bbf..a8c3af697f2 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -72,6 +72,8 @@ class ConsistencyCheckJobTest {
         YamlConsistencyCheckJobConfiguration result = new 
YamlConsistencyCheckJobConfiguration();
         result.setJobId(checkJobId);
         result.setParentJobId("");
+        result.setAlgorithmTypeName("DATA_MATCH");
+        result.setSourceDatabaseType("H2");
         return result;
     }
 }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index d052d9609ae..96fc9eba5c8 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -177,7 +177,7 @@ class MigrationJobAPITest {
         Optional<String> jobId = jobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
         Map<String, TableDataConsistencyCheckResult> checkResultMap = 
jobAPI.buildPipelineDataConsistencyChecker(
-                jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new 
ConsistencyCheckJobItemProgressContext(jobId.get(), 0)).check("FIXTURE", null);
+                jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new 
ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", 
null);
         assertThat(checkResultMap.size(), is(1));
         String checkKey = "t_order";
         
assertTrue(checkResultMap.get(checkKey).getCountCheckResult().isMatched());
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index f4e849174e1..3862d8c061b 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -72,7 +72,7 @@ class MigrationDataConsistencyCheckerTest {
     }
     
     private ConsistencyCheckJobItemProgressContext 
createConsistencyCheckJobItemProgressContext() {
-        return new ConsistencyCheckJobItemProgressContext("", 0);
+        return new ConsistencyCheckJobItemProgressContext("", 0, "H2");
     }
     
     private MigrationJobConfiguration createJobConfiguration() throws 
SQLException {


Reply via email to