This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d91ffcb  Add DATA_MATCH data consistency checker impl, support chunked 
streaming data consistency check, refactor data consistency check SPI definition 
and process (#14304)
d91ffcb is described below

commit d91ffcb5a9489e0e02d9a2230f9cdd2cf9e69381
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Dec 25 11:00:32 2021 +0800

    Add DATA_MATCH data consistency checker impl, support chunked streaming 
data consistency check, refactor data consistency check SPI definition and 
process (#14304)
    
    * Refactor SingleTableDataCalculator.getDatabaseTypes to support multiple 
databases
    
    * Refactor DEFAULT DataConsistencyCheckAlgorithm to CRC32_MATCH
    
    * Add DATA_MATCH data consistency checker definition
    
    * Refactor CRC32_MATCH impl package
    
    * Add DataMatchSingleTableDataCalculator simple impl
    
    * Reuse data source in checkCount
    
    * Reuse data source in dataCheck
    
    * Support chunked data consistency check
    
    * Support chunked data calculation for DATA_MATCH
    
    * Refactor data consistency checker and calculator method signature
    
    * Prepare chunk-size refactoring
    
    * Extract abstract streaming data calculator
    
    * Refactor data consistency check algorithm properties initialization
    
    * Set up uniqueKey for data consistency check
    
    * Enable rate limit for data consistency check
    
    * Real test
    
    * Enable range query for inline sharding algorithm during data consistency 
check
    
    * Add calculate param for extension
    
    * Fix checkstyle
    
    * Unit test
---
 .../resources/yaml/encrypt-dataConverters.yaml     |   2 +-
 .../infra/database/type/DatabaseTypeRegistry.java  |  11 ++
 .../ShardingSphereJDBCDataSourceCreator.java       |  15 ++-
 .../check/consistency/DataCalculateParameter.java  |  33 ++++--
 .../consistency/DataConsistencyCheckResult.java    |  16 +--
 .../pipeline/core/api/impl/PipelineJobAPIImpl.java |  17 +--
 .../check/consistency/DataConsistencyChecker.java  |  12 +-
 .../consistency/DataConsistencyCheckerImpl.java    | 103 ++++++++++-------
 .../SingleTableDataCalculatorRegistry.java         |  12 +-
 .../core/datasource/DataSourceFactory.java         |   5 +-
 .../core/datasource/DataSourceManager.java         |   1 +
 .../core/datasource/DataSourceWrapper.java         |   5 +
 .../ingest/dumper/AbstractInventoryDumper.java     |   1 +
 .../AbstractDataConsistencyCheckAlgorithm.java}    |  37 +++---
 .../AbstractSingleTableDataCalculator.java         |  26 +++--
 ...AbstractStreamingSingleTableDataCalculator.java | 101 ++++++++++++++++
 .../CRC32MatchDataConsistencyCheckAlgorithm.java}  |  26 +----
 .../DataMatchDataConsistencyCheckAlgorithm.java    |  28 +++--
 .../DataMatchSingleTableDataCalculator.java        | 128 +++++++++++++++++++++
 .../sqlbuilder/AbstractPipelineSQLBuilder.java     |   8 ++
 ...dleRuleAlteredJobCompletionDetectAlgorithm.java |  14 ++-
 .../consistency/DataConsistencyCheckAlgorithm.java |   2 +-
 .../consistency/SingleTableDataCalculator.java     |  36 ++++--
 .../spi/sqlbuilder/PipelineSQLBuilder.java         |  10 ++
 ...check.consistency.DataConsistencyCheckAlgorithm |   3 +-
 ...spi.check.consistency.SingleTableDataCalculator |   2 +-
 .../src/main/resources/conf/config-sharding.yaml   |  15 ++-
 ... CRC32MatchMySQLSingleTableDataCalculator.java} |  43 +++----
 ...spi.check.consistency.SingleTableDataCalculator |   2 +-
 ...2MatchPostgreSQLSingleTableDataCalculator.java} |  25 ++--
 ...spi.check.consistency.SingleTableDataCalculator |   2 +-
 .../handler/CheckScalingQueryResultSet.java        |  10 +-
 ...owScalingCheckAlgorithmsQueryResultSetTest.java |  22 ++--
 .../pipeline/api/impl/PipelineJobAPIImplTest.java  |  14 +--
 .../DataConsistencyCheckerImplTest.java            |   8 +-
 .../core/datasource/DataSourceWrapperTest.java     |  11 +-
 .../FixtureH2SingleTableDataCalculator.java        |  15 ++-
 ...C32MatchDataConsistencyCheckAlgorithmTest.java} |  10 +-
 38 files changed, 602 insertions(+), 229 deletions(-)

diff --git 
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
 
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
index 63761f4..bd3cb68 100644
--- 
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
+++ 
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
@@ -34,6 +34,6 @@ dataConverters:
     dataConsistencyChecker:
       type: DATA_MATCH
       props:
-        chunk-size: 10000
+        chunk-size: 1000
     checkoutLocker:
       type: DEFAULT
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeRegistry.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeRegistry.java
index 8ef032b..664b34d 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeRegistry.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeRegistry.java
@@ -21,6 +21,8 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.infra.exception.ShardingSphereException;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -94,4 +96,13 @@ public final class DatabaseTypeRegistry {
     public static DatabaseType getDefaultDatabaseType() {
         return DATABASE_TYPES.get(DEFAULT_DATABASE_TYPE);
     }
+    
+    /**
+     * Get names of all database types.
+     *
+     * @return database type names
+     */
+    public static Collection<String> getDatabaseTypeNames() {
+        return Collections.unmodifiableSet(DATABASE_TYPES.keySet());
+    }
 }
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/config/datasource/ShardingSphereJDBCDataSourceCreator.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/config/datasource/ShardingSphereJDBCDataSourceCreator.java
index 4e47a1b..71a5f8b 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/config/datasource/ShardingSphereJDBCDataSourceCreator.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/config/datasource/ShardingSphereJDBCDataSourceCreator.java
@@ -18,10 +18,12 @@
 package org.apache.shardingsphere.driver.config.datasource;
 
 import org.apache.shardingsphere.driver.api.ShardingSphereDataSourceFactory;
+import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
 import 
org.apache.shardingsphere.infra.config.datasource.jdbc.config.impl.ShardingSphereJDBCDataSourceConfiguration;
 import 
org.apache.shardingsphere.infra.config.datasource.jdbc.creator.JDBCDataSourceCreator;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
 import 
org.apache.shardingsphere.sharding.yaml.swapper.ShardingRuleConfigurationConverter;
 
 import javax.sql.DataSource;
@@ -36,8 +38,19 @@ public final class ShardingSphereJDBCDataSourceCreator 
implements JDBCDataSource
     @Override
     public DataSource createDataSource(final Object dataSourceConfig) throws 
SQLException {
         YamlRootConfiguration rootConfig = (YamlRootConfiguration) 
dataSourceConfig;
+        ShardingRuleConfiguration shardingRuleConfig = 
ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(rootConfig.getRules());
+        enableRangeQueryForInline(shardingRuleConfig);
         return 
ShardingSphereDataSourceFactory.createDataSource(rootConfig.getSchemaName(), 
new 
YamlDataSourceConfigurationSwapper().swapToDataSources(rootConfig.getDataSources()),
 
-                
Collections.singletonList(ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(rootConfig.getRules())),
 null);
+                Collections.singletonList(shardingRuleConfig), null);
+    }
+    
+    private void enableRangeQueryForInline(final ShardingRuleConfiguration 
shardingRuleConfig) {
+        for (ShardingSphereAlgorithmConfiguration each : 
shardingRuleConfig.getShardingAlgorithms().values()) {
+            if (!"INLINE".equalsIgnoreCase(each.getType())) {
+                continue;
+            }
+            each.getProps().put("allow-range-query-with-inline-sharding", 
"true");
+        }
     }
     
     @Override
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataCalculateParameter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataCalculateParameter.java
index 36e7431..ca10b9d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataCalculateParameter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataCalculateParameter.java
@@ -21,7 +21,8 @@ import lombok.Builder;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import 
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfiguration;
+import org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper;
 
 import java.util.Collection;
 
