This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c778a7eab7 Support cancel data consistency check and refactoring (#21429)
4c778a7eab7 is described below

commit 4c778a7eab7e1c867c1803568fc1ac7030774b1b
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Oct 10 13:53:13 2022 +0800

    Support cancel data consistency check and refactoring (#21429)
    
    * Move DataConsistencyCalculateAlgorithmFactory
    
    * Add SingleTableInventoryDataConsistencyChecker
    
    * Refactor DataConsistencyCalculateParameter.tableNameSchemaNameMapping to schemaName
    
    * Add DataConsistencyCalculatedResult, calculate count and content together
    
    * Refactor SingleTableInventoryDataConsistencyChecker by DataConsistencyCalculatedResult
    
    * Remove buildCountSQL from PipelineSQLBuilder
    
    * Unit test
    
    * Revert "Remove buildCountSQL from PipelineSQLBuilder"
    
    This reverts commit ea946fcce577fe2aee72592176657464f4952753.
    
    * Add CRC32_MATCH in MySQLMigrationGeneralIT
    
    * Add log when crc32 not match
    
    * Add cancel() for DataConsistencyCalculateAlgorithm and abstract impl
    
    * Recover PipelineTask start stop methods
    
    * Add ConsistencyCheckTasksRunner and refactor ConsistencyCheckJob
    
    * Refactor AbstractPipelineJob to use common JobBootstrap
    
    * Cancel DataConsistencyCalculateAlgorithm on stopping
    
    * Log
---
 .../DataConsistencyCalculateParameter.java         |   4 +-
 .../DataConsistencyCalculatedResult.java           |  21 +--
 .../DataConsistencyCalculateAlgorithm.java         |  12 +-
 .../DataConsistencyCalculateAlgorithmFactory.java  |   3 +-
 .../spi/sqlbuilder/PipelineSQLBuilder.java         |   1 +
 .../core/api/InventoryIncrementalJobAPI.java       |  15 +-
 .../AbstractInventoryIncrementalJobAPIImpl.java    |  31 ++--
 .../DataConsistencyCalculateAlgorithmChooser.java  |   1 +
 ...SingleTableInventoryDataConsistencyChecker.java | 148 ++++++++++++++++++
 .../AbstractDataConsistencyCalculateAlgorithm.java |  63 ++++++++
 ...StreamingDataConsistencyCalculateAlgorithm.java |  20 +--
 ...RC32MatchDataConsistencyCalculateAlgorithm.java |  76 ++++++++--
 ...DataMatchDataConsistencyCalculateAlgorithm.java |  27 ++--
 .../PipelineJobHasAlreadyFinishedException.java    |   2 +-
 .../pipeline/core/job/AbstractPipelineJob.java     |   4 +-
 .../data/pipeline/core/task/IncrementalTask.java   |  10 +-
 .../data/pipeline/core/task/InventoryTask.java     |  10 +-
 .../data/pipeline/core/task/PipelineTask.java      |  14 ++
 ...tencyCheckChangedJobConfigurationProcessor.java |   2 +-
 .../consistencycheck/ConsistencyCheckJob.java      |  61 +++-----
 .../ConsistencyCheckTasksRunner.java               | 139 +++++++++++++++++
 .../MigrationChangedJobConfigurationProcessor.java |   2 +-
 .../migration/MigrationDataConsistencyChecker.java | 167 +++------------------
 .../pipeline/scenario/migration/MigrationJob.java  |   4 +-
 ...MatchDataConsistencyCalculateAlgorithmTest.java |  17 +--
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java  |   2 +-
 .../sqlbuilder/MySQLPipelineSQLBuilderTest.java    |   2 +-
 .../migration/general/MySQLMigrationGeneralIT.java |   9 +-
 ...taConsistencyCalculateAlgorithmFactoryTest.java |   1 +
 .../DataConsistencyCalculateAlgorithmFixture.java  |   9 +-
 .../FixtureDataConsistencyCalculatedResult.java}   |  19 +--
 31 files changed, 598 insertions(+), 298 deletions(-)
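
At a glance, the cancel support added here can be sketched as follows; the wiring is illustrative (the jobAPI and jobConfig variables are placeholders, not code from this commit), and only the method and type names come from the diff below:

    // Illustrative sketch: build an algorithm once, check with it, cancel it on stopping.
    InventoryIncrementalJobAPI jobAPI = ...;          // placeholder
    PipelineJobConfiguration jobConfig = ...;         // placeholder
    DataConsistencyCalculateAlgorithm algorithm = jobAPI.buildDataConsistencyCalculateAlgorithm(jobConfig, "CRC32_MATCH", new Properties());
    Map<String, DataConsistencyCheckResult> checkResult = jobAPI.dataConsistencyCheck(jobConfig, algorithm);
    // on job stopping, from another thread:
    algorithm.cancel();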

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
index 7463fadae10..154643bdd59 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
@@ -22,7 +22,6 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 
@@ -43,8 +42,7 @@ public final class DataConsistencyCalculateParameter {
      */
     private final PipelineDataSourceWrapper dataSource;
     
-    // TODO replace to schemaName
-    private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
+    private final String schemaName;
     
     private final String logicTableName;
     
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
similarity index 70%
copy from test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
copy to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
index 3d1c2827377..ea34f78d60e 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
@@ -15,16 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.api.check.consistency;
 
-import org.junit.Test;
-
-import java.util.Properties;
-
-public final class DataConsistencyCalculateAlgorithmFactoryTest {
+/**
+ * Data consistency calculated result.
+ */
+public interface DataConsistencyCalculatedResult {
     
-    @Test
-    public void assertNewInstanceSuccess() {
-        DataConsistencyCalculateAlgorithmFactory.newInstance("FIXTURE", new 
Properties());
-    }
+    /**
+     * Get records count.
+     *
+     * @return records count
+     */
+    int getRecordsCount();
 }
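
A minimal implementation of this new interface only has to expose a records count; a hedged sketch (the class name is hypothetical — the concrete algorithms below additionally override equals()/hashCode() so source and target results can be compared chunk by chunk):

    // Hypothetical minimal DataConsistencyCalculatedResult implementation
    public final class CountOnlyCalculatedResult implements DataConsistencyCalculatedResult {
        
        private final int recordsCount;
        
        public CountOnlyCalculatedResult(final int recordsCount) {
            this.recordsCount = recordsCount;
        }
        
        @Override
        public int getRecordsCount() {
            return recordsCount;
        }
    }
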
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
index 368b501ee5a..6ee50d96e10 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
@@ -18,9 +18,12 @@
 package org.apache.shardingsphere.data.pipeline.spi.check.consistency;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
 import org.apache.shardingsphere.infra.util.spi.aware.SPIMetadataAware;
 
+import java.sql.SQLException;
+
 /**
  * Data consistency calculate algorithm.
  */
@@ -32,5 +35,12 @@ public interface DataConsistencyCalculateAlgorithm extends ShardingSphereAlgorit
      * @param parameter data consistency calculate parameter
      * @return calculated result
      */
-    Iterable<Object> calculate(DataConsistencyCalculateParameter parameter);
+    Iterable<DataConsistencyCalculatedResult> calculate(DataConsistencyCalculateParameter parameter);
+    
+    /**
+     * Cancel calculation.
+     *
+     * @throws SQLException if canceling the underlying SQL execution fails
+     */
+    void cancel() throws SQLException;
 }
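
From a caller's perspective the SPI now pairs a streaming calculate() with an asynchronous cancel(); a hedged sketch of the intended interaction (the algorithm and parameter variables are placeholders):

    // Illustrative contract: calculate() streams chunks until exhausted or canceled.
    DataConsistencyCalculateAlgorithm algorithm = ...;       // placeholder
    DataConsistencyCalculateParameter parameter = ...;       // placeholder
    for (DataConsistencyCalculatedResult each : algorithm.calculate(parameter)) {
        // compare each chunk against the corresponding chunk of the peer side
    }
    // elsewhere, when the pipeline job is stopping:
    algorithm.cancel();                                      // aborts the in-flight SQL
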
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactory.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
similarity index 92%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
index a37556b792f..8db297edc3c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
@@ -15,9 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.spi.check.consistency;
 
-import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 0a1299318a7..c5663ba699b 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -124,6 +124,7 @@ public interface PipelineSQLBuilder extends TypedSPI, RequiredSPI {
      * @param tableName table name
      * @return count SQL
      */
+    // TODO keep it for now, it might be used later
     String buildCountSQL(String schemaName, String tableName);
     
     /**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index fdf355a8a4f..077d08c7281 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -20,8 +20,10 @@ package org.apache.shardingsphere.data.pipeline.core.api;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * Inventory incremental job API.
@@ -39,13 +41,24 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
     @Override
     InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int shardingItem);
     
+    /**
+     * Build data consistency calculate algorithm.
+     *
+     * @param jobConfig job configuration
+     * @param algorithmType algorithm type
+     * @param algorithmProps algorithm properties
+     * @return calculate algorithm
+     */
+    DataConsistencyCalculateAlgorithm buildDataConsistencyCalculateAlgorithm(PipelineJobConfiguration jobConfig, String algorithmType, Properties algorithmProps);
+    
     /**
      * Do data consistency check.
      *
      * @param pipelineJobConfig job configuration
+     * @param calculateAlgorithm calculate algorithm
      * @return each logic table check result
      */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig);
+    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, DataConsistencyCalculateAlgorithm calculateAlgorithm);
     
     /**
      * Aggregate data consistency check results.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index acb875a682a..b98eb85cf8f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -23,20 +23,20 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
-import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
 import org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
+import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -150,19 +150,23 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         }).collect(Collectors.toList());
     }
     
+    @Override
+    public DataConsistencyCalculateAlgorithm buildDataConsistencyCalculateAlgorithm(final PipelineJobConfiguration jobConfig, final String algorithmType, final Properties algorithmProps) {
+        ShardingSpherePreconditions.checkState(null != algorithmType || null != jobConfig, () -> new IllegalArgumentException("algorithmType and jobConfig are null"));
+        if (null != algorithmType) {
+            return DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, algorithmProps);
+        }
+        return DataConsistencyCalculateAlgorithmChooser.choose(
+                DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()), DatabaseTypeFactory.getInstance(getTargetDatabaseType(jobConfig)));
+    }
+    
     @Override
     public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId) {
         checkModeConfig();
         log.info("Data consistency check for job {}", jobId);
         PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
-        return dataConsistencyCheck(jobConfig);
-    }
-    
-    @Override
-    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig) {
-        DataConsistencyCalculateAlgorithm algorithm = DataConsistencyCalculateAlgorithmChooser.choose(
-                DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()), DatabaseTypeFactory.getInstance(getTargetDatabaseType(jobConfig)));
-        return dataConsistencyCheck(jobConfig, algorithm);
+        DataConsistencyCalculateAlgorithm calculateAlgorithm = buildDataConsistencyCalculateAlgorithm(jobConfig, null, null);
+        return dataConsistencyCheck(jobConfig, calculateAlgorithm);
     }
     
     @Override
@@ -170,10 +174,11 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         checkModeConfig();
         log.info("Data consistency check for job {}, algorithmType: {}", 
jobId, algorithmType);
         PipelineJobConfiguration jobConfig = 
getJobConfiguration(getElasticJobConfigPOJO(jobId));
-        return dataConsistencyCheck(jobConfig, 
DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, 
algorithmProps));
+        return dataConsistencyCheck(jobConfig, 
buildDataConsistencyCalculateAlgorithm(jobConfig, algorithmType, 
algorithmProps));
     }
     
-    protected Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+    @Override
+    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
         String jobId = jobConfig.getJobId();
         Map<String, DataConsistencyCheckResult> result = buildPipelineDataConsistencyChecker(jobConfig, buildPipelineProcessContext(jobConfig)).check(calculateAlgorithm);
         log.info("job {} with check algorithm '{}' data consistency checker result {}", jobId, calculateAlgorithm.getType(), result);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
index 5e50032e3eb..499dfa006c5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
new file mode 100644
index 00000000000..3125fc778fc
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Single table inventory data consistency checker.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public final class SingleTableInventoryDataConsistencyChecker {
+    
+    private final String jobId;
+    
+    private final PipelineDataSourceWrapper sourceDataSource;
+    
+    private final PipelineDataSourceWrapper targetDataSource;
+    
+    private final SchemaTableName sourceTable;
+    
+    private final SchemaTableName targetTable;
+    
+    private final PipelineColumnMetaData uniqueKey;
+    
+    private final PipelineTableMetaDataLoader metaDataLoader;
+    
+    private final JobRateLimitAlgorithm readRateLimitAlgorithm;
+    
+    /**
+     * Data consistency check.
+     *
+     * @param calculateAlgorithm calculate algorithm
+     * @return data consistency check result
+     */
+    public DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobId) + "-check-%d");
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
+        try {
+            return check(calculateAlgorithm, executor);
+        } finally {
+            executor.shutdown();
+            executor.shutdownNow();
+        }
+    }
+    
+    private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor executor) {
+        String sourceDatabaseType = sourceDataSource.getDatabaseType().getType();
+        String targetDatabaseType = targetDataSource.getDatabaseType().getType();
+        String sourceTableName = sourceTable.getTableName().getOriginal();
+        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(sourceTable.getSchemaName().getOriginal(), sourceTableName);
+        ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(sourceTableName));
+        Collection<String> columnNames = tableMetaData.getColumnNames();
+        DataConsistencyCalculateParameter sourceParameter = buildParameter(
+                sourceDataSource, sourceTable.getSchemaName().getOriginal(), sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
+        DataConsistencyCalculateParameter targetParameter = buildParameter(
+                targetDataSource, targetTable.getSchemaName().getOriginal(), targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
+        Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = calculateAlgorithm.calculate(sourceParameter).iterator();
+        Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = calculateAlgorithm.calculate(targetParameter).iterator();
+        long sourceRecordsCount = 0;
+        long targetRecordsCount = 0;
+        boolean contentMatched = true;
+        while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
+            if (null != readRateLimitAlgorithm) {
+                readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
+            }
+            Future<DataConsistencyCalculatedResult> sourceFuture = executor.submit(sourceCalculatedResults::next);
+            Future<DataConsistencyCalculatedResult> targetFuture = executor.submit(targetCalculatedResults::next);
+            DataConsistencyCalculatedResult sourceCalculatedResult = waitFuture(sourceFuture);
+            DataConsistencyCalculatedResult targetCalculatedResult = waitFuture(targetFuture);
+            sourceRecordsCount += sourceCalculatedResult.getRecordsCount();
+            targetRecordsCount += targetCalculatedResult.getRecordsCount();
+            contentMatched = Objects.equals(sourceCalculatedResult, targetCalculatedResult);
+            if (!contentMatched) {
+                log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
+                break;
+            }
+        }
+        return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new DataConsistencyContentCheckResult(contentMatched));
+    }
+    
+    // TODO use digest (crc32, murmurhash)
+    private String getJobIdDigest(final String jobId) {
+        return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
+    }
+    
+    private DataConsistencyCalculateParameter buildParameter(final PipelineDataSourceWrapper sourceDataSource,
+                                                             final String schemaName, final String tableName, final Collection<String> columnNames,
+                                                             final String sourceDatabaseType, final String targetDatabaseType, final PipelineColumnMetaData uniqueKey) {
+        return new DataConsistencyCalculateParameter(sourceDataSource, schemaName, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
+    }
+    
+    private <T> T waitFuture(final Future<T> future) {
+        try {
+            return future.get();
+        } catch (final InterruptedException | ExecutionException ex) {
+            if (ex.getCause() instanceof PipelineSQLException) {
+                throw (PipelineSQLException) ex.getCause();
+            }
+            throw new SQLWrapperException(new SQLException(ex));
+        }
+    }
+}
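
A hedged usage sketch of this per-table checker (constructor arguments follow the field order generated by @RequiredArgsConstructor; every variable is a placeholder for a value that a scenario such as migration would supply):

    // Illustrative: one checker instance per logic table
    SingleTableInventoryDataConsistencyChecker checker = new SingleTableInventoryDataConsistencyChecker(
            jobId, sourceDataSource, targetDataSource, sourceTable, targetTable, uniqueKey, metaDataLoader, readRateLimitAlgorithm);
    DataConsistencyCheckResult checkResult = checker.check(calculateAlgorithm);
    // counts and content are now verified in the same calculate pass
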
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java
new file mode 100644
index 00000000000..7e14ced4342
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Statement;
+
+/**
+ * Abstract data consistency calculate algorithm.
+ */
+@Slf4j
+public abstract class AbstractDataConsistencyCalculateAlgorithm implements DataConsistencyCalculateAlgorithm {
+    
+    @Getter(AccessLevel.PROTECTED)
+    private volatile boolean canceling;
+    
+    private volatile Statement currentStatement;
+    
+    protected <T extends Statement> T setCurrentStatement(final T statement) {
+        this.currentStatement = statement;
+        return statement;
+    }
+    
+    @Override
+    public void cancel() throws SQLException {
+        canceling = true;
+        Statement statement = currentStatement;
+        if (null == statement || statement.isClosed()) {
+            log.info("cancel, statement is null or closed");
+            return;
+        }
+        long startTimeMillis = System.currentTimeMillis();
+        try {
+            statement.cancel();
+        } catch (final SQLFeatureNotSupportedException ex) {
+            log.info("cancel is not supported: {}", ex.getMessage());
+        } catch (final SQLException ex) {
+            log.info("cancel failed: {}", ex.getMessage());
+        }
+        log.info("cancel cost {} ms", System.currentTimeMillis() - 
startTimeMillis);
+    }
+}
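
The abstract class relies on the JDBC rule that Statement.cancel() may be invoked from a thread other than the one executing the statement. A self-contained sketch of that underlying pattern (the dataSource variable and table name are assumptions, not taken from this commit; java.sql imports assumed):

    // Plain JDBC illustration of cross-thread statement cancellation
    try (Connection connection = dataSource.getConnection();
            Statement statement = connection.createStatement()) {
        Thread canceler = new Thread(() -> {
            try {
                statement.cancel();                  // allowed from another thread
            } catch (final SQLException ignored) {
            }
        });
        canceler.start();                            // here cancel may fire immediately;
                                                     // in the pipeline it fires on job stopping
        statement.executeQuery("SELECT COUNT(1) FROM t_order");
    }
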
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
index e35afffba1c..c1c18347e88 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
@@ -21,7 +21,7 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
-import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 
 import java.util.Iterator;
 import java.util.Optional;
@@ -33,10 +33,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 @RequiredArgsConstructor
 @Getter
 @Slf4j
-public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm implements DataConsistencyCalculateAlgorithm {
+public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
     
     @Override
-    public final Iterable<Object> calculate(final DataConsistencyCalculateParameter parameter) {
+    public final Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
         return new ResultIterable(parameter);
     }
     
@@ -46,30 +46,30 @@ public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm impleme
      * @param parameter data consistency calculate parameter
      * @return optional calculated result, empty means there's no more result
      */
-    protected abstract Optional<Object> calculateChunk(DataConsistencyCalculateParameter parameter);
+    protected abstract Optional<DataConsistencyCalculatedResult> calculateChunk(DataConsistencyCalculateParameter parameter);
     
     /**
     * It's not thread-safe, it should be executed in only one thread at the same time.
      */
     @RequiredArgsConstructor
-    final class ResultIterable implements Iterable<Object> {
+    final class ResultIterable implements Iterable<DataConsistencyCalculatedResult> {
         
         private final DataConsistencyCalculateParameter parameter;
         
         @Override
-        public Iterator<Object> iterator() {
+        public Iterator<DataConsistencyCalculatedResult> iterator() {
             return new ResultIterator(parameter);
         }
     }
     
     @RequiredArgsConstructor
-    final class ResultIterator implements Iterator<Object> {
+    final class ResultIterator implements Iterator<DataConsistencyCalculatedResult> {
         
         private final DataConsistencyCalculateParameter parameter;
         
         private final AtomicInteger calculationCount = new AtomicInteger(0);
         
-        private volatile Optional<Object> nextResult;
+        private volatile Optional<DataConsistencyCalculatedResult> nextResult;
         
         @Override
         public boolean hasNext() {
@@ -78,9 +78,9 @@ public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm impleme
         }
         
         @Override
-        public Object next() {
+        public DataConsistencyCalculatedResult next() {
             calculateIfNecessary();
-            Optional<Object> nextResult = this.nextResult;
+            Optional<DataConsistencyCalculatedResult> nextResult = this.nextResult;
             parameter.setPreviousCalculatedResult(nextResult.orElse(null));
             this.nextResult = null;
             return nextResult.orElse(null);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index b2eeac94959..230298808c9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -18,11 +18,14 @@
 package 
org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
 
 import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedCRC32DataConsistencyCalculateAlgorithmException;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -34,6 +37,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -41,11 +45,12 @@ import java.util.stream.Collectors;
 /**
  * CRC32 match data consistency calculate algorithm.
  */
-@Getter
-public final class CRC32MatchDataConsistencyCalculateAlgorithm implements DataConsistencyCalculateAlgorithm {
+@Slf4j
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    @Getter
     private Properties props;
     
     @Override
@@ -54,26 +59,29 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithm implements DataCo
     }
     
     @Override
-    public Iterable<Object> calculate(final DataConsistencyCalculateParameter parameter) {
+    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
         PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
-        return Collections.unmodifiableList(parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList()));
+        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
+        return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
     }
     
-    private long calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
+    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
         String logicTableName = parameter.getLogicTableName();
-        String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(logicTableName);
+        String schemaName = parameter.getSchemaName();
         Optional<String> sql = sqlBuilder.buildCRC32SQL(schemaName, logicTableName, columnName);
         ShardingSpherePreconditions.checkState(sql.isPresent(), () -> new UnsupportedCRC32DataConsistencyCalculateAlgorithmException(parameter.getDatabaseType()));
         return calculateCRC32(parameter.getDataSource(), logicTableName, sql.get());
     }
     
-    private long calculateCRC32(final DataSource dataSource, final String logicTableName, final String sql) {
+    private CalculatedItem calculateCRC32(final DataSource dataSource, final String logicTableName, final String sql) {
         try (
                 Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql);
+                PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql));
                 ResultSet resultSet = preparedStatement.executeQuery()) {
             resultSet.next();
-            return resultSet.getLong(1);
+            long crc32 = resultSet.getLong(1);
+            int recordsCount = resultSet.getInt(2);
+            return new CalculatedItem(crc32, recordsCount);
         } catch (final SQLException ex) {
             throw new PipelineTableDataConsistencyCheckLoadingFailedException(logicTableName);
         }
@@ -93,4 +101,52 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithm implements DataCo
     public String getDescription() {
         return "Match CRC32 of records.";
     }
+    
+    @RequiredArgsConstructor
+    @Getter
+    private static final class CalculatedItem {
+        
+        private final long crc32;
+        
+        private final int recordsCount;
+    }
+    
+    @RequiredArgsConstructor
+    @Getter
+    private static final class CalculatedResult implements DataConsistencyCalculatedResult {
+        
+        private final int recordsCount;
+        
+        @NonNull
+        private final Collection<Long> columnsCrc32;
+        
+        @Override
+        public boolean equals(final @NonNull Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (getClass() != o.getClass()) {
+                log.warn("CalculatedResult type not match, o.className={}", 
o.getClass().getName());
+                return false;
+            }
+            final CalculatedResult that = (CalculatedResult) o;
+            if (recordsCount != that.recordsCount) {
+                log.info("recordsCount not match, recordsCount={}, 
that.recordsCount={}", recordsCount, that.recordsCount);
+                return false;
+            }
+            if (!columnsCrc32.equals(that.columnsCrc32)) {
+                log.info("columnsCrc32 not match, columnsCrc32={}, 
that.columnsCrc32={}", columnsCrc32, that.columnsCrc32);
+                return false;
+            } else {
+                return true;
+            }
+        }
+        
+        @Override
+        public int hashCode() {
+            int result = recordsCount;
+            result = 31 * result + columnsCrc32.hashCode();
+            return result;
+        }
+    }
 }
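
Since calculateCRC32 reads the CRC32 aggregate from column 1 and the records count from column 2 of a single result row, the dialect SQL returned by buildCRC32SQL has to produce both in one query. A hedged sketch of consuming such a result (the SQL text is an assumption about the MySQL builder's output, not taken from this diff):

    // Assumed shape of the MySQL CRC32 SQL; the real text comes from MySQLPipelineSQLBuilder
    String sql = "SELECT BIT_XOR(CAST(CRC32(order_id) AS UNSIGNED)), COUNT(1) FROM t_order";
    try (
            Connection connection = dataSource.getConnection();
            PreparedStatement preparedStatement = connection.prepareStatement(sql);
            ResultSet resultSet = preparedStatement.executeQuery()) {
        resultSet.next();
        long crc32 = resultSet.getLong(1);           // column 1: CRC32 aggregate
        int recordsCount = resultSet.getInt(2);      // column 2: records count
    }
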
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index da0dbf28569..a2df6b70b79 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCheckUtils;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.ColumnValueReaderFactory;
@@ -87,12 +88,12 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
     }
     
     @Override
-    protected Optional<Object> calculateChunk(final DataConsistencyCalculateParameter parameter) {
+    protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
         CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
         String sql = getQuerySQL(parameter);
         try (
                 Connection connection = parameter.getDataSource().getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+                PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
             preparedStatement.setFetchSize(chunkSize);
             if (null == previousCalculatedResult) {
                 preparedStatement.setInt(1, chunkSize);
@@ -105,6 +106,10 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 ColumnValueReader columnValueReader = ColumnValueReaderFactory.getInstance(parameter.getDatabaseType());
                 while (resultSet.next()) {
+                    if (isCanceling()) {
+                        log.info("canceling, schemaName={}, tableName={}", 
parameter.getSchemaName(), parameter.getLogicTableName());
+                        throw new 
PipelineTableDataConsistencyCheckLoadingFailedException(parameter.getLogicTableName());
+                    }
                     ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
                     int columnCount = resultSetMetaData.getColumnCount();
                     Collection<Object> record = new LinkedList<>();
@@ -124,11 +129,11 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
     private String getQuerySQL(final DataConsistencyCalculateParameter parameter) {
         PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
         String logicTableName = parameter.getLogicTableName();
-        String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(logicTableName);
+        String schemaName = parameter.getSchemaName();
         String uniqueKey = parameter.getUniqueKey().getName();
-        String cacheKey = parameter.getDatabaseType() + "-" + 
(DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
-                ? schemaName.toLowerCase() + "." + logicTableName.toLowerCase()
-                : logicTableName.toLowerCase());
+        String cacheKey = parameter.getDatabaseType() + "-" + (null != 
schemaName && 
DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
         if (null == parameter.getPreviousCalculatedResult()) {
             return firstSQLCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQuerySQL(schemaName, logicTableName, uniqueKey, true));
         }
@@ -152,12 +157,12 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
     
     @RequiredArgsConstructor
     @Getter
-    private static final class CalculatedResult {
+    private static final class CalculatedResult implements DataConsistencyCalculatedResult {
         
         @NonNull
         private final Object maxUniqueKeyValue;
         
-        private final int recordCount;
+        private final int recordsCount;
         
         private final Collection<Collection<Object>> records;
         
@@ -172,10 +177,10 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
                 return false;
             }
             final CalculatedResult that = (CalculatedResult) o;
-            boolean equalsFirst = new EqualsBuilder().append(getRecordCount(), that.getRecordCount()).append(getMaxUniqueKeyValue(), that.getMaxUniqueKeyValue()).isEquals();
+            boolean equalsFirst = new EqualsBuilder().append(getRecordsCount(), that.getRecordsCount()).append(getMaxUniqueKeyValue(), that.getMaxUniqueKeyValue()).isEquals();
             if (!equalsFirst) {
                 log.warn("recordCount or maxUniqueKeyValue not match, 
recordCount1={}, recordCount2={}, maxUniqueKeyValue1={}, maxUniqueKeyValue2={}",
-                        getRecordCount(), that.getRecordCount(), 
getMaxUniqueKeyValue(), that.getMaxUniqueKeyValue());
+                        getRecordsCount(), that.getRecordsCount(), 
getMaxUniqueKeyValue(), that.getMaxUniqueKeyValue());
                 return false;
             }
             Iterator<Collection<Object>> thisIterator = this.records.iterator();
@@ -218,7 +223,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
         
         @Override
         public int hashCode() {
-            return new HashCodeBuilder(17, 37).append(getMaxUniqueKeyValue()).append(getRecordCount()).append(getRecords()).toHashCode();
+            return new HashCodeBuilder(17, 37).append(getMaxUniqueKeyValue()).append(getRecordsCount()).append(getRecords()).toHashCode();
         }
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
index edbd4afb2f7..afdbcce0856 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
@@ -28,6 +28,6 @@ public final class PipelineJobHasAlreadyFinishedException extends PipelineSQLExc
     private static final long serialVersionUID = 6881217592831423520L;
     
     public PipelineJobHasAlreadyFinishedException(final String jobId) {
-        super(XOpenSQLState.GENERAL_ERROR, 95, "Job has already finished, 
please run `CHECK MIGRATION %s` to start a new data consistency check job.", 
jobId);
+        super(XOpenSQLState.GENERAL_ERROR, 95, "Data consistency check job has 
already finished.", jobId);
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 7d14d2707e0..d48f4467a0c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -22,7 +22,7 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
 
 import java.util.Map;
 import java.util.Optional;
@@ -42,7 +42,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     private volatile boolean stopping;
     
     @Setter
-    private volatile OneOffJobBootstrap oneOffJobBootstrap;
+    private volatile JobBootstrap jobBootstrap;
     
     private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new ConcurrentHashMap<>();
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 9e7363970a0..8b435383e69 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -104,11 +104,7 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
         });
     }
     
-    /**
-     * Start.
-     *
-     * @return future
-     */
+    @Override
     public CompletableFuture<?> start() {
         taskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
         CompletableFuture<?> dumperFuture = incrementalExecuteEngine.submit(dumper, new ExecuteCallback() {
@@ -140,9 +136,7 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
         return CompletableFuture.allOf(dumperFuture, importerFuture);
     }
     
-    /**
-     * Stop.
-     */
+    @Override
     public void stop() {
         dumper.stop();
         for (Importer each : importers) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index ca90aeb859e..a69ee2d121d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -83,11 +83,7 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
         return null == inventoryDumperConfig.getShardingItem() ? result : result + "#" + inventoryDumperConfig.getShardingItem();
     }
     
-    /**
-     * Start.
-     *
-     * @return future
-     */
+    @Override
     public CompletableFuture<?> start() {
         CompletableFuture<?> dumperFuture = inventoryDumperExecuteEngine.submit(dumper, new ExecuteCallback() {
             
@@ -138,9 +134,7 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
         return null;
     }
     
-    /**
-     * Stop.
-     */
+    @Override
     public void stop() {
         dumper.stop();
         importer.stop();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
index ebc5a7e0e33..45dabe9ad68 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
@@ -19,11 +19,25 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 
 import org.apache.shardingsphere.data.pipeline.api.task.progress.TaskProgress;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Pipeline task interface.
  */
 public interface PipelineTask {
     
+    /**
+     * Start task.
+     *
+     * @return future
+     */
+    CompletableFuture<?> start();
+    
+    /**
+     * Stop task.
+     */
+    void stop();
+    
     /**
      * Get task id.
      *
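Promoting start() and stop() into the PipelineTask interface lets callers drive inventory and incremental tasks uniformly. A minimal sketch of that call pattern, assuming only the interface shown above (the driver class and its names are illustrative, not part of this commit):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    
    final class PipelineTaskDriver {
        
        // Start every task and combine the returned futures so callers can await completion.
        static CompletableFuture<?> startAll(final List<PipelineTask> tasks) {
            return CompletableFuture.allOf(tasks.stream().map(PipelineTask::start).toArray(CompletableFuture[]::new));
        }
        
        // Each task stops its own dumper and importer(s), so a plain loop suffices.
        static void stopAll(final List<PipelineTask> tasks) {
            tasks.forEach(PipelineTask::stop);
        }
    }
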
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
index 8d71eae5a7c..406fe64310c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -70,7 +70,7 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
         ConsistencyCheckJob job = new ConsistencyCheckJob();
         PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
         OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
-        job.setOneOffJobBootstrap(oneOffJobBootstrap);
+        job.setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 77355f5281d..b86955e252d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -18,26 +18,16 @@
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
-
-import java.util.Collections;
-import java.util.Map;
 
 /**
  * Consistency check job.
@@ -45,50 +35,43 @@ import java.util.Map;
 @Slf4j
 public final class ConsistencyCheckJob extends AbstractPipelineJob implements SimpleJob, PipelineJob {
     
-    private final ConsistencyCheckJobAPI jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
-    
     private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
     
     @Override
     public void execute(final ShardingContext shardingContext) {
         String checkJobId = shardingContext.getJobName();
+        int shardingItem = shardingContext.getShardingItem();
+        log.info("Execute job {}-{}", checkJobId, shardingItem);
+        if (isStopping()) {
+            log.info("stopping true, ignore");
+            return;
+        }
         setJobId(checkJobId);
-        ConsistencyCheckJobConfiguration consistencyCheckJobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        JobStatus status = JobStatus.RUNNING;
-        ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(consistencyCheckJobConfig, 0, status);
-        jobAPI.persistJobItemProgress(jobItemContext);
-        String parentJobId = consistencyCheckJobConfig.getParentJobId();
-        log.info("execute consistency check, job id:{}, referred job id:{}", checkJobId, parentJobId);
-        JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
-        InventoryIncrementalJobPublicAPI jobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(jobType.getTypeName());
-        Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = Collections.emptyMap();
-        try {
-            dataConsistencyCheckResult = StringUtils.isBlank(consistencyCheckJobConfig.getAlgorithmTypeName())
-                    ? jobPublicAPI.dataConsistencyCheck(parentJobId)
-                    : jobPublicAPI.dataConsistencyCheck(parentJobId, consistencyCheckJobConfig.getAlgorithmTypeName(), consistencyCheckJobConfig.getAlgorithmProps());
-            status = JobStatus.FINISHED;
-        } catch (final SQLWrapperException ex) {
-            log.error("data consistency check failed", ex);
-            status = JobStatus.CONSISTENCY_CHECK_FAILURE;
-            jobAPI.persistJobItemErrorMessage(checkJobId, 0, ex);
+        ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+        ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(jobConfig, shardingItem, JobStatus.RUNNING);
+        if (getTasksRunnerMap().containsKey(shardingItem)) {
+            log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
+            return;
         }
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
-        jobItemContext.setStatus(status);
-        jobAPI.persistJobItemProgress(jobItemContext);
-        jobAPI.stop(checkJobId);
-        log.info("execute consistency check job finished, job id:{}, parent job id:{}", checkJobId, parentJobId);
+        ConsistencyCheckTasksRunner tasksRunner = new ConsistencyCheckTasksRunner(jobItemContext);
+        tasksRunner.start();
+        getTasksRunnerMap().put(shardingItem, tasksRunner);
     }
     
     @Override
     public void stop() {
         setStopping(true);
-        if (null != getOneOffJobBootstrap()) {
-            getOneOffJobBootstrap().shutdown();
+        if (null != getJobBootstrap()) {
+            getJobBootstrap().shutdown();
         }
         if (null == getJobId()) {
             log.info("stop consistency check job, jobId is null, ignore");
             return;
         }
+        for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
+            each.stop();
+        }
+        getTasksRunnerMap().clear();
         String jobBarrierDisablePath = PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
         pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath, 0);
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
new file mode 100644
index 00000000000..69055b5a85d
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+
+import java.sql.SQLException;
+import java.util.Map;
+
+/**
+ * Consistency check tasks runner.
+ */
+@Slf4j
+public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
+    
+    private final ConsistencyCheckJobAPI checkJobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+    
+    @Getter
+    private final ConsistencyCheckJobItemContext jobItemContext;
+    
+    private final ConsistencyCheckJobConfiguration checkJobConfig;
+    
+    private final String checkJobId;
+    
+    private final String parentJobId;
+    
+    private final LifecycleExecutor checkExecutor;
+    
+    private final ExecuteCallback checkExecuteCallback;
+    
+    public ConsistencyCheckTasksRunner(final ConsistencyCheckJobItemContext jobItemContext) {
+        this.jobItemContext = jobItemContext;
+        checkJobConfig = jobItemContext.getJobConfig();
+        checkJobId = checkJobConfig.getJobId();
+        parentJobId = checkJobConfig.getParentJobId();
+        checkExecutor = new CheckLifecycleExecutor();
+        checkExecuteCallback = new CheckExecuteCallback();
+    }
+    
+    @Override
+    public void start() {
+        if (jobItemContext.isStopping()) {
+            log.info("job stopping, ignore consistency check");
+            return;
+        }
+        PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).persistJobItemProgress(jobItemContext);
+        ExecuteEngine executeEngine = ExecuteEngine.newFixedThreadInstance(1, checkJobId + "-check");
+        executeEngine.submit(checkExecutor, checkExecuteCallback);
+    }
+    
+    @Override
+    public void stop() {
+        jobItemContext.setStopping(true);
+        log.info("stop, jobId={}, shardingItem={}", jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
+        checkExecutor.stop();
+    }
+    
+    private final class CheckLifecycleExecutor extends AbstractLifecycleExecutor {
+        
+        private volatile DataConsistencyCalculateAlgorithm calculateAlgorithm;
+        
+        @Override
+        protected void runBlocking() {
+            log.info("execute consistency check, check job id: {}, parent job 
id: {}", checkJobId, parentJobId);
+            checkJobAPI.persistJobItemProgress(jobItemContext);
+            JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
+            InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) 
PipelineAPIFactory.getPipelineJobAPI(jobType);
+            PipelineJobConfiguration parentJobConfig = 
jobAPI.getJobConfiguration(parentJobId);
+            DataConsistencyCalculateAlgorithm calculateAlgorithm = 
jobAPI.buildDataConsistencyCalculateAlgorithm(
+                    parentJobConfig, checkJobConfig.getAlgorithmTypeName(), 
checkJobConfig.getAlgorithmProps());
+            this.calculateAlgorithm = calculateAlgorithm;
+            Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult 
= jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm);
+            
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId,
 checkJobId, dataConsistencyCheckResult);
+        }
+        
+        @Override
+        protected void doStop() {
+            DataConsistencyCalculateAlgorithm algorithm = calculateAlgorithm;
+            log.info("doStop, algorithm={}", algorithm);
+            if (null != algorithm) {
+                try {
+                    algorithm.cancel();
+                } catch (final SQLException ex) {
+                    throw new RuntimeException(ex);
+                }
+            }
+        }
+    }
+    
+    private final class CheckExecuteCallback implements ExecuteCallback {
+        
+        @Override
+        public void onSuccess() {
+            log.info("onSuccess, check job id: {}, parent job id: {}", 
checkJobId, parentJobId);
+            jobItemContext.setStatus(JobStatus.FINISHED);
+            checkJobAPI.persistJobItemProgress(jobItemContext);
+            checkJobAPI.stop(checkJobId);
+        }
+        
+        @Override
+        public void onFailure(final Throwable throwable) {
+            log.info("onFailure, check job id: {}, parent job id: {}", 
checkJobId, parentJobId);
+            checkJobAPI.persistJobItemErrorMessage(checkJobId, 0, throwable);
+            jobItemContext.setStatus(JobStatus.CONSISTENCY_CHECK_FAILURE);
+            checkJobAPI.persistJobItemProgress(jobItemContext);
+            checkJobAPI.stop(checkJobId);
+        }
+    }
+}
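The cancel path above relies on the AbstractLifecycleExecutor contract: ConsistencyCheckTasksRunner.stop() stops the executor, which triggers doStop(), where the in-flight DataConsistencyCalculateAlgorithm is cancelled. A simplified sketch of that contract, assuming a minimal base class (the real AbstractLifecycleExecutor lives in the pipeline API module and carries more state handling):

    // Sketch only; illustrates the runBlocking()/doStop() protocol used by CheckLifecycleExecutor.
    public abstract class LifecycleExecutorSketch implements Runnable {
        
        private volatile boolean running;
        
        @Override
        public final void run() {
            running = true;
            runBlocking();
        }
        
        public final void stop() {
            if (!running) {
                return;
            }
            running = false;
            // Hook point: CheckLifecycleExecutor cancels the calculate algorithm here.
            doStop();
        }
        
        protected abstract void runBlocking();
        
        protected abstract void doStop();
    }
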
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index 404bf1edc72..b0429f58cd2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -72,7 +72,7 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
         MigrationJob job = new MigrationJob();
         PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
         OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
-        job.setOneOffJobBootstrap(oneOffJobBootstrap);
+        job.setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index ea68ca1cd0b..50ab3a39afb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -18,54 +18,32 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.SingleTableInventoryDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
-import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
 
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Data consistency checker for migration job.
@@ -81,144 +59,35 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
     
     public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext) {
         this.jobConfig = jobConfig;
-        this.readRateLimitAlgorithm = null != processContext ? processContext.getReadRateLimitAlgorithm() : null;
+        readRateLimitAlgorithm = null != processContext ? processContext.getReadRateLimitAlgorithm() : null;
         tableNameSchemaNameMapping = new TableNameSchemaNameMapping(
                 TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(), new HashSet<>(Arrays.asList(jobConfig.getSourceTableName(), jobConfig.getTargetTableName()))));
     }
     
     @Override
-    public Map<String, DataConsistencyCheckResult> check(final DataConsistencyCalculateAlgorithm calculator) {
-        Map<String, DataConsistencyCountCheckResult> countCheckResult = checkCount();
-        Map<String, DataConsistencyContentCheckResult> contentCheckResult = countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched)
-                ? checkData(calculator)
-                : Collections.emptyMap();
-        Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>(countCheckResult.size());
-        for (Entry<String, DataConsistencyCountCheckResult> entry : countCheckResult.entrySet()) {
-            result.put(entry.getKey(), new DataConsistencyCheckResult(entry.getValue(), contentCheckResult.getOrDefault(entry.getKey(), new DataConsistencyContentCheckResult(false))));
-        }
-        return result;
-    }
-    
-    private Map<String, DataConsistencyCountCheckResult> checkCount() {
-        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobConfig.getJobId()) + "-count-check-%d");
-        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
+    public Map<String, DataConsistencyCheckResult> check(final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+        verifyPipelineDatabaseType(calculateAlgorithm, jobConfig.getSource());
+        verifyPipelineDatabaseType(calculateAlgorithm, jobConfig.getTarget());
+        SchemaTableName sourceTable = new SchemaTableName(new SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getSourceTableName())), new TableName(jobConfig.getSourceTableName()));
+        SchemaTableName targetTable = new SchemaTableName(new SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getTargetTableName())), new TableName(jobConfig.getTargetTableName()));
         PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
         PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
-        Map<String, DataConsistencyCountCheckResult> result = new LinkedHashMap<>(1, 1);
+        Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>();
         try (
                 PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
                 PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
-            result.put(jobConfig.getSourceTableName(), checkCount(sourceDataSource, targetDataSource, executor));
-            return result;
+            PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
+            SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(
+                    jobConfig.getJobId(), sourceDataSource, targetDataSource, sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm);
+            result.put(sourceTable.getTableName().getOriginal(), singleTableInventoryChecker.check(calculateAlgorithm));
         } catch (final SQLException ex) {
             throw new SQLWrapperException(ex);
-        } finally {
-            executor.shutdown();
-            executor.shutdownNow();
-        }
-    }
-    
-    private DataConsistencyCountCheckResult checkCount(final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final ThreadPoolExecutor executor) {
-        Future<Long> sourceFuture = executor.submit(() -> count(sourceDataSource, jobConfig.getSourceTableName(), sourceDataSource.getDatabaseType()));
-        Future<Long> targetFuture = executor.submit(() -> count(targetDataSource, jobConfig.getTargetTableName(), targetDataSource.getDatabaseType()));
-        long sourceCount;
-        long targetCount;
-        try {
-            sourceCount = sourceFuture.get();
-        } catch (final InterruptedException | ExecutionException ex) {
-            if (ex.getCause() instanceof PipelineSQLException) {
-                throw (PipelineSQLException) ex.getCause();
-            }
-            throw new SQLWrapperException(new SQLException(ex));
-        }
-        try {
-            targetCount = targetFuture.get();
-        } catch (final InterruptedException | ExecutionException ex) {
-            if (ex.getCause() instanceof PipelineSQLException) {
-                throw (PipelineSQLException) ex.getCause();
-            }
-            throw new SQLWrapperException(new SQLException(ex));
-        }
-        return new DataConsistencyCountCheckResult(sourceCount, targetCount);
-    }
-    
-    // TODO use digest (crc32, murmurhash)
-    private String getJobIdDigest(final String jobId) {
-        return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
-    }
-    
-    private long count(final DataSource dataSource, final String tableName, final DatabaseType databaseType) {
-        String sql = PipelineSQLBuilderFactory.getInstance(databaseType.getType()).buildCountSQL(tableNameSchemaNameMapping.getSchemaName(tableName), tableName);
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql);
-                ResultSet resultSet = preparedStatement.executeQuery()) {
-            resultSet.next();
-            return resultSet.getLong(1);
-        } catch (final SQLException ex) {
-            throw new PipelineTableDataConsistencyCheckLoadingFailedException(tableName);
-        }
-    }
-    
-    private Map<String, DataConsistencyContentCheckResult> checkData(final DataConsistencyCalculateAlgorithm calculator) {
-        checkPipelineDatabaseType(calculator, jobConfig.getSource());
-        PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSource();
-        checkPipelineDatabaseType(calculator, jobConfig.getTarget());
-        PipelineDataSourceConfiguration targetDataSourceConfig = jobConfig.getTarget();
-        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobConfig.getJobId()) + "-data-check-%d");
-        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
-        Map<String, DataConsistencyContentCheckResult> result = new HashMap<>(1, 1);
-        try (
-                PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
-                PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
-            String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getType();
-            String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getType();
-            StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
-            for (String each : Collections.singleton(jobConfig.getSourceTableName())) {
-                PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each);
-                ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(each));
-                Collection<String> columnNames = tableMetaData.getColumnNames();
-                PipelineColumnMetaData uniqueKey = jobConfig.getUniqueKeyColumn();
-                DataConsistencyCalculateParameter sourceParameter = buildParameter(sourceDataSource, tableNameSchemaNameMapping, each, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
-                DataConsistencyCalculateParameter targetParameter = buildParameter(
-                        targetDataSource, tableNameSchemaNameMapping, jobConfig.getTargetTableName(), columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
-                Iterator<Object> sourceCalculatedResults = calculator.calculate(sourceParameter).iterator();
-                Iterator<Object> targetCalculatedResults = calculator.calculate(targetParameter).iterator();
-                boolean contentMatched = true;
-                while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
-                    if (null != readRateLimitAlgorithm) {
-                        readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
-                    }
-                    Future<Object> sourceFuture = executor.submit(sourceCalculatedResults::next);
-                    Future<Object> targetFuture = executor.submit(targetCalculatedResults::next);
-                    Object sourceCalculatedResult = sourceFuture.get();
-                    Object targetCalculatedResult = targetFuture.get();
-                    contentMatched = Objects.equals(sourceCalculatedResult, targetCalculatedResult);
-                    if (!contentMatched) {
-                        break;
-                    }
-                }
-                result.put(each, new DataConsistencyContentCheckResult(contentMatched));
-            }
-        } catch (final SQLException ex) {
-            throw new SQLWrapperException(ex);
-        } catch (final ExecutionException | InterruptedException ex) {
-            throw new SQLWrapperException(new SQLException(ex.getCause()));
-        } finally {
-            executor.shutdown();
-            executor.shutdownNow();
         }
         return result;
     }
     
-    private void checkPipelineDatabaseType(final DataConsistencyCalculateAlgorithm calculator, final PipelineDataSourceConfiguration dataSourceConfig) {
-        ShardingSpherePreconditions.checkState(calculator.getSupportedDatabaseTypes().contains(dataSourceConfig.getDatabaseType().getType()),
+    private void verifyPipelineDatabaseType(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final PipelineDataSourceConfiguration dataSourceConfig) {
+        ShardingSpherePreconditions.checkState(calculateAlgorithm.getSupportedDatabaseTypes().contains(dataSourceConfig.getDatabaseType().getType()),
                 () -> new UnsupportedPipelineDatabaseTypeException(dataSourceConfig.getDatabaseType()));
     }
-    
-    private DataConsistencyCalculateParameter buildParameter(final PipelineDataSourceWrapper sourceDataSource, final TableNameSchemaNameMapping tableNameSchemaNameMapping,
-                                                             final String tableName, final Collection<String> columnNames,
-                                                             final String sourceDatabaseType, final String targetDatabaseType, final PipelineColumnMetaData uniqueKey) {
-        return new DataConsistencyCalculateParameter(sourceDataSource, tableNameSchemaNameMapping, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
-    }
 }
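With count and content now calculated together (per the commit message), the per-table comparison that used to live here reduces to walking two streams of DataConsistencyCalculatedResult and comparing them pairwise; the real logic now sits in the new SingleTableInventoryDataConsistencyChecker. A rough sketch of that core loop, assuming results implement equals() over both record count and content (as the fixture later in this diff does via @EqualsAndHashCode):

    import java.util.Iterator;
    
    // Illustrative pairwise comparison; not the actual checker implementation.
    static boolean matches(final Iterator<DataConsistencyCalculatedResult> source, final Iterator<DataConsistencyCalculatedResult> target) {
        while (source.hasNext() && target.hasNext()) {
            if (!source.next().equals(target.next())) {
                return false;
            }
        }
        // Both iterators must be exhausted together, otherwise record counts differ.
        return !source.hasNext() && !target.hasNext();
    }
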
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 018570e184c..244dc46efcf 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -112,8 +112,8 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
     public void stop() {
         setStopping(true);
         dataSourceManager.close();
-        if (null != getOneOffJobBootstrap()) {
-            getOneOffJobBootstrap().shutdown();
+        if (null != getJobBootstrap()) {
+            getJobBootstrap().shutdown();
         }
         String jobId = getJobId();
         if (null == jobId) {
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
index b0d0f42a10d..e5ae0a9635e 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
@@ -18,7 +18,7 @@
 package 
org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
@@ -35,7 +35,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Iterator;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -60,28 +59,28 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithmTest {
     @Before
     public void setUp() throws SQLException {
         PipelineColumnMetaData uniqueKey = new PipelineColumnMetaData(1, "id", 
Types.INTEGER, "integer", false, true, true);
-        parameter = new DataConsistencyCalculateParameter(pipelineDataSource, 
new TableNameSchemaNameMapping(Collections.emptyMap()),
+        parameter = new DataConsistencyCalculateParameter(pipelineDataSource, 
null,
                 "foo_tbl", Arrays.asList("foo_col", "bar_col"), "FIXTURE", 
"FIXTURE", uniqueKey);
         when(pipelineDataSource.getConnection()).thenReturn(connection);
     }
     
     @Test
     public void assertCalculateSuccess() throws SQLException {
-        PreparedStatement preparedStatement0 = mockPreparedStatement(0L);
+        PreparedStatement preparedStatement0 = mockPreparedStatement(123L, 10);
         when(connection.prepareStatement("SELECT CRC32(foo_col) FROM 
foo_tbl")).thenReturn(preparedStatement0);
-        PreparedStatement preparedStatement1 = mockPreparedStatement(1L);
+        PreparedStatement preparedStatement1 = mockPreparedStatement(456L, 10);
         when(connection.prepareStatement("SELECT CRC32(bar_col) FROM 
foo_tbl")).thenReturn(preparedStatement1);
-        Iterator<Object> actual = new 
CRC32MatchDataConsistencyCalculateAlgorithm().calculate(parameter).iterator();
-        assertThat(actual.next(), is(0L));
-        assertThat(actual.next(), is(1L));
+        Iterator<DataConsistencyCalculatedResult> actual = new 
CRC32MatchDataConsistencyCalculateAlgorithm().calculate(parameter).iterator();
+        assertThat(actual.next().getRecordsCount(), is(10));
         assertFalse(actual.hasNext());
     }
     
-    private PreparedStatement mockPreparedStatement(final long expectedCRC32Result) throws SQLException {
+    private PreparedStatement mockPreparedStatement(final long expectedCRC32Result, final int expectedRecordsCount) throws SQLException {
         ResultSet resultSet = mock(ResultSet.class);
         PreparedStatement result = mock(PreparedStatement.class, RETURNS_DEEP_STUBS);
         when(result.executeQuery()).thenReturn(resultSet);
         when(resultSet.getLong(1)).thenReturn(expectedCRC32Result);
+        when(resultSet.getInt(2)).thenReturn(expectedRecordsCount);
         return result;
     }
     
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 14440f0344d..286de508afe 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -51,7 +51,7 @@ public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     
     @Override
     public Optional<String> buildCRC32SQL(final String schemaName, final String tableName, final String column) {
-        return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS UNSIGNED)) AS checksum FROM %s", quote(column), quote(tableName)));
+        return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM %s", quote(column), quote(tableName)));
     }
     
     @Override
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index 3a2049c3487..f2b276fefde 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -55,7 +55,7 @@ public final class MySQLPipelineSQLBuilderTest {
     public void assertBuildSumCrc32SQL() {
         Optional<String> actual = sqlBuilder.buildCRC32SQL(null, "t2", "id");
         assertTrue(actual.isPresent());
-        assertThat(actual.get(), is("SELECT BIT_XOR(CAST(CRC32(id) AS 
UNSIGNED)) AS checksum FROM t2"));
+        assertThat(actual.get(), is("SELECT BIT_XOR(CAST(CRC32(id) AS 
UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM t2"));
     }
     
     private DataRecord mockDataRecord(final String tableName) {
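Since the MySQL CRC32 SQL now returns the checksum and the row count in a single round trip, a caller reads both columns from one result set, matching the getLong(1)/getInt(2) expectations in the unit test earlier in this diff. A minimal JDBC sketch, assuming a plain Connection and the SQL built above:

    // Consumes: SELECT BIT_XOR(CAST(CRC32(id) AS UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM t2
    try (
            PreparedStatement preparedStatement = connection.prepareStatement(sql);
            ResultSet resultSet = preparedStatement.executeQuery()) {
        resultSet.next();
        long checksum = resultSet.getLong(1);
        int recordsCount = resultSet.getInt(2);
        // One query now feeds both the count check and the content check.
    }
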
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 0c4047c2286..2929e143128 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -99,8 +99,9 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
         startIncrementTask(new MySQLIncrementTask(jdbcTemplate, getSourceTableOrderName(), keyGenerateAlgorithm, 30));
         String orderJobId = getJobIdByTableName(getSourceTableOrderName());
         String orderItemJobId = getJobIdByTableName("t_order_item");
-        assertMigrationSuccessById(orderJobId);
-        assertMigrationSuccessById(orderItemJobId);
+        assertMigrationSuccessById(orderJobId, "DATA_MATCH");
+        assertMigrationSuccessById(orderItemJobId, "DATA_MATCH");
+        assertMigrationSuccessById(orderItemJobId, "CRC32_MATCH");
         if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
             for (String each : listJobId()) {
                 commitMigrationByJobId(each);
@@ -111,12 +112,12 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
         assertGreaterThanOrderTableInitRows(TABLE_INIT_ROW_COUNT, "");
     }
     
-    private void assertMigrationSuccessById(final String jobId) throws SQLException, InterruptedException {
+    private void assertMigrationSuccessById(final String jobId, final String algorithmType) throws SQLException, InterruptedException {
         List<Map<String, Object>> jobStatus = waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         for (Map<String, Object> each : jobStatus) {
             assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
         }
-        assertCheckMigrationSuccess(jobId, "DATA_MATCH");
+        assertCheckMigrationSuccess(jobId, algorithmType);
         stopMigrationByJobId(jobId);
     }
 }
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
index 3d1c2827377..01fd3394a42 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import org.junit.Test;
 
 import java.util.Properties;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
index f6ec7327507..051ce468505 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import lombok.Getter;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
@@ -39,8 +40,12 @@ public final class DataConsistencyCalculateAlgorithmFixture implements DataConsi
     }
     
     @Override
-    public Iterable<Object> calculate(final DataConsistencyCalculateParameter parameter) {
-        return Collections.singletonList(true);
+    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
+        return Collections.singletonList(new FixtureDataConsistencyCalculatedResult(2));
+    }
+    
+    @Override
+    public void cancel() {
     }
     
     @Override
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
similarity index 64%
copy from test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
copy to test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
index 3d1c2827377..b61af786c4a 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
@@ -15,16 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.fixture;
 
-import org.junit.Test;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 
-import java.util.Properties;
-
-public final class DataConsistencyCalculateAlgorithmFactoryTest {
+@RequiredArgsConstructor
+@EqualsAndHashCode
+@Getter
+public final class FixtureDataConsistencyCalculatedResult implements DataConsistencyCalculatedResult {
     
-    @Test
-    public void assertNewInstanceSuccess() {
-        DataConsistencyCalculateAlgorithmFactory.newInstance("FIXTURE", new 
Properties());
-    }
+    private final int recordsCount;
 }