@@ -35,9 +36,10 @@ import java.util.Collection;
 public final class DataCalculateParameter {
     
     /**
-     * Data source configuration of source side or target side.
+     * Data source of source side or target side.
+     * Do not close it, it will be reused later.
      */
-    private JDBCDataSourceConfiguration dataSourceConfig;
+    private DataSourceWrapper dataSource;
     
     private String logicTableName;
     
@@ -47,27 +49,36 @@ public final class DataCalculateParameter {
     private Collection<String> columnNames;
     
     /**
+     * Database type.
+     */
+    private String databaseType;
+    
+    /**
      * Peer database type.
      */
     private String peerDatabaseType;
     
     /**
-     * Chunk size of limited records to be calculated in a batch.
+     * It could be primary key.
+     * It could be used in order by clause.
      */
-    private Integer chunkSize;
+    private String uniqueKey;
     
     /**
-     * Ignored column names.
+     * Used for range query.
+     * If it's configured, then it could be translated to SQL like 
"uniqueColumn >= pair.left AND uniqueColumn <= pair.right".
+     * One of left and right of pair could be null.
      */
-    private Collection<String> ignoredColumnNames;
+    private Pair<Object, Object> uniqueColumnValueRange;
     
     /**
-     * If {@link #chunkSize} exists, it could be used in order by clause on 
first priority.
+     * Used for multiple records query.
+     * If it's configured, then it could be translated to SQL like 
"uniqueColumn IN (value1,value2,value3)".
      */
-    private Collection<String> primaryColumnNames;
+    private Collection<Object> uniqueColumnValues;
     
     /**
-     * If {@link #chunkSize} exists, it could be used in order by clause on 
second priority.
+     * Previous calculated result will be transferred to next call.
      */
-    private Collection<String> uniqueColumnNames;
+    private volatile Object previousCalculatedResult;
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
index 20f13b5..895ecac 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
@@ -29,17 +29,17 @@ import lombok.ToString;
 @ToString
 public final class DataConsistencyCheckResult {
     
-    private final long sourceCount;
+    private final long sourceRecordsCount;
     
-    private final long targetCount;
+    private final long targetRecordsCount;
     
-    private final boolean countValid;
+    private final boolean recordsCountMatched;
     
-    private boolean dataValid;
+    private boolean recordsContentMatched;
     
-    public DataConsistencyCheckResult(final long sourceCount, final long 
targetCount) {
-        this.sourceCount = sourceCount;
-        this.targetCount = targetCount;
-        countValid = sourceCount == targetCount;
+    public DataConsistencyCheckResult(final long sourceRecordsCount, final 
long targetRecordsCount) {
+        this.sourceRecordsCount = sourceRecordsCount;
+        this.targetRecordsCount = targetRecordsCount;
+        recordsCountMatched = sourceRecordsCount == targetRecordsCount;
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java
index 5ce51ef..35c4b8f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineJobAPIImpl.java
@@ -248,10 +248,10 @@ public final class PipelineJobAPIImpl implements 
PipelineJobAPI {
     private Map<String, DataConsistencyCheckResult> 
dataConsistencyCheck0(final RuleAlteredJobContext jobContext, final 
DataConsistencyCheckAlgorithm checkAlgorithm) {
         long jobId = jobContext.getJobId();
         DataConsistencyChecker dataConsistencyChecker = 
EnvironmentCheckerFactory.newInstance(jobContext);
-        Map<String, DataConsistencyCheckResult> result = 
dataConsistencyChecker.countCheck();
-        if 
(result.values().stream().allMatch(DataConsistencyCheckResult::isCountValid)) {
-            Map<String, Boolean> dataCheckResult = 
dataConsistencyChecker.dataCheck(checkAlgorithm);
-            result.forEach((key, value) -> 
value.setDataValid(dataCheckResult.getOrDefault(key, false)));
+        Map<String, DataConsistencyCheckResult> result = 
dataConsistencyChecker.checkRecordsCount();
+        if 
(result.values().stream().allMatch(DataConsistencyCheckResult::isRecordsCountMatched))
 {
+            Map<String, Boolean> contentCheckResult = 
dataConsistencyChecker.checkRecordsContent(checkAlgorithm);
+            result.forEach((key, value) -> 
value.setRecordsContentMatched(contentCheckResult.getOrDefault(key, false)));
         }
         log.info("Scaling job {} with check algorithm '{}' data consistency 
checker result {}", jobId, checkAlgorithm.getClass().getName(), result);
         
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId, 
aggregateDataConsistencyCheckResults(jobId, result));
@@ -264,10 +264,11 @@ public final class PipelineJobAPIImpl implements 
PipelineJobAPI {
             return false;
         }
         for (Entry<String, DataConsistencyCheckResult> entry : 
checkResultMap.entrySet()) {
-            boolean isDataValid = entry.getValue().isDataValid();
-            boolean isCountValid = entry.getValue().isCountValid();
-            if (!isDataValid || !isCountValid) {
-                log.error("Scaling job: {}, table: {} data consistency check 
failed, dataValid: {}, countValid: {}", jobId, entry.getKey(), isDataValid, 
isCountValid);
+            boolean recordsCountMatched = 
entry.getValue().isRecordsCountMatched();
+            boolean recordsContentMatched = 
entry.getValue().isRecordsContentMatched();
+            if (!recordsContentMatched || !recordsCountMatched) {
+                log.error("Scaling job: {}, table: {} data consistency check 
failed, recordsContentMatched: {}, recordsCountMatched: {}",
+                    jobId, entry.getKey(), recordsContentMatched, 
recordsCountMatched);
                 return false;
             }
         }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index b0c78f6..ff70369 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -28,17 +28,17 @@ import java.util.Map;
 public interface DataConsistencyChecker {
     
     /**
-     * Check each table count is valid.
+     * Check whether each table's records count is valid.
      *
-     * @return count check result. key is logic table name, value is check 
result.
+     * @return records count check result. key is logic table name, value is 
check result.
      */
-    Map<String, DataConsistencyCheckResult> countCheck();
+    Map<String, DataConsistencyCheckResult> checkRecordsCount();
     
     /**
-     * Check each table data is valid.
+     * Check whether each table's records content is valid.
      *
      * @param checkAlgorithm check algorithm
-     * @return data is valid or not. key is logic table name, value is check 
result.
+     * @return records content check result. key is logic table name, value is 
check result.
      */
-    Map<String, Boolean> dataCheck(DataConsistencyCheckAlgorithm 
checkAlgorithm);
+    Map<String, Boolean> checkRecordsContent(DataConsistencyCheckAlgorithm 
checkAlgorithm);
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index 8d8b944..4a4ae9e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -28,10 +28,13 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.DataCheckFailExcep
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
+import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfiguration;
 import 
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfigurationFactory;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import 
org.apache.shardingsphere.infra.metadata.schema.builder.loader.common.TableMetaDataLoader;
+import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
 import 
org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
 
 import javax.sql.DataSource;
@@ -39,12 +42,13 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -68,33 +72,36 @@ public final class DataConsistencyCheckerImpl implements 
DataConsistencyChecker
     private final RuleAlteredJobContext jobContext;
     
     @Override
-    public Map<String, DataConsistencyCheckResult> countCheck() {
+    public Map<String, DataConsistencyCheckResult> checkRecordsCount() {
         ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" 
+ jobContext.getJobId() % 10_000 + "-countCheck-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, 
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
-        try {
+        JDBCDataSourceConfiguration sourceConfig = 
JDBCDataSourceConfigurationFactory.newInstance(
+            jobContext.getJobConfig().getRuleConfig().getSource().getType(), 
jobContext.getJobConfig().getRuleConfig().getSource().getParameter());
+        JDBCDataSourceConfiguration targetConfig = 
JDBCDataSourceConfigurationFactory.newInstance(
+            jobContext.getJobConfig().getRuleConfig().getTarget().getType(), 
jobContext.getJobConfig().getRuleConfig().getTarget().getParameter());
+        try (DataSourceWrapper sourceDataSource = 
dataSourceFactory.newInstance(sourceConfig);
+             DataSourceWrapper targetDataSource = 
dataSourceFactory.newInstance(targetConfig)) {
             return jobContext.getTaskConfigs()
-                    .stream().flatMap(each -> 
each.getDumperConfig().getTableNameMap().values().stream()).collect(Collectors.toSet())
-                    .stream().collect(Collectors.toMap(Function.identity(), 
table -> countCheck(table, executor), (oldValue, currentValue) -> oldValue, 
LinkedHashMap::new));
+                .stream().flatMap(each -> 
each.getDumperConfig().getTableNameMap().values().stream()).collect(Collectors.toSet())
+                .stream().collect(Collectors.toMap(Function.identity(), table 
-> countCheck(table, sourceDataSource, targetDataSource, executor),
+                    (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
+        } catch (final SQLException ex) {
+            throw new DataCheckFailException("count check failed", ex);
         } finally {
             executor.shutdown();
             executor.shutdownNow();
         }
     }
     
-    private DataConsistencyCheckResult countCheck(final String table, final 
ThreadPoolExecutor executor) {
-        JDBCDataSourceConfiguration sourceConfig = 
JDBCDataSourceConfigurationFactory.newInstance(
-                
jobContext.getJobConfig().getRuleConfig().getSource().getType(), 
jobContext.getJobConfig().getRuleConfig().getSource().getParameter());
-        JDBCDataSourceConfiguration targetConfig = 
JDBCDataSourceConfigurationFactory.newInstance(
-                
jobContext.getJobConfig().getRuleConfig().getTarget().getType(), 
jobContext.getJobConfig().getRuleConfig().getTarget().getParameter());
-        try (DataSourceWrapper sourceDataSource = 
dataSourceFactory.newInstance(sourceConfig);
-             DataSourceWrapper targetDataSource = 
dataSourceFactory.newInstance(targetConfig)) {
-            Future<Long> sourceFuture = executor.submit(() -> 
count(sourceDataSource, table, sourceConfig.getDatabaseType()));
-            Future<Long> targetFuture = executor.submit(() -> 
count(targetDataSource, table, targetConfig.getDatabaseType()));
+    private DataConsistencyCheckResult countCheck(final String table, final 
DataSourceWrapper sourceDataSource, final DataSourceWrapper targetDataSource, 
final ThreadPoolExecutor executor) {
+        try {
+            Future<Long> sourceFuture = executor.submit(() -> 
count(sourceDataSource, table, sourceDataSource.getDatabaseType()));
+            Future<Long> targetFuture = executor.submit(() -> 
count(targetDataSource, table, targetDataSource.getDatabaseType()));
             long sourceCount = sourceFuture.get();
             long targetCount = targetFuture.get();
             return new DataConsistencyCheckResult(sourceCount, targetCount);
-        } catch (final SQLException | InterruptedException | 
ExecutionException ex) {
-            throw new DataCheckFailException(String.format("table %s count 
check failed.", table), ex);
+        } catch (final InterruptedException | ExecutionException ex) {
+            throw new DataCheckFailException("count check failed, table=" + 
table, ex);
         }
     }
     
@@ -110,7 +117,7 @@ public final class DataConsistencyCheckerImpl implements 
DataConsistencyChecker
     }
     
     @Override
-    public Map<String, Boolean> dataCheck(final DataConsistencyCheckAlgorithm 
checkAlgorithm) {
+    public Map<String, Boolean> checkRecordsContent(final 
DataConsistencyCheckAlgorithm checkAlgorithm) {
         Collection<String> supportedDatabaseTypes = 
checkAlgorithm.getSupportedDatabaseTypes();
         JDBCDataSourceConfiguration sourceConfig = 
JDBCDataSourceConfigurationFactory.newInstance(
                 
jobContext.getJobConfig().getRuleConfig().getSource().getType(), 
jobContext.getJobConfig().getRuleConfig().getSource().getParameter());
@@ -119,31 +126,49 @@ public final class DataConsistencyCheckerImpl implements 
DataConsistencyChecker
                 
jobContext.getJobConfig().getRuleConfig().getTarget().getType(), 
jobContext.getJobConfig().getRuleConfig().getTarget().getParameter());
         checkDatabaseTypeSupportedOrNot(supportedDatabaseTypes, 
targetConfig.getDatabaseType().getName());
         Collection<String> logicTableNames = 
jobContext.getTaskConfigs().stream().flatMap(each -> 
each.getDumperConfig().getTableNameMap().values().stream()).distinct().collect(Collectors.toList());
-        Map<String, Collection<String>> tablesColumnNamesMap = 
getTablesColumnNamesMap(sourceConfig);
+        Map<String, TableMetaData> tableMetaDataMap = 
getTablesColumnsMap(sourceConfig, logicTableNames);
         logicTableNames.forEach(each -> {
             //TODO put to preparer
-            if (!tablesColumnNamesMap.containsKey(each)) {
+            if (!tableMetaDataMap.containsKey(each)) {
                 throw new DataCheckFailException(String.format("could not get 
table columns for '%s'", each));
             }
         });
-        SingleTableDataCalculator sourceCalculator = 
checkAlgorithm.getSingleTableDataCalculator(sourceConfig.getDatabaseType().getName());
-        SingleTableDataCalculator targetCalculator = 
checkAlgorithm.getSingleTableDataCalculator(targetConfig.getDatabaseType().getName());
+        String sourceDatabaseType = sourceConfig.getDatabaseType().getName();
+        String targetDatabaseType = targetConfig.getDatabaseType().getName();
+        SingleTableDataCalculator sourceCalculator = 
checkAlgorithm.getSingleTableDataCalculator(sourceDatabaseType);
+        SingleTableDataCalculator targetCalculator = 
checkAlgorithm.getSingleTableDataCalculator(targetDatabaseType);
         Map<String, Boolean> result = new HashMap<>();
         ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" 
+ jobContext.getJobId() % 10_000 + "-dataCheck-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, 
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
-        try {
+        JobRateLimitAlgorithm rateLimitAlgorithm = 
jobContext.getRuleAlteredContext().getRateLimitAlgorithm();
+        try (DataSourceWrapper sourceDataSource = 
dataSourceFactory.newInstance(sourceConfig);
+             DataSourceWrapper targetDataSource = 
dataSourceFactory.newInstance(targetConfig)) {
             for (String each : logicTableNames) {
-                Collection<String> columnNames = 
tablesColumnNamesMap.get(each);
-                DataCalculateParameter sourceCalculateParameter = 
DataCalculateParameter.builder().dataSourceConfig(sourceConfig).logicTableName(each).columnNames(columnNames).build();
-                Future<Object> sourceFuture = executor.submit(() -> 
sourceCalculator.dataCalculate(sourceCalculateParameter));
-                DataCalculateParameter targetCalculateParameter = 
DataCalculateParameter.builder().dataSourceConfig(targetConfig).logicTableName(each).columnNames(columnNames).build();
-                Future<Object> targetFuture = executor.submit(() -> 
targetCalculator.dataCalculate(targetCalculateParameter));
-                Object sourceCalculateResult = sourceFuture.get();
-                Object targetCalculateResult = targetFuture.get();
-                boolean calculateResultsEquals = 
Objects.equals(sourceCalculateResult, targetCalculateResult);
+                Collection<String> columnNames = 
tableMetaDataMap.get(each).getColumns().keySet();
+                String uniqueKey = 
tableMetaDataMap.get(each).getPrimaryKeyColumns().get(0);
+                DataCalculateParameter sourceCalculateParameter = 
DataCalculateParameter.builder().dataSource(sourceDataSource).databaseType(sourceDatabaseType).peerDatabaseType(targetDatabaseType)
+                    
.logicTableName(each).columnNames(columnNames).uniqueKey(uniqueKey).build();
+                DataCalculateParameter targetCalculateParameter = 
DataCalculateParameter.builder().dataSource(targetDataSource).databaseType(targetDatabaseType).peerDatabaseType(sourceDatabaseType)
+                    
.logicTableName(each).columnNames(columnNames).uniqueKey(uniqueKey).build();
+                Iterator<Object> sourceCalculatedResultIterator = 
sourceCalculator.calculate(sourceCalculateParameter).iterator();
+                Iterator<Object> targetCalculatedResultIterator = 
targetCalculator.calculate(targetCalculateParameter).iterator();
+                boolean calculateResultsEquals = true;
+                while (sourceCalculatedResultIterator.hasNext() && 
targetCalculatedResultIterator.hasNext()) {
+                    if (null != rateLimitAlgorithm) {
+                        rateLimitAlgorithm.onQuery();
+                    }
+                    Future<Object> sourceFuture = 
executor.submit(sourceCalculatedResultIterator::next);
+                    Future<Object> targetFuture = 
executor.submit(targetCalculatedResultIterator::next);
+                    Object sourceCalculatedResult = sourceFuture.get();
+                    Object targetCalculatedResult = targetFuture.get();
+                    calculateResultsEquals = 
Objects.equals(sourceCalculatedResult, targetCalculatedResult);
+                    if (!calculateResultsEquals) {
+                        break;
+                    }
+                }
                 result.put(each, calculateResultsEquals);
             }
-        } catch (final ExecutionException | InterruptedException ex) {
+        } catch (final ExecutionException | InterruptedException | 
SQLException ex) {
             throw new DataCheckFailException("data check failed");
         } finally {
             executor.shutdown();
@@ -158,14 +183,14 @@ public final class DataConsistencyCheckerImpl implements 
DataConsistencyChecker
         }
     }
     
-    private Map<String, Collection<String>> getTablesColumnNamesMap(final 
JDBCDataSourceConfiguration dataSourceConfig) {
-        try (DataSourceWrapper dataSource = 
dataSourceFactory.newInstance(dataSourceConfig);
-             Connection connection = dataSource.getConnection()) {
-            Map<String, Collection<String>> result = new LinkedHashMap<>();
-            try (ResultSet resultSet = 
connection.getMetaData().getColumns(connection.getCatalog(), null, "%", "%")) {
-                while (resultSet.next()) {
-                    result.computeIfAbsent(resultSet.getString("TABLE_NAME"), 
tableName -> new ArrayList<>()).add(resultSet.getString("COLUMN_NAME"));
-                }
+    // TODO reuse metadata
+    private Map<String, TableMetaData> getTablesColumnsMap(final 
JDBCDataSourceConfiguration dataSourceConfig, final Collection<String> 
tableNames) {
+        try (DataSourceWrapper dataSource = 
dataSourceFactory.newInstance(dataSourceConfig)) {
+            Map<String, TableMetaData> result = new LinkedHashMap<>();
+            for (String each : tableNames) {
+                Optional<TableMetaData> tableMetaDataOptional = 
TableMetaDataLoader.load(dataSource, each, dataSourceConfig.getDatabaseType());
+                TableMetaData tableMetaData = 
tableMetaDataOptional.orElseThrow(() -> new DataCheckFailException("get table 
metadata failed, tableName=" + each));
+                result.put(each, tableMetaData);
             }
             return result;
         } catch (final SQLException ex) {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorRegistry.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorRegistry.java
index 222e7e2..cf0dcc6 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorRegistry.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorRegistry.java
@@ -37,11 +37,13 @@ public final class SingleTableDataCalculatorRegistry {
     static {
         ShardingSphereServiceLoader.register(SingleTableDataCalculator.class);
         for (SingleTableDataCalculator each : 
ShardingSphereServiceLoader.getSingletonServiceInstances(SingleTableDataCalculator.class))
 {
-            SingleTableDataCalculator replaced = 
ALGORITHM_DATABASE_CALCULATOR_MAP.computeIfAbsent(each.getAlgorithmType(), 
algorithmType -> new HashMap<>())
-                    .put(each.getDatabaseType(), each);
-            if (null != replaced) {
-                log.info("element replaced, algorithmType={}, databaseType={}, 
current={}, replaced={}",
-                        each.getAlgorithmType(), each.getDatabaseType(), 
each.getClass().getName(), replaced.getClass().getName());
+            Map<String, SingleTableDataCalculator> dataCalculatorMap = 
ALGORITHM_DATABASE_CALCULATOR_MAP.computeIfAbsent(each.getAlgorithmType(), 
algorithmType -> new HashMap<>());
+            for (String databaseType : each.getDatabaseTypes()) {
+                SingleTableDataCalculator replaced = 
dataCalculatorMap.put(databaseType, each);
+                if (null != replaced) {
+                    log.warn("element replaced, algorithmType={}, 
databaseTypes={}, current={}, replaced={}",
+                            each.getAlgorithmType(), each.getDatabaseTypes(), 
each.getClass().getName(), replaced.getClass().getName());
+                }
             }
         }
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceFactory.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceFactory.java
index ac0b209..293e567 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceFactory.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceFactory.java
@@ -21,6 +21,7 @@ import lombok.SneakyThrows;
 import 
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfiguration;
 import 
org.apache.shardingsphere.infra.config.datasource.jdbc.creator.JDBCDataSourceCreatorFactory;
 
+import javax.sql.DataSource;
 import java.sql.SQLException;
 
 /**
@@ -36,6 +37,8 @@ public final class DataSourceFactory {
      */
     @SneakyThrows(SQLException.class)
     public DataSourceWrapper newInstance(final JDBCDataSourceConfiguration 
dataSourceConfig) {
-        return new 
DataSourceWrapper(JDBCDataSourceCreatorFactory.getInstance(dataSourceConfig.getType()).createDataSource(dataSourceConfig.getDataSourceConfiguration()));
+        // TODO cache and reuse, try DataSourceManager
+        DataSource dataSource = 
JDBCDataSourceCreatorFactory.getInstance(dataSourceConfig.getType()).createDataSource(dataSourceConfig.getDataSourceConfiguration());
+        return new DataSourceWrapper(dataSource, 
dataSourceConfig.getDatabaseType());
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java
index a335e38..0bf76a3 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceManager.java
@@ -71,6 +71,7 @@ public final class DataSourceManager implements AutoCloseable 
{
      * @return data source
      */
     public DataSourceWrapper getDataSource(final JDBCDataSourceConfiguration 
dataSourceConfig) {
+        // TODO re-init if existing dataSource was closed
         if (cachedDataSources.containsKey(dataSourceConfig)) {
             return cachedDataSources.get(dataSourceConfig);
         }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapper.java
index 47b4d2e..e70326f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapper.java
@@ -17,8 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.core.datasource;
 
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 
 import javax.sql.DataSource;
 import java.io.PrintWriter;
@@ -36,6 +38,9 @@ public final class DataSourceWrapper implements DataSource, 
AutoCloseable {
     
     private final DataSource dataSource;
     
+    @Getter
+    private final DatabaseType databaseType;
+    
     @Override
     public Connection getConnection() throws SQLException {
         return dataSource.getConnection();
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 44b0deb..ebd453a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -82,6 +82,7 @@ public abstract class AbstractInventoryDumper extends 
AbstractLifecycleExecutor
     
     private TableMetaData createTableMetaData() {
         JDBCDataSourceConfiguration dataSourceConfig = 
inventoryDumperConfig.getDataSourceConfig();
+        // TODO share MetaDataManager
         MetaDataManager metaDataManager = new 
MetaDataManager(dataSourceManager.getDataSource(dataSourceConfig));
         return 
metaDataManager.getTableMetaData(inventoryDumperConfig.getTableName(), 
dataSourceConfig.getDatabaseType());
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DefaultDataConsistencyCheckAlgorithm.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractDataConsistencyCheckAlgorithm.java
similarity index 54%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DefaultDataConsistencyCheckAlgorithm.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractDataConsistencyCheckAlgorithm.java
index 51d7794..82de859 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DefaultDataConsistencyCheckAlgorithm.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractDataConsistencyCheckAlgorithm.java
@@ -15,36 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
 
+import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.SingleTableDataCalculatorRegistry;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 
-import java.util.Collection;
-import java.util.Collections;
+import java.util.Properties;
 
 /**
- * Scaling default data consistency check algorithm.
+ * Abstract data consistency check algorithm.
  */
-public final class DefaultDataConsistencyCheckAlgorithm implements 
DataConsistencyCheckAlgorithm {
+public abstract class AbstractDataConsistencyCheckAlgorithm implements 
DataConsistencyCheckAlgorithm {
     
-    public static final String TYPE = "DEFAULT";
-    
-    private static final Collection<String> SUPPORTED_DATABASE_TYPES = 
Collections.singletonList(new MySQLDatabaseType().getName());
+    private Properties props = new Properties();
     
     @Override
-    public void init() {
+    public Properties getProps() {
+        return props;
     }
     
     @Override
-    public String getDescription() {
-        return "Default implementation with CRC32 of all records.";
+    public void setProps(final Properties props) {
+        this.props = props;
     }
     
     @Override
-    public Collection<String> getSupportedDatabaseTypes() {
-        return SUPPORTED_DATABASE_TYPES;
+    public void init() {
     }
     
     @Override
@@ -53,12 +50,10 @@ public final class DefaultDataConsistencyCheckAlgorithm 
implements DataConsisten
     }
     
     @Override
-    public SingleTableDataCalculator getSingleTableDataCalculator(final String 
supportedDatabaseType) {
-        return SingleTableDataCalculatorRegistry.newServiceInstance(TYPE, 
supportedDatabaseType);
-    }
-    
-    @Override
-    public String getType() {
-        return TYPE;
+    public final SingleTableDataCalculator getSingleTableDataCalculator(final 
String supportedDatabaseType) {
+        SingleTableDataCalculator result = 
SingleTableDataCalculatorRegistry.newServiceInstance(getType(), 
supportedDatabaseType);
+        result.setAlgorithmProps(props);
+        result.init();
+        return result;
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/AbstractSingleTableDataCalculator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java
similarity index 78%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/AbstractSingleTableDataCalculator.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java
index 3a7633b..f66a347 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/AbstractSingleTableDataCalculator.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java
@@ -15,27 +15,39 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
 import 
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfiguration;
 
+import java.util.Properties;
+
 /**
  * Abstract single table data calculator.
  */
-@RequiredArgsConstructor
-@Getter
-@Slf4j
 public abstract class AbstractSingleTableDataCalculator implements 
SingleTableDataCalculator {
     
     private final DataSourceFactory dataSourceFactory = new 
DataSourceFactory();
     
+    private Properties algorithmProps;
+    
     protected final DataSourceWrapper getDataSource(final 
JDBCDataSourceConfiguration dataSourceConfig) {
         return dataSourceFactory.newInstance(dataSourceConfig);
     }
+    
+    @Override
+    public Properties getAlgorithmProps() {
+        return algorithmProps;
+    }
+    
+    @Override
+    public void setAlgorithmProps(final Properties algorithmProps) {
+        this.algorithmProps = algorithmProps;
+    }
+    
+    @Override
+    public void init() {
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingSingleTableDataCalculator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingSingleTableDataCalculator.java
new file mode 100644
index 0000000..ba7b031
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingSingleTableDataCalculator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
+
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Abstract streaming single table data calculator.
+ */
+@RequiredArgsConstructor
+@Getter
+@Slf4j
+public abstract class AbstractStreamingSingleTableDataCalculator extends 
AbstractSingleTableDataCalculator {
+    
+    @Override
+    public final Iterable<Object> calculate(final DataCalculateParameter 
dataCalculateParameter) {
+        return new ResultIterable(dataCalculateParameter);
+    }
+    
+    /**
+     * Calculate one chunk of records at a time.
+     *
+     * @param dataCalculateParameter data calculate parameter
+     * @return optional calculated result, empty means there's no more result
+     */
+    protected abstract Optional<Object> calculateChunk(DataCalculateParameter 
dataCalculateParameter);
+    
+    /**
+     * It is not thread-safe; it must be executed by only one thread at a 
time.
+     */
+    @RequiredArgsConstructor
+    final class ResultIterable implements Iterable<Object> {
+        
+        private final DataCalculateParameter dataCalculateParameter;
+        
+        @Override
+        public Iterator<Object> iterator() {
+            return new ResultIterator(dataCalculateParameter);
+        }
+    }
+    
+    @RequiredArgsConstructor
+    final class ResultIterator implements Iterator<Object> {
+        
+        private final DataCalculateParameter dataCalculateParameter;
+        
+        private final AtomicInteger calculationCount = new AtomicInteger(0);
+        
+        private volatile Optional<Object> nextResult;
+        
+        @Override
+        public boolean hasNext() {
+            calculateIfNecessary();
+            return nextResult.isPresent();
+        }
+        
+        @Override
+        public Object next() {
+            calculateIfNecessary();
+            Optional<Object> nextResult = this.nextResult;
+            
dataCalculateParameter.setPreviousCalculatedResult(nextResult.orElse(null));
+            this.nextResult = null;
+            return nextResult;
+        }
+        
+        private void calculateIfNecessary() {
+            if (null != nextResult) {
+                return;
+            }
+            nextResult = calculateChunk(dataCalculateParameter);
+            if (!nextResult.isPresent()) {
+                log.info("nextResult not present, calculation done. 
calculationCount={}", calculationCount);
+            }
+            if (calculationCount.incrementAndGet() % 100_0000 == 0) {
+                log.warn("possible infinite loop, calculationCount={}", 
calculationCount);
+            }
+        }
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DefaultDataConsistencyCheckAlgorithm.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithm.java
similarity index 60%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DefaultDataConsistencyCheckAlgorithm.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithm.java
index 51d7794..e65e7c1 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DefaultDataConsistencyCheckAlgorithm.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithm.java
@@ -15,31 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
 
-import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 
 import java.util.Collection;
 import java.util.Collections;
 
 /**
- * Scaling default data consistency check algorithm.
+ * CRC32 match implementation of data consistency check algorithm.
  */
-public final class DefaultDataConsistencyCheckAlgorithm implements 
DataConsistencyCheckAlgorithm {
+public final class CRC32MatchDataConsistencyCheckAlgorithm extends 
AbstractDataConsistencyCheckAlgorithm {
     
-    public static final String TYPE = "DEFAULT";
+    public static final String TYPE = "CRC32_MATCH";
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = 
Collections.singletonList(new MySQLDatabaseType().getName());
     
     @Override
-    public void init() {
-    }
-    
-    @Override
     public String getDescription() {
-        return "Default implementation with CRC32 of all records.";
+        return "Match CRC32 of records.";
     }
     
     @Override
@@ -48,16 +42,6 @@ public final class DefaultDataConsistencyCheckAlgorithm 
implements DataConsisten
     }
     
     @Override
-    public String getProvider() {
-        return "ShardingSphere";
-    }
-    
-    @Override
-    public SingleTableDataCalculator getSingleTableDataCalculator(final String 
supportedDatabaseType) {
-        return SingleTableDataCalculatorRegistry.newServiceInstance(TYPE, 
supportedDatabaseType);
-    }
-    
-    @Override
     public String getType() {
         return TYPE;
     }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchDataConsistencyCheckAlgorithm.java
similarity index 53%
copy from 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchDataConsistencyCheckAlgorithm.java
index 0c4792d..d1207de 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchDataConsistencyCheckAlgorithm.java
@@ -15,25 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.fixture;
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
 
-import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
-import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
 
-public final class FixtureH2SingleTableDataCalculator implements 
SingleTableDataCalculator {
+import java.util.Collection;
+
+/**
+ * Data match implementation of data consistency check algorithm.
+ */
+public final class DataMatchDataConsistencyCheckAlgorithm extends 
AbstractDataConsistencyCheckAlgorithm {
+    
+    public static final String TYPE = "DATA_MATCH";
+    
+    private static final Collection<String> SUPPORTED_DATABASE_TYPES = 
DatabaseTypeRegistry.getDatabaseTypeNames();
     
     @Override
-    public String getAlgorithmType() {
-        return FixtureDataConsistencyCheckAlgorithm.TYPE;
+    public String getDescription() {
+        return "Match raw data of records.";
     }
     
     @Override
-    public String getDatabaseType() {
-        return "H2";
+    public Collection<String> getSupportedDatabaseTypes() {
+        return SUPPORTED_DATABASE_TYPES;
     }
     
     @Override
-    public Object dataCalculate(final DataCalculateParameter 
dataCalculateParameter) {
-        return true;
+    public String getType() {
+        return TYPE;
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
new file mode 100644
index 0000000..3dd8b2b
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
+
+import com.google.common.base.Strings;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.DataCheckFailException;
+import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import 
org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Data match implementation of single table data calculator.
+ */
+@Slf4j
+public final class DataMatchSingleTableDataCalculator extends 
AbstractStreamingSingleTableDataCalculator {
+    
+    private static final Collection<String> DATABASE_TYPES = 
DatabaseTypeRegistry.getDatabaseTypeNames();
+    
+    private static final String CHUNK_SIZE_KEY = "chunk-size";
+    
+    private volatile int chunkSize = 1000;
+    
+    @Override
+    public String getAlgorithmType() {
+        return DataMatchDataConsistencyCheckAlgorithm.TYPE;
+    }
+    
+    @Override
+    public Collection<String> getDatabaseTypes() {
+        return DATABASE_TYPES;
+    }
+    
+    @Override
+    public void init() {
+        Properties algorithmProps = getAlgorithmProps();
+        String chunkSizeValue = algorithmProps.getProperty(CHUNK_SIZE_KEY);
+        if (!Strings.isNullOrEmpty(chunkSizeValue)) {
+            int chunkSize = Integer.parseInt(chunkSizeValue);
+            if (chunkSize <= 0) {
+                log.warn("invalid chunkSize={}, use default value", chunkSize);
+            }
+            this.chunkSize = chunkSize;
+        }
+    }
+    
+    @Override
+    protected Optional<Object> calculateChunk(final DataCalculateParameter 
dataCalculateParameter) {
+        String logicTableName = dataCalculateParameter.getLogicTableName();
+        PipelineSQLBuilder sqlBuilder = 
ScalingSQLBuilderFactory.newInstance(dataCalculateParameter.getDatabaseType());
+        String uniqueKey = dataCalculateParameter.getUniqueKey();
+        CalculatedResult previousCalculatedResult = (CalculatedResult) 
dataCalculateParameter.getPreviousCalculatedResult();
+        Number startUniqueValue = null != previousCalculatedResult ? 
previousCalculatedResult.getMaxUniqueValue() : -1;
+        String sql = sqlBuilder.buildChunkedQuerySQL(logicTableName, 
uniqueKey, startUniqueValue);
+        try {
+            return query(dataCalculateParameter.getDataSource(), sql, 
uniqueKey, startUniqueValue, chunkSize);
+        } catch (final SQLException ex) {
+            throw new DataCheckFailException(String.format("table %s data 
check failed.", logicTableName), ex);
+        }
+    }
+    
+    private Optional<Object> query(final DataSource dataSource, final String 
sql, final String uniqueKey, final Number startUniqueValue, final int 
chunkSize) throws SQLException {
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            preparedStatement.setObject(1, startUniqueValue);
+            preparedStatement.setInt(2, chunkSize);
+            Collection<Collection<Object>> records = new 
ArrayList<>(chunkSize);
+            Number maxUniqueValue = null;
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    ResultSetMetaData resultSetMetaData = 
resultSet.getMetaData();
+                    int columnCount = resultSetMetaData.getColumnCount();
+                    Collection<Object> record = new ArrayList<>(columnCount);
+                    for (int columnIndex = 1; columnIndex <= columnCount; 
columnIndex++) {
+                        record.add(resultSet.getObject(columnIndex));
+                    }
+                    records.add(record);
+                    maxUniqueValue = (Number) resultSet.getObject(uniqueKey);
+                }
+            }
+            return records.isEmpty() ? Optional.empty() : Optional.of(new 
CalculatedResult(maxUniqueValue, records.size(), records));
+        }
+    }
+    
+    @RequiredArgsConstructor
+    @Getter
+    @EqualsAndHashCode
+    private static final class CalculatedResult {
+        
+        @NonNull
+        private final Number maxUniqueValue;
+        
+        private final int recordCount;
+        
+        private final Collection<Collection<Object>> records;
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 8bb4ccb..2a81ac5 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
 
+import com.google.common.base.Preconditions;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
@@ -144,6 +145,13 @@ public abstract class AbstractPipelineSQLBuilder 
implements PipelineSQLBuilder {
     }
     
     @Override
+    public String buildChunkedQuerySQL(final String tableName, final String 
uniqueKey, final Number startUniqueValue) {
+        Preconditions.checkNotNull(uniqueKey, "uniqueKey is null");
+        Preconditions.checkNotNull(startUniqueValue, "startUniqueValue is 
null");
+        return "SELECT * FROM " + quote(tableName) + " WHERE " + 
quote(uniqueKey) + " > ? ORDER BY " + quote(uniqueKey) + " ASC LIMIT ?";
+    }
+    
+    @Override
     public String buildCheckEmptySQL(final String tableName) {
         return String.format("SELECT * FROM %s LIMIT 1", quote(tableName));
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java
index 5676b27..42d338e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java
@@ -18,8 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import com.google.common.base.Preconditions;
-import lombok.Getter;
-import lombok.Setter;
 import 
org.apache.shardingsphere.data.pipeline.api.detect.AllIncrementalTasksAlmostFinishedParameter;
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCompletionDetectAlgorithm;
 
@@ -29,8 +27,6 @@ import java.util.Properties;
 /**
  * Idle rule altered job completion detect algorithm.
  */
-@Getter
-@Setter
 public final class IdleRuleAlteredJobCompletionDetectAlgorithm implements 
RuleAlteredJobCompletionDetectAlgorithm {
     
     public static final String IDLE_THRESHOLD_KEY = 
"incremental-task-idle-minute-threshold";
@@ -40,6 +36,16 @@ public final class 
IdleRuleAlteredJobCompletionDetectAlgorithm implements RuleAl
     private long incrementalTaskIdleMinuteThreshold = 30;
     
     @Override
+    public Properties getProps() {
+        return props;
+    }
+    
+    @Override
+    public void setProps(final Properties props) {
+        this.props = props;
+    }
+    
+    @Override
     public void init() {
         Preconditions.checkArgument(props.containsKey(IDLE_THRESHOLD_KEY), "%s 
can not be null.", IDLE_THRESHOLD_KEY);
         incrementalTaskIdleMinuteThreshold = 
Long.parseLong(props.getProperty(IDLE_THRESHOLD_KEY));
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCheckAlgorithm.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCheckAlgorithm.java
index 844233e..19a6d9d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCheckAlgorithm.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCheckAlgorithm.java
@@ -23,7 +23,7 @@ import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmP
 import java.util.Collection;
 
 /**
- * Data consistency check algorithm for SPI.
+ * Data consistency check algorithm SPI.
  */
 public interface DataConsistencyCheckAlgorithm extends 
ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor {
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/SingleTableDataCalculator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/SingleTableDataCalculator.java
index 067b7e5..06223b5 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/SingleTableDataCalculator.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/SingleTableDataCalculator.java
@@ -19,8 +19,11 @@ package 
org.apache.shardingsphere.data.pipeline.spi.check.consistency;
 
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
 
+import java.util.Collection;
+import java.util.Properties;
+
 /**
- * Single table data calculator interface for SPI.
+ * Single table data calculator SPI interface.
  * <p>
  * SPI implementation will be initialized as new instance every time.
  * </p>
@@ -35,18 +38,37 @@ public interface SingleTableDataCalculator {
     String getAlgorithmType();
     
     /**
-     * Get database type.
+     * Get database types.
+     *
+     * @return database types
+     */
+    Collection<String> getDatabaseTypes();
+    
+    /**
+     * Get algorithm properties.
+     *
+     * @return properties
+     */
+    Properties getAlgorithmProps();
+    
+    /**
+     * Set algorithm properties.
+     * Used by data consistency check algorithm.
      *
-     * @return database type
+     * @param algorithmProps algorithm properties
+     */
+    void setAlgorithmProps(Properties algorithmProps);
+    
+    /**
+     * Initialize data calculator.
      */
-    // TODO support database types?
-    String getDatabaseType();
+    void init();
     
     /**
-     * Calculate table data, usually checksum.
+     * Calculate table data content, typically returning a checksum.
      *
      * @param dataCalculateParameter data calculate parameter
      * @return calculated result; it will be used to check equality.
      */
-    Object dataCalculate(DataCalculateParameter dataCalculateParameter);
+    Iterable<Object> calculate(DataCalculateParameter dataCalculateParameter);
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 8f14744..6232940 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -84,6 +84,16 @@ public interface PipelineSQLBuilder {
     String buildCountSQL(String tableName);
     
     /**
+     * Build query SQL.
+     *
+     * @param tableName table name
+     * @param uniqueKey unique key (it may be the primary key), not null
+     * @param startUniqueValue start unique value, not null
+     * @return query SQL
+     */
+    String buildChunkedQuerySQL(String tableName, String uniqueKey, Number 
startUniqueValue);
+    
+    /**
      * Build check empty SQL.
      *
      * @param tableName table name
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm
index 57ab8e3..078c666 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm
@@ -15,4 +15,5 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.core.check.consistency.DefaultDataConsistencyCheckAlgorithm
+org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchDataConsistencyCheckAlgorithm
+org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.DataMatchDataConsistencyCheckAlgorithm
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
similarity index 88%
copy from 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
index 89cd902..5749056 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.scaling.mysql.component.checker.DefaultMySQLSingleTableDataCalculator
+org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.DataMatchSingleTableDataCalculator
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
 
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
index 08fb878..4ef066f 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
@@ -111,7 +111,9 @@
 #      sourceWritingStopper:
 #        type: DEFAULT
 #      dataConsistencyChecker:
-#        type: DEFAULT
+#        type: DATA_MATCH
+#        props:
+#          chunk-size: 1000
 #      checkoutLocker:
 #        type: DEFAULT
 
@@ -198,9 +200,20 @@
 #    default_scaling:
 #      blockQueueSize: 10000
 #      workerThread: 40
+#      readBatchSize: 1000
+#      rateLimiter:
+#        type: SOURCE
+#        props:
+#          qps: 50
 #      completionDetector:
 #        type: IDLE
 #        props:
 #          incremental-task-idle-minute-threshold: 30
+#      sourceWritingStopper:
+#        type: DEFAULT
 #      dataConsistencyChecker:
+#        type: DATA_MATCH
+#        props:
+#          chunk-size: 1000
+#      checkoutLocker:
 #        type: DEFAULT
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/DefaultMySQLSingleTableDataCalculator.java
 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/CRC32MatchMySQLSingleTableDataCalculator.java
similarity index 59%
rename from 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/DefaultMySQLSingleTableDataCalculator.java
rename to 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/CRC32MatchMySQLSingleTableDataCalculator.java
index ce7aa65..3b2219eb 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/DefaultMySQLSingleTableDataCalculator.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/CRC32MatchMySQLSingleTableDataCalculator.java
@@ -18,10 +18,9 @@
 package org.apache.shardingsphere.scaling.mysql.component.checker;
 
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.AbstractSingleTableDataCalculator;
-import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DefaultDataConsistencyCheckAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.DataCheckFailException;
+import 
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.AbstractSingleTableDataCalculator;
+import 
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchDataConsistencyCheckAlgorithm;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import 
org.apache.shardingsphere.scaling.mysql.component.MySQLPipelineSQLBuilder;
 
@@ -30,48 +29,50 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.stream.Collectors;
 
 /**
- * Default MySQL single table data calculator.
+ * CRC32 match MySQL implementation of single table data calculator.
  */
-public final class DefaultMySQLSingleTableDataCalculator extends 
AbstractSingleTableDataCalculator {
+public final class CRC32MatchMySQLSingleTableDataCalculator extends 
AbstractSingleTableDataCalculator {
     
-    private static final String DATABASE_TYPE = new 
MySQLDatabaseType().getName();
+    private static final Collection<String> DATABASE_TYPES = 
Collections.singletonList(new MySQLDatabaseType().getName());
     
     @Override
     public String getAlgorithmType() {
-        return DefaultDataConsistencyCheckAlgorithm.TYPE;
+        return CRC32MatchDataConsistencyCheckAlgorithm.TYPE;
     }
     
     @Override
-    public String getDatabaseType() {
-        return DATABASE_TYPE;
+    public Collection<String> getDatabaseTypes() {
+        return DATABASE_TYPES;
     }
     
     @Override
-    public Object dataCalculate(final DataCalculateParameter 
dataCalculateParameter) {
+    public Iterable<Object> calculate(final DataCalculateParameter 
dataCalculateParameter) {
         String logicTableName = dataCalculateParameter.getLogicTableName();
         MySQLPipelineSQLBuilder scalingSQLBuilder = new 
MySQLPipelineSQLBuilder(new HashMap<>());
-        try (DataSourceWrapper dataSource = 
getDataSource(dataCalculateParameter.getDataSourceConfig())) {
-            return dataCalculateParameter.getColumnNames().stream().map(each 
-> {
-                String sql = 
scalingSQLBuilder.buildSumCrc32SQL(logicTableName, each);
-                return sumCrc32(dataSource, sql);
-            }).collect(Collectors.toList());
-        } catch (final SQLException ex) {
-            throw new DataCheckFailException(String.format("table %s data 
check failed.", logicTableName), ex);
-        }
+        List<Long> result = 
dataCalculateParameter.getColumnNames().stream().map(each -> {
+            String sql = scalingSQLBuilder.buildSumCrc32SQL(logicTableName, 
each);
+            try {
+                return sumCrc32(dataCalculateParameter.getDataSource(), sql);
+            } catch (final SQLException ex) {
+                throw new DataCheckFailException(String.format("table %s data 
check failed.", logicTableName), ex);
+            }
+        }).collect(Collectors.toList());
+        return Collections.unmodifiableList(result);
     }
     
-    private long sumCrc32(final DataSource dataSource, final String sql) {
+    private long sumCrc32(final DataSource dataSource, final String sql) 
throws SQLException {
         try (Connection connection = dataSource.getConnection();
              PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
              ResultSet resultSet = preparedStatement.executeQuery()) {
             resultSet.next();
             return resultSet.getLong(1);
-        } catch (final SQLException ex) {
-            throw new DataCheckFailException(String.format("execute %s 
failed.", sql), ex);
         }
     }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
index 89cd902..f05ee7e 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.scaling.mysql.component.checker.DefaultMySQLSingleTableDataCalculator
+org.apache.shardingsphere.scaling.mysql.component.checker.CRC32MatchMySQLSingleTableDataCalculator
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/DefaultPostgreSQLSingleTableDataCalculator.java
 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/CRC32MatchPostgreSQLSingleTableDataCalculator.java
similarity index 56%
rename from 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/DefaultPostgreSQLSingleTableDataCalculator.java
rename to 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/CRC32MatchPostgreSQLSingleTableDataCalculator.java
index 791dddd..30ee199 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/DefaultPostgreSQLSingleTableDataCalculator.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/CRC32MatchPostgreSQLSingleTableDataCalculator.java
@@ -18,30 +18,33 @@
 package org.apache.shardingsphere.scaling.postgresql.component.checker;
 
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.AbstractSingleTableDataCalculator;
-import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DefaultDataConsistencyCheckAlgorithm;
+import 
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.AbstractSingleTableDataCalculator;
+import 
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchDataConsistencyCheckAlgorithm;
 import 
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 
+import java.util.Collection;
+import java.util.Collections;
+
 /**
- * Default PostgreSQL single table data calculator.
+ * CRC32 match PostgreSQL implementation of single table data calculator.
  */
-public final class DefaultPostgreSQLSingleTableDataCalculator extends 
AbstractSingleTableDataCalculator {
+public final class CRC32MatchPostgreSQLSingleTableDataCalculator extends 
AbstractSingleTableDataCalculator {
     
-    private static final String DATABASE_TYPE = new 
PostgreSQLDatabaseType().getName();
+    private static final Collection<String> DATABASE_TYPES = 
Collections.singletonList(new PostgreSQLDatabaseType().getName());
     
     @Override
     public String getAlgorithmType() {
-        return DefaultDataConsistencyCheckAlgorithm.TYPE;
+        return CRC32MatchDataConsistencyCheckAlgorithm.TYPE;
     }
     
     @Override
-    public String getDatabaseType() {
-        return DATABASE_TYPE;
+    public Collection<String> getDatabaseTypes() {
+        return DATABASE_TYPES;
     }
     
     @Override
-    public Object dataCalculate(final DataCalculateParameter 
dataCalculateParameter) {
-        //TODO PostgreSQL dataCalculate
-        return true;
+    public Iterable<Object> calculate(final DataCalculateParameter 
dataCalculateParameter) {
+        //TODO PostgreSQL calculate
+        return Collections.emptyList();
     }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
index ad3cf6b..710380b 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.scaling.postgresql.component.checker.DefaultPostgreSQLSingleTableDataCalculator
+org.apache.shardingsphere.scaling.postgresql.component.checker.CRC32MatchPostgreSQLSingleTableDataCalculator
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CheckScalingQueryResultSet.java
 
b/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CheckScalingQueryResultSet.java
index afa89a1..44830e0 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CheckScalingQueryResultSet.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CheckScalingQueryResultSet.java
@@ -51,17 +51,17 @@ public final class CheckScalingQueryResultSet implements 
DistSQLResultSet {
                 .map(each -> {
                     Collection<Object> list = new LinkedList<>();
                     list.add(each.getKey());
-                    list.add(each.getValue().getSourceCount());
-                    list.add(each.getValue().getTargetCount());
-                    list.add(each.getValue().isCountValid() + "");
-                    list.add(each.getValue().isDataValid() + "");
+                    list.add(each.getValue().getSourceRecordsCount());
+                    list.add(each.getValue().getTargetRecordsCount());
+                    list.add(each.getValue().isRecordsCountMatched() + "");
+                    list.add(each.getValue().isRecordsContentMatched() + "");
                     return list;
                 }).collect(Collectors.toList()).iterator();
     }
     
     @Override
     public Collection<String> getColumnNames() {
-        return Arrays.asList("table_name", "source_count", "target_count", 
"count_valid", "data_valid");
+        return Arrays.asList("table_name", "source_records_count", 
"target_records_count", "records_count_matched", "records_content_matched");
     }
     
     @Override
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingCheckAlgorithmsQueryResultSetTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingCheckAlgorithmsQueryResultSetTest.java
index a89152d..0061785 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingCheckAlgorithmsQueryResultSetTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-distsql/shardingsphere-scaling-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingCheckAlgorithmsQueryResultSetTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.scaling.distsql.handler;
 
-import org.apache.shardingsphere.infra.distsql.query.DistSQLResultSet;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.scaling.distsql.statement.ShowScalingCheckAlgorithmsStatement;
 import org.junit.Test;
@@ -27,9 +26,12 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(MockitoJUnitRunner.class)
 public final class ShowScalingCheckAlgorithmsQueryResultSetTest {
@@ -42,14 +44,16 @@ public final class 
ShowScalingCheckAlgorithmsQueryResultSetTest {
     
     @Test
     public void assertGetRowData() {
-        DistSQLResultSet resultSet = new 
ShowScalingCheckAlgorithmsQueryResultSet();
+        ShowScalingCheckAlgorithmsQueryResultSet resultSet = new 
ShowScalingCheckAlgorithmsQueryResultSet();
         resultSet.init(shardingSphereMetaData, 
showScalingCheckAlgorithmsStatement);
-        Collection<Object> actual = resultSet.getRowData();
-        assertThat(actual.size(), is(4));
-        Iterator<Object> rowData = actual.iterator();
-        assertThat(rowData.next(), is("DEFAULT"));
-        assertThat(rowData.next(), is("Default implementation with CRC32 of 
all records."));
-        assertThat(rowData.next(), is("MySQL"));
-        assertThat(rowData.next(), is("ShardingSphere"));
+        Set<Object> algorithmTypes = new LinkedHashSet<>();
+        while (resultSet.next()) {
+            Collection<Object> actual = resultSet.getRowData();
+            assertThat(actual.size(), is(4));
+            Iterator<Object> rowData = actual.iterator();
+            algorithmTypes.add(rowData.next());
+        }
+        assertTrue(algorithmTypes.contains("DATA_MATCH"));
+        assertTrue(algorithmTypes.contains("CRC32_MATCH"));
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
index 5ac6109..59cb3e9 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/PipelineJobAPIImplTest.java
@@ -152,9 +152,9 @@ public final class PipelineJobAPIImplTest {
         initTableData(jobConfig.getRuleConfig());
         Map<String, DataConsistencyCheckResult> checkResultMap = 
pipelineJobAPI.dataConsistencyCheck(jobId.get(), 
FixtureDataConsistencyCheckAlgorithm.TYPE);
         assertThat(checkResultMap.size(), is(1));
-        assertTrue(checkResultMap.get("t_order").isCountValid());
-        assertTrue(checkResultMap.get("t_order").isDataValid());
-        assertThat(checkResultMap.get("t_order").getTargetCount(), is(2L));
+        assertTrue(checkResultMap.get("t_order").isRecordsCountMatched());
+        assertTrue(checkResultMap.get("t_order").isRecordsContentMatched());
+        assertThat(checkResultMap.get("t_order").getTargetRecordsCount(), 
is(2L));
     }
     
     @Test
@@ -164,17 +164,17 @@ public final class PipelineJobAPIImplTest {
         checkResultMap = Collections.emptyMap();
         assertThat(pipelineJobAPI.aggregateDataConsistencyCheckResults(jobId, 
checkResultMap), is(false));
         DataConsistencyCheckResult trueResult = new 
DataConsistencyCheckResult(1, 1);
-        trueResult.setDataValid(true);
+        trueResult.setRecordsContentMatched(true);
         DataConsistencyCheckResult checkResult;
         checkResult = new DataConsistencyCheckResult(100, 95);
         checkResultMap = ImmutableMap.<String, 
DataConsistencyCheckResult>builder().put("t", trueResult).put("t_order", 
checkResult).build();
         assertThat(pipelineJobAPI.aggregateDataConsistencyCheckResults(jobId, 
checkResultMap), is(false));
         checkResult = new DataConsistencyCheckResult(100, 100);
-        checkResult.setDataValid(false);
+        checkResult.setRecordsContentMatched(false);
         checkResultMap = ImmutableMap.<String, 
DataConsistencyCheckResult>builder().put("t", trueResult).put("t_order", 
checkResult).build();
         assertThat(pipelineJobAPI.aggregateDataConsistencyCheckResults(jobId, 
checkResultMap), is(false));
         checkResult = new DataConsistencyCheckResult(100, 100);
-        checkResult.setDataValid(true);
+        checkResult.setRecordsContentMatched(true);
         checkResultMap = ImmutableMap.<String, 
DataConsistencyCheckResult>builder().put("t", trueResult).put("t_order", 
checkResult).build();
         assertThat(pipelineJobAPI.aggregateDataConsistencyCheckResults(jobId, 
checkResultMap), is(true));
     }
@@ -188,7 +188,7 @@ public final class PipelineJobAPIImplTest {
         initTableData(jobConfig.getRuleConfig());
         pipelineJobAPI.reset(jobId.get());
         Map<String, DataConsistencyCheckResult> checkResultMap = 
pipelineJobAPI.dataConsistencyCheck(jobId.get(), 
FixtureDataConsistencyCheckAlgorithm.TYPE);
-        assertThat(checkResultMap.get("t_order").getTargetCount(), is(0L));
+        assertThat(checkResultMap.get("t_order").getTargetRecordsCount(), 
is(0L));
     }
     
     @SneakyThrows(SQLException.class)
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
index b0b41d5..39463ce 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
@@ -49,10 +49,10 @@ public final class DataConsistencyCheckerImplTest {
         DataConsistencyChecker dataConsistencyChecker = 
EnvironmentCheckerFactory.newInstance(jobContext);
         
initTableData(jobContext.getTaskConfigs().get(0).getDumperConfig().getDataSourceConfig());
         
initTableData(jobContext.getTaskConfigs().get(0).getImporterConfig().getDataSourceConfig());
-        Map<String, DataConsistencyCheckResult> resultMap = 
dataConsistencyChecker.countCheck();
-        assertTrue(resultMap.get("t_order").isCountValid());
-        assertThat(resultMap.get("t_order").getSourceCount(), 
is(resultMap.get("t_order").getTargetCount()));
-        Map<String, Boolean> dataCheckResultMap = 
dataConsistencyChecker.dataCheck(new FixtureDataConsistencyCheckAlgorithm());
+        Map<String, DataConsistencyCheckResult> resultMap = 
dataConsistencyChecker.checkRecordsCount();
+        assertTrue(resultMap.get("t_order").isRecordsCountMatched());
+        assertThat(resultMap.get("t_order").getSourceRecordsCount(), 
is(resultMap.get("t_order").getTargetRecordsCount()));
+        Map<String, Boolean> dataCheckResultMap = 
dataConsistencyChecker.checkRecordsContent(new 
FixtureDataConsistencyCheckAlgorithm());
         assertTrue(dataCheckResultMap.get("t_order"));
     }
     
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapperTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapperTest.java
index 0c6ff2a..8997096 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapperTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DataSourceWrapperTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.datasource;
 
+import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -68,7 +69,7 @@ public final class DataSourceWrapperTest {
 
     @Test
     public void assertGetConnection() throws SQLException {
-        DataSourceWrapper dataSourceWrapper = new 
DataSourceWrapper(dataSource);
+        DataSourceWrapper dataSourceWrapper = new 
DataSourceWrapper(dataSource, new H2DatabaseType());
         assertThat(dataSourceWrapper.getConnection(), is(connection));
         assertThat(dataSourceWrapper.getConnection(CLIENT_USERNAME, 
CLIENT_PASSWORD), is(connection));
         assertGetLogWriter(dataSourceWrapper.getLogWriter());
@@ -96,24 +97,24 @@ public final class DataSourceWrapperTest {
     @Test(expected = SQLException.class)
     public void assertSetLoginTimeoutFailure() throws SQLException {
         doThrow(new 
SQLException("")).when(dataSource).setLoginTimeout(LOGIN_TIMEOUT);
-        new DataSourceWrapper(dataSource).setLoginTimeout(LOGIN_TIMEOUT);
+        new DataSourceWrapper(dataSource, new 
H2DatabaseType()).setLoginTimeout(LOGIN_TIMEOUT);
     }
 
     @Test(expected = SQLException.class)
     public void assertSetLogWriterFailure() throws SQLException {
         doThrow(new 
SQLException("")).when(dataSource).setLogWriter(printWriter);
-        new DataSourceWrapper(dataSource).setLogWriter(printWriter);
+        new DataSourceWrapper(dataSource, new 
H2DatabaseType()).setLogWriter(printWriter);
     }
 
     @Test(expected = SQLException.class)
     public void assertCloseExceptionFailure() throws Exception {
         doThrow(new Exception("")).when((AutoCloseable) dataSource).close();
-        new DataSourceWrapper(dataSource).close();
+        new DataSourceWrapper(dataSource, new H2DatabaseType()).close();
     }
 
     @Test(expected = SQLException.class)
     public void assertCloseSQLExceptionFailure() throws Exception {
         doThrow(new SQLException("")).when((AutoCloseable) dataSource).close();
-        new DataSourceWrapper(dataSource).close();
+        new DataSourceWrapper(dataSource, new H2DatabaseType()).close();
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
index 0c4792d..61a7a14 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
@@ -18,9 +18,12 @@
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
-import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
+import 
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.AbstractSingleTableDataCalculator;
 
-public final class FixtureH2SingleTableDataCalculator implements 
SingleTableDataCalculator {
+import java.util.Collection;
+import java.util.Collections;
+
+public final class FixtureH2SingleTableDataCalculator extends 
AbstractSingleTableDataCalculator {
     
     @Override
     public String getAlgorithmType() {
@@ -28,12 +31,12 @@ public final class FixtureH2SingleTableDataCalculator 
implements SingleTableData
     }
     
     @Override
-    public String getDatabaseType() {
-        return "H2";
+    public Collection<String> getDatabaseTypes() {
+        return Collections.singletonList("H2");
     }
     
     @Override
-    public Object dataCalculate(final DataCalculateParameter 
dataCalculateParameter) {
-        return true;
+    public Iterable<Object> calculate(final DataCalculateParameter 
dataCalculateParameter) {
+        return Collections.singletonList(true);
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/ScalingDefaultDataConsistencyCheckAlgorithmTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithmTest.java
similarity index 76%
rename from 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/ScalingDefaultDataConsistencyCheckAlgorithmTest.java
rename to 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithmTest.java
index eee406c..68b6129 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/ScalingDefaultDataConsistencyCheckAlgorithmTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithmTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
 
 import org.junit.Test;
 
@@ -25,13 +25,13 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
-public final class ScalingDefaultDataConsistencyCheckAlgorithmTest {
+public final class CRC32MatchDataConsistencyCheckAlgorithmTest {
     
     @Test
     public void assertNewInstance() {
-        DefaultDataConsistencyCheckAlgorithm checkAlgorithm = new 
DefaultDataConsistencyCheckAlgorithm();
+        CRC32MatchDataConsistencyCheckAlgorithm checkAlgorithm = new 
CRC32MatchDataConsistencyCheckAlgorithm();
         checkAlgorithm.init();
-        assertThat(checkAlgorithm.getType(), 
is(DefaultDataConsistencyCheckAlgorithm.TYPE));
+        assertThat(checkAlgorithm.getType(), 
is(CRC32MatchDataConsistencyCheckAlgorithm.TYPE));
         assertNotNull(checkAlgorithm.getDescription());
         assertThat(checkAlgorithm.getProvider(), is("ShardingSphere"));
         assertThat(checkAlgorithm.getSupportedDatabaseTypes(), 
is(Collections.singletonList("MySQL")));
@@ -39,7 +39,7 @@ public final class 
ScalingDefaultDataConsistencyCheckAlgorithmTest {
     
     @Test(expected = NullPointerException.class)
     public void assertGetSingleTableDataCalculator() {
-        DefaultDataConsistencyCheckAlgorithm checkAlgorithm = new 
DefaultDataConsistencyCheckAlgorithm();
+        CRC32MatchDataConsistencyCheckAlgorithm checkAlgorithm = new 
CRC32MatchDataConsistencyCheckAlgorithm();
         
checkAlgorithm.getSupportedDatabaseTypes().forEach(checkAlgorithm::getSingleTableDataCalculator);
     }
 }

Reply via email to