This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fb87668517 Data consistency check job support breakpoint resume 
transmission (#22048)
1fb87668517 is described below

commit 1fb8766851700ec17f2d4b2967a63bc42985288c
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Nov 11 11:29:04 2022 +0800

    Data consistency check job support breakpoint resume transmission (#22048)
    
    * Add position to data consistency check job, to support starting execution from the 
middle of the task
    
    * Fix codestyle
    
    * revise comment
    
    * Add more unit test
    
    * Add check records count init
    
    * Fix ci error
    
    * rename package
    
    * add unit test
---
 .../DataConsistencyCalculateParameter.java         |   2 +
 .../DataConsistencyCalculatedResult.java           |   9 ++
 .../progress/ConsistencyCheckJobItemProgress.java  |  19 +--
 ...SingleTableInventoryDataConsistencyChecker.java |  31 +++--
 ...RC32MatchDataConsistencyCalculateAlgorithm.java |   6 +
 ...DataMatchDataConsistencyCalculateAlgorithm.java |  18 ++-
 .../yaml/YamlConsistencyCheckJobItemProgress.java  |   4 +
 ...YamlConsistencyCheckJobItemProgressSwapper.java |  20 ++--
 .../data/pipeline/core/util/ReflectionUtil.java    |  29 +++++
 .../consistencycheck/ConsistencyCheckJob.java      |   4 +-
 .../ConsistencyCheckJobAPIImpl.java                |   8 +-
 .../ConsistencyCheckJobItemContext.java            |  12 +-
 ...MatchDataConsistencyCalculateAlgorithmTest.java |   3 +-
 .../pipeline/core/util/ReflectionUtilTest.java     |  17 ++-
 .../cases/migration/AbstractMigrationITCase.java   |   1 +
 .../primarykey/TextPrimaryKeyMigrationIT.java      |   7 +-
 .../core/api/impl/MigrationJobAPIImplTest.java     |   2 +-
 ...MatchDataConsistencyCalculateAlgorithmTest.java | 128 +++++++++++++++++++++
 .../FixtureDataConsistencyCalculatedResult.java    |   7 ++
 .../consistencycheck/ConsistencyCheckJobTest.java  |  64 +++++++++++
 .../MigrationDataConsistencyCheckerTest.java       |   2 +-
 21 files changed, 345 insertions(+), 48 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
index 154643bdd59..04d9c757adf 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
@@ -73,4 +73,6 @@ public final class DataConsistencyCalculateParameter {
      * Previous calculated result will be transferred to next call.
      */
     private volatile Object previousCalculatedResult;
+    
+    private final Object tableCheckPosition;
 }
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
index ea34f78d60e..c5d4587ed02 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.api.check.consistency;
 
+import java.util.Optional;
+
 /**
  * Data consistency calculated result.
  */
@@ -28,4 +30,11 @@ public interface DataConsistencyCalculatedResult {
      * @return records count
      */
     int getRecordsCount();
+    
+    /**
+     * Get max unique key value.
+     *
+     * @return max unique key value
+     */
+    Optional<Object> getMaxUniqueKeyValue();
 }
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
index dcd0195bfa3..a559023fccd 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
@@ -18,28 +18,33 @@
 package org.apache.shardingsphere.data.pipeline.api.job.progress;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 
+import java.util.Map;
+
 /**
  * Data consistency check job item progress.
  */
-// TODO use final for fields
 @Getter
-@Setter
+@RequiredArgsConstructor
 @ToString
 public final class ConsistencyCheckJobItemProgress implements 
PipelineJobItemProgress {
     
+    @Setter
     private JobStatus status = JobStatus.RUNNING;
     
-    private String tableNames;
+    private final String tableNames;
+    
+    private final Long checkedRecordsCount;
     
-    private Long checkedRecordsCount;
+    private final Long recordsCount;
     
-    private Long recordsCount;
+    private final Long checkBeginTimeMillis;
     
-    private Long checkBeginTimeMillis;
+    private final Long checkEndTimeMillis;
     
-    private Long checkEndTimeMillis;
+    private final Map<String, Object> tableCheckPositions;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index bf9aed28113..899c0f17ceb 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -26,7 +26,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -34,6 +33,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -43,6 +43,7 @@ import 
org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
@@ -74,7 +75,7 @@ public final class SingleTableInventoryDataConsistencyChecker 
{
     
     private final JobRateLimitAlgorithm readRateLimitAlgorithm;
     
-    private final PipelineJobProgressListener jobProgressListener;
+    private final ConsistencyCheckJobItemContext jobItemContext;
     
     /**
      * Data consistency check.
@@ -86,15 +87,14 @@ public final class 
SingleTableInventoryDataConsistencyChecker {
         ThreadFactory threadFactory = 
ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobId) + 
"-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, 
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         try {
-            return check(calculateAlgorithm, executor, jobProgressListener);
+            return check(calculateAlgorithm, executor);
         } finally {
             executor.shutdown();
             executor.shutdownNow();
         }
     }
     
-    private DataConsistencyCheckResult check(final 
DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor 
executor,
-                                             final PipelineJobProgressListener 
jobProgressListener) {
+    private DataConsistencyCheckResult check(final 
DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor 
executor) {
         String sourceDatabaseType = 
sourceDataSource.getDatabaseType().getType();
         String targetDatabaseType = 
targetDataSource.getDatabaseType().getType();
         String schemaName = sourceTable.getSchemaName().getOriginal();
@@ -102,10 +102,14 @@ public final class 
SingleTableInventoryDataConsistencyChecker {
         PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(schemaName, sourceTableName);
         ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new 
PipelineTableDataConsistencyCheckLoadingFailedException(schemaName, 
sourceTableName));
         Collection<String> columnNames = tableMetaData.getColumnNames();
+        Map<String, Object> tableCheckPositions = 
jobItemContext.getTableCheckPositions();
         DataConsistencyCalculateParameter sourceParameter = buildParameter(
-                sourceDataSource, schemaName, sourceTableName, columnNames, 
sourceDatabaseType, targetDatabaseType, uniqueKey);
+                sourceDataSource, schemaName, sourceTableName, columnNames, 
sourceDatabaseType, targetDatabaseType, uniqueKey,
+                tableCheckPositions.get(sourceTableName));
+        String targetTableName = targetTable.getTableName().getOriginal();
         DataConsistencyCalculateParameter targetParameter = buildParameter(
-                targetDataSource, targetTable.getSchemaName().getOriginal(), 
targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType, 
sourceDatabaseType, uniqueKey);
+                targetDataSource, targetTable.getSchemaName().getOriginal(), 
targetTableName, columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey,
+                tableCheckPositions.get(targetTableName));
         Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = 
calculateAlgorithm.calculate(sourceParameter).iterator();
         Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = 
calculateAlgorithm.calculate(targetParameter).iterator();
         long sourceRecordsCount = 0;
@@ -126,9 +130,13 @@ public final class 
SingleTableInventoryDataConsistencyChecker {
                 log.info("content matched false, jobId={}, sourceTable={}, 
targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
                 break;
             }
-            if (null != jobProgressListener) {
-                jobProgressListener.onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+            if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
+                jobItemContext.getTableCheckPositions().put(sourceTableName, 
sourceCalculatedResult.getMaxUniqueKeyValue().get());
             }
+            if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
+                jobItemContext.getTableCheckPositions().put(targetTableName, 
targetCalculatedResult.getMaxUniqueKeyValue().get());
+            }
+            jobItemContext.onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
         }
         return new DataConsistencyCheckResult(new 
DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new 
DataConsistencyContentCheckResult(contentMatched));
     }
@@ -140,8 +148,9 @@ public final class 
SingleTableInventoryDataConsistencyChecker {
     
     private DataConsistencyCalculateParameter buildParameter(final 
PipelineDataSourceWrapper sourceDataSource,
                                                              final String 
schemaName, final String tableName, final Collection<String> columnNames,
-                                                             final String 
sourceDatabaseType, final String targetDatabaseType, final 
PipelineColumnMetaData uniqueKey) {
-        return new DataConsistencyCalculateParameter(sourceDataSource, 
schemaName, tableName, columnNames, sourceDatabaseType, targetDatabaseType, 
uniqueKey);
+                                                             final String 
sourceDatabaseType, final String targetDatabaseType, final 
PipelineColumnMetaData uniqueKey,
+                                                             final Object 
tableCheckPositionValue) {
+        return new DataConsistencyCalculateParameter(sourceDataSource, 
schemaName, tableName, columnNames, sourceDatabaseType, targetDatabaseType, 
uniqueKey, tableCheckPositionValue);
     }
     
     private <T> T waitFuture(final Future<T> future) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index fa605fbec02..aefc709ebb0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -141,5 +141,11 @@ public final class 
CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractD
             result = 31 * result + columnsCrc32.hashCode();
             return result;
         }
+        
+        // TODO not supported yet
+        @Override
+        public Optional<Object> getMaxUniqueKeyValue() {
+            return Optional.empty();
+        }
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 69f2f40d15b..676c3c71fcd 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -96,10 +96,16 @@ public final class 
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
                 Connection connection = 
parameter.getDataSource().getConnection();
                 PreparedStatement preparedStatement = 
setCurrentStatement(connection.prepareStatement(sql))) {
             preparedStatement.setFetchSize(chunkSize);
+            Object tableCheckPosition = parameter.getTableCheckPosition();
             if (null == previousCalculatedResult) {
-                preparedStatement.setInt(1, chunkSize);
+                if (null == tableCheckPosition) {
+                    preparedStatement.setInt(1, chunkSize);
+                } else {
+                    preparedStatement.setObject(1, tableCheckPosition);
+                    preparedStatement.setInt(2, chunkSize);
+                }
             } else {
-                preparedStatement.setObject(1, 
previousCalculatedResult.getMaxUniqueKeyValue());
+                preparedStatement.setObject(1, 
previousCalculatedResult.getMaxUniqueKeyValue().orElse(null));
                 preparedStatement.setInt(2, chunkSize);
             }
             Collection<Collection<Object>> records = new LinkedList<>();
@@ -134,7 +140,7 @@ public final class 
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
         String cacheKey = parameter.getDatabaseType() + "-" + (null != 
schemaName && 
DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
                 ? schemaName + "." + logicTableName
                 : logicTableName);
-        if (null == parameter.getPreviousCalculatedResult()) {
+        if (null == parameter.getPreviousCalculatedResult() && null == 
parameter.getTableCheckPosition()) {
             return firstSQLCache.computeIfAbsent(cacheKey, s -> 
sqlBuilder.buildChunkedQuerySQL(schemaName, logicTableName, uniqueKey, true));
         }
         return laterSQLCache.computeIfAbsent(cacheKey, s -> 
sqlBuilder.buildChunkedQuerySQL(schemaName, logicTableName, uniqueKey, false));
@@ -166,6 +172,10 @@ public final class 
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
         
         private final Collection<Collection<Object>> records;
         
+        public Optional<Object> getMaxUniqueKeyValue() {
+            return Optional.of(maxUniqueKeyValue);
+        }
+        
         @SneakyThrows(SQLException.class)
         @Override
         public boolean equals(final Object o) {
@@ -224,7 +234,7 @@ public final class 
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
         
         @Override
         public int hashCode() {
-            return new HashCodeBuilder(17, 
37).append(getMaxUniqueKeyValue()).append(getRecordsCount()).append(getRecords()).toHashCode();
+            return new HashCodeBuilder(17, 
37).append(getMaxUniqueKeyValue().orElse(null)).append(getRecordsCount()).append(getRecords()).toHashCode();
         }
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
index 4b610032745..62bc83d056e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
@@ -21,6 +21,8 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
+import java.util.Map;
+
 /**
  * Yaml data consistency check job item progress.
  */
@@ -39,4 +41,6 @@ public final class YamlConsistencyCheckJobItemProgress 
implements YamlConfigurat
     private Long checkBeginTimeMillis;
     
     private Long checkEndTimeMillis;
+    
+    private Map<String, Object> tableCheckPositions;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
index 9f4dabdd9ae..0e737997ae8 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
@@ -21,6 +21,9 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 /**
  * YAML data check job item progress swapper.
  */
@@ -30,23 +33,24 @@ public final class 
YamlConsistencyCheckJobItemProgressSwapper implements YamlCon
     public YamlConsistencyCheckJobItemProgress swapToYamlConfiguration(final 
ConsistencyCheckJobItemProgress data) {
         YamlConsistencyCheckJobItemProgress result = new 
YamlConsistencyCheckJobItemProgress();
         result.setStatus(data.getStatus().name());
-        result.setRecordsCount(data.getRecordsCount());
+        result.setTableNames(data.getTableNames());
         result.setCheckedRecordsCount(data.getCheckedRecordsCount());
+        result.setRecordsCount(data.getRecordsCount());
         result.setCheckBeginTimeMillis(data.getCheckBeginTimeMillis());
         result.setCheckEndTimeMillis(data.getCheckEndTimeMillis());
-        result.setTableNames(data.getTableNames());
+        result.setTableCheckPositions(data.getTableCheckPositions());
         return result;
     }
     
     @Override
     public ConsistencyCheckJobItemProgress swapToObject(final 
YamlConsistencyCheckJobItemProgress yamlConfig) {
-        ConsistencyCheckJobItemProgress result = new 
ConsistencyCheckJobItemProgress();
+        Map<String, Object> tableCheckPositions = new LinkedHashMap<>();
+        if (null != yamlConfig.getTableCheckPositions()) {
+            tableCheckPositions.putAll(yamlConfig.getTableCheckPositions());
+        }
+        ConsistencyCheckJobItemProgress result = new 
ConsistencyCheckJobItemProgress(yamlConfig.getTableNames(), 
yamlConfig.getCheckedRecordsCount(),
+                yamlConfig.getRecordsCount(), 
yamlConfig.getCheckBeginTimeMillis(), yamlConfig.getCheckEndTimeMillis(), 
tableCheckPositions);
         result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
-        result.setRecordsCount(yamlConfig.getRecordsCount());
-        result.setCheckedRecordsCount(yamlConfig.getCheckedRecordsCount());
-        result.setCheckBeginTimeMillis(yamlConfig.getCheckBeginTimeMillis());
-        result.setCheckEndTimeMillis(yamlConfig.getCheckEndTimeMillis());
-        result.setTableNames(yamlConfig.getTableNames());
         return result;
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtil.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtil.java
index 80a7bb279e2..215a6c7f514 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtil.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtil.java
@@ -92,4 +92,33 @@ public final class ReflectionUtil {
         method.setAccessible(true);
         return method.invoke(target, parameterValues);
     }
+    
+    /**
+     * Invoke method in parent class.
+     *
+     * @param target target object
+     * @param methodName method name
+     * @param parameterTypes parameter types
+     * @param parameterValues parameter values
+     * @return invoke method result.
+     * @throws NoSuchMethodException no such method exception
+     * @throws InvocationTargetException invocation target exception
+     * @throws IllegalAccessException illegal access exception
+     */
+    public static Object invokeMethodInParentClass(final Object target, final 
String methodName, final Class<?>[] parameterTypes,
+                                                   final Object[] 
parameterValues) throws NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
+        Method method = null;
+        for (Class<?> clazz = target.getClass(); clazz != Object.class; clazz 
= clazz.getSuperclass()) {
+            try {
+                method = clazz.getDeclaredMethod(methodName, parameterTypes);
+                method.setAccessible(true);
+                break;
+            } catch (final NoSuchMethodException ignored) {
+            }
+        }
+        if (null == method) {
+            throw new NoSuchMethodException("not find the method ");
+        }
+        return method.invoke(target, parameterValues);
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index ac8f1f43580..3f7810248ae 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
@@ -35,7 +36,8 @@ public final class ConsistencyCheckJob extends 
AbstractSimplePipelineJob {
     @Override
     protected ConsistencyCheckJobItemContext buildPipelineJobItemContext(final 
ShardingContext shardingContext) {
         ConsistencyCheckJobConfiguration jobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        return new ConsistencyCheckJobItemContext(jobConfig, 
shardingContext.getShardingItem(), JobStatus.RUNNING);
+        ConsistencyCheckJobItemProgress jobItemProgress = 
(ConsistencyCheckJobItemProgress) 
getJobAPI().getJobItemProgress(jobConfig.getJobId(), 
shardingContext.getShardingItem());
+        return new ConsistencyCheckJobItemContext(jobConfig, 
shardingContext.getShardingItem(), JobStatus.RUNNING, jobItemProgress);
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 2ef7f1db3b0..60270e3bc0e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -113,13 +113,9 @@ public final class ConsistencyCheckJobAPIImpl extends 
AbstractPipelineJobAPIImpl
     @Override
     public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
         ConsistencyCheckJobItemContext context = 
(ConsistencyCheckJobItemContext) jobItemContext;
-        ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress();
+        ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(String.join(",", context.getTableNames()),
+                context.getCheckedRecordsCount().get(), 
context.getRecordsCount(), context.getCheckBeginTimeMillis(), 
context.getCheckEndTimeMillis(), context.getTableCheckPositions());
         jobItemProgress.setStatus(context.getStatus());
-        
jobItemProgress.setCheckedRecordsCount(context.getCheckedRecordsCount().get());
-        jobItemProgress.setRecordsCount(context.getRecordsCount());
-        
jobItemProgress.setCheckBeginTimeMillis(context.getCheckBeginTimeMillis());
-        jobItemProgress.setCheckEndTimeMillis(context.getCheckEndTimeMillis());
-        jobItemProgress.setTableNames(String.join(",", 
context.getTableNames()));
         YamlConsistencyCheckJobItemProgress yamlJobProgress = 
swapper.swapToYamlConfiguration(jobItemProgress);
         
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(context.getJobId(),
 context.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
index 3a06075341c..3a47d8489a3 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
@@ -23,11 +23,15 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJo
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 
 import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -58,14 +62,20 @@ public final class ConsistencyCheckJobItemContext 
implements PipelineJobItemCont
     
     private Long checkEndTimeMillis;
     
+    private final Map<String, Object> tableCheckPositions = new 
ConcurrentHashMap<>();
+    
     private final ConsistencyCheckJobConfiguration jobConfig;
     
-    public ConsistencyCheckJobItemContext(final 
ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final 
JobStatus status) {
+    public ConsistencyCheckJobItemContext(final 
ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final 
JobStatus status, final ConsistencyCheckJobItemProgress jobItemProgress) {
         this.jobConfig = jobConfig;
         jobId = jobConfig.getJobId();
         this.shardingItem = shardingItem;
         this.status = status;
         checkBeginTimeMillis = System.currentTimeMillis();
+        if (null != jobItemProgress) {
+            
checkedRecordsCount.set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
+            
Optional.ofNullable(jobItemProgress.getTableCheckPositions()).ifPresent(tableCheckPositions::putAll);
+        }
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
index e5ae0a9635e..f0ab95487b4 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
@@ -35,6 +35,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -60,7 +61,7 @@ public final class 
CRC32MatchDataConsistencyCalculateAlgorithmTest {
     public void setUp() throws SQLException {
         PipelineColumnMetaData uniqueKey = new PipelineColumnMetaData(1, "id", 
Types.INTEGER, "integer", false, true, true);
         parameter = new DataConsistencyCalculateParameter(pipelineDataSource, 
null,
-                "foo_tbl", Arrays.asList("foo_col", "bar_col"), "FIXTURE", 
"FIXTURE", uniqueKey);
+                "foo_tbl", Arrays.asList("foo_col", "bar_col"), "FIXTURE", 
"FIXTURE", uniqueKey, Collections.emptyMap());
         when(pipelineDataSource.getConnection()).thenReturn(connection);
     }
     
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtilTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtilTest.java
index 98c0b2901f1..e583c19bac6 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtilTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/ReflectionUtilTest.java
@@ -49,9 +49,24 @@ public final class ReflectionUtilTest {
         assertThat(reflectionFixture.getValue(), is("new_value"));
     }
     
+    @Test
+    public void assertInvokeParentMethod() throws Exception {
+        ReflectionFixture reflectionFixture = new ReflectionFixture();
+        ReflectionUtil.invokeMethodInParentClass(reflectionFixture, 
"setParentValue", new Class[]{String.class}, new Object[]{"parent_value"});
+        assertThat(reflectionFixture.getParentValue(), is("parent_value"));
+    }
+    
+    @NoArgsConstructor
+    private static class ReflectionParentFixture {
+        
+        @Getter
+        @Setter
+        private String parentValue;
+    }
+    
     @AllArgsConstructor
     @NoArgsConstructor
-    private static final class ReflectionFixture {
+    private static final class ReflectionFixture extends 
ReflectionParentFixture {
         
         @Getter
         @Setter(AccessLevel.PRIVATE)
diff --git 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
 
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index b07b5cf2296..21fb5e91558 100644
--- 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ 
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -165,6 +165,7 @@ public abstract class AbstractMigrationITCase extends 
BaseITCase {
     
     protected void assertCheckMigrationSuccess(final String jobId, final 
String algorithmType) throws SQLException {
         proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE 
(NAME='%s')", jobId, algorithmType), 0);
+        // TODO Add a stop-then-start case to verify that the 
consistency check resumes from the previous progress
         List<Map<String, Object>> resultList = Collections.emptyList();
         for (int i = 0; i < 10; i++) {
             resultList = queryForListWithLog(String.format("SHOW MIGRATION 
CHECK STATUS '%s'", jobId));
diff --git 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
 
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
index 1d9751ab0b2..163322cdb46 100644
--- 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
+++ 
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
@@ -91,12 +91,7 @@ public class TextPrimaryKeyMigrationIT extends 
AbstractMigrationITCase {
         String jobId = listJobId().get(0);
         sourceExecuteWithLog(String.format("INSERT INTO %s 
(order_id,user_id,status) VALUES (%s, %s, '%s')", getSourceTableOrderName(), 
"1000000000", 1, "afterStop"));
         waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", 
jobId));
-        // TODO The ordering of primary or unique keys for text types is 
different, but can't reproduce now
-        if (DatabaseTypeUtil.isMySQL(getDatabaseType())) {
-            assertCheckMigrationSuccess(jobId, "DATA_MATCH");
-        } else {
-            assertCheckMigrationSuccess(jobId, "DATA_MATCH");
-        }
+        assertCheckMigrationSuccess(jobId, "DATA_MATCH");
         if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
             commitMigrationByJobId(jobId);
             List<String> lastJobIds = listJobId();
diff --git 
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index ef276fc0390..9f6c2d5f2b8 100644
--- 
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -154,7 +154,7 @@ public final class MigrationJobAPIImplTest {
         DataConsistencyCalculateAlgorithm calculateAlgorithm = 
jobAPI.buildDataConsistencyCalculateAlgorithm(jobConfig, "FIXTURE", null);
         ConsistencyCheckJobConfiguration checkJobConfig = 
mock(ConsistencyCheckJobConfiguration.class);
         when(checkJobConfig.getJobId()).thenReturn(jobConfig.getJobId() + "1");
-        ConsistencyCheckJobItemContext checkJobItemContext = new 
ConsistencyCheckJobItemContext(checkJobConfig, 0, JobStatus.RUNNING);
+        ConsistencyCheckJobItemContext checkJobItemContext = new 
ConsistencyCheckJobItemContext(checkJobConfig, 0, JobStatus.RUNNING, null);
         Map<String, DataConsistencyCheckResult> checkResultMap = 
jobAPI.dataConsistencyCheck(jobConfig, calculateAlgorithm, checkJobItemContext);
         assertThat(checkResultMap.size(), is(1));
         
assertTrue(checkResultMap.get("t_order").getCountCheckResult().isMatched());
diff --git 
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithmTest.java
 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithmTest.java
new file mode 100644
index 00000000000..d6842a93b1c
--- /dev/null
+++ 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithmTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
+
+import com.zaxxer.hikari.HikariDataSource;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
+import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public final class DataMatchDataConsistencyCalculateAlgorithmTest {
+    
+    private PipelineDataSourceWrapper source;
+    
+    private PipelineDataSourceWrapper target;
+    
+    @Before
+    public void setUp() throws Exception {
+        source = new 
PipelineDataSourceWrapper(createHikariDataSource("source_ds"), new 
H2DatabaseType());
+        createTableAndInitData(source, "t_order_copy");
+        target = new 
PipelineDataSourceWrapper(createHikariDataSource("target_ds"), new 
H2DatabaseType());
+        createTableAndInitData(target, "t_order");
+    }
+    
+    private HikariDataSource createHikariDataSource(final String databaseName) 
{
+        HikariDataSource result = new HikariDataSource();
+        
result.setJdbcUrl(String.format("jdbc:h2:mem:%s;DATABASE_TO_UPPER=false;MODE=MySQL",
 databaseName));
+        result.setUsername("root");
+        result.setPassword("root");
+        result.setMaximumPoolSize(10);
+        result.setMinimumIdle(2);
+        result.setConnectionTimeout(15 * 1000);
+        result.setIdleTimeout(40 * 1000);
+        return result;
+    }
+    
+    private void createTableAndInitData(final PipelineDataSourceWrapper 
dataSource, final String tableName) throws SQLException {
+        try (Connection connection = dataSource.getConnection()) {
+            String sql = String.format("CREATE TABLE %s (order_id INT NOT 
NULL, user_id INT NOT NULL, status VARCHAR(45) NULL, PRIMARY KEY (order_id))", 
tableName);
+            connection.createStatement().execute(sql);
+            PreparedStatement preparedStatement = 
connection.prepareStatement(String.format("INSERT INTO %s (order_id, user_id, 
status) VALUES (?, ?, ?)", tableName));
+            for (int i = 0; i < 10; i++) {
+                preparedStatement.setInt(1, i + 1);
+                preparedStatement.setInt(2, i + 1);
+                preparedStatement.setString(3, "test");
+                preparedStatement.execute();
+            }
+        }
+    }
+    
+    @Test
+    public void assertCalculateFromBegin() throws NoSuchFieldException, 
IllegalAccessException {
+        DataMatchDataConsistencyCalculateAlgorithm calculateAlgorithm = new 
DataMatchDataConsistencyCalculateAlgorithm();
+        ReflectionUtil.setFieldValue(calculateAlgorithm, "chunkSize", 5);
+        DataConsistencyCalculateParameter sourceParameter = 
generateParameter(source, "t_order_copy", 0);
+        Optional<DataConsistencyCalculatedResult> sourceCalculateResult = 
calculateAlgorithm.calculateChunk(sourceParameter);
+        DataConsistencyCalculateParameter targetParameter = 
generateParameter(target, "t_order", 0);
+        Optional<DataConsistencyCalculatedResult> targetCalculateResult = 
calculateAlgorithm.calculateChunk(targetParameter);
+        assertTrue(sourceCalculateResult.isPresent());
+        assertTrue(targetCalculateResult.isPresent());
+        
assertTrue(sourceCalculateResult.get().getMaxUniqueKeyValue().isPresent());
+        
assertTrue(targetCalculateResult.get().getMaxUniqueKeyValue().isPresent());
+        assertThat(sourceCalculateResult.get().getMaxUniqueKeyValue().get(), 
is(targetCalculateResult.get().getMaxUniqueKeyValue().get()));
+        assertThat(targetCalculateResult.get().getMaxUniqueKeyValue().get(), 
is(5L));
+        assertEquals(sourceCalculateResult.get(), targetCalculateResult.get());
+    }
+    
+    @Test
+    public void assertCalculateFromMiddle() throws NoSuchFieldException, 
IllegalAccessException {
+        DataMatchDataConsistencyCalculateAlgorithm calculateAlgorithm = new 
DataMatchDataConsistencyCalculateAlgorithm();
+        ReflectionUtil.setFieldValue(calculateAlgorithm, "chunkSize", 5);
+        DataConsistencyCalculateParameter sourceParameter = 
generateParameter(source, "t_order_copy", 5);
+        Optional<DataConsistencyCalculatedResult> sourceCalculateResult = 
calculateAlgorithm.calculateChunk(sourceParameter);
+        DataConsistencyCalculateParameter targetParameter = 
generateParameter(target, "t_order", 5);
+        Optional<DataConsistencyCalculatedResult> targetCalculateResult = 
calculateAlgorithm.calculateChunk(targetParameter);
+        assertTrue(sourceCalculateResult.isPresent());
+        assertTrue(targetCalculateResult.isPresent());
+        
assertTrue(sourceCalculateResult.get().getMaxUniqueKeyValue().isPresent());
+        
assertTrue(targetCalculateResult.get().getMaxUniqueKeyValue().isPresent());
+        assertThat(sourceCalculateResult.get().getMaxUniqueKeyValue().get(), 
is(targetCalculateResult.get().getMaxUniqueKeyValue().get()));
+        assertThat(targetCalculateResult.get().getMaxUniqueKeyValue().get(), 
is(10L));
+        assertEquals(sourceCalculateResult.get(), targetCalculateResult.get());
+    }
+    
+    private DataConsistencyCalculateParameter generateParameter(final 
PipelineDataSourceWrapper dataSource, final String logicTableName, final Object 
dataCheckPositionValue) {
+        PipelineColumnMetaData uniqueKey = new PipelineColumnMetaData(1, 
"order_id", Types.INTEGER, "integer", false, true, true);
+        return new DataConsistencyCalculateParameter(dataSource, null, 
logicTableName, Collections.emptyList(),
+                "MySQL", "MySQL", uniqueKey, dataCheckPositionValue);
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        source.close();
+        target.close();
+    }
+}
diff --git 
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
index b61af786c4a..9d9d970be0a 100644
--- 
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
+++ 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
@@ -22,10 +22,17 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 
+import java.util.Optional;
+
 @RequiredArgsConstructor
 @EqualsAndHashCode
 @Getter
 public final class FixtureDataConsistencyCalculatedResult implements 
DataConsistencyCalculatedResult {
     
     private final int recordsCount;
+    
+    @Override
+    public Optional<Object> getMaxUniqueKeyValue() {
+        return Optional.empty();
+    }
 }
diff --git 
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
new file mode 100644
index 00000000000..22c07c2caf0
--- /dev/null
+++ 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
+import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+public final class ConsistencyCheckJobTest {
+    
+    @BeforeClass
+    public static void beforeClass() {
+        PipelineContextUtil.mockModeConfigAndContextManager();
+    }
+    
+    @Test
+    public void assertBuildPipelineJobItemContext() throws 
IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+        YamlConsistencyCheckJobItemProgress jobItemProgress = new 
YamlConsistencyCheckJobItemProgress();
+        jobItemProgress.setStatus(JobStatus.RUNNING.name());
+        Map<String, Object> expectTableCheckPosition = new HashMap<>();
+        expectTableCheckPosition.put("t_order", 100);
+        jobItemProgress.setTableCheckPositions(expectTableCheckPosition);
+        String checkJobId = "j0201001";
+        
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(checkJobId,
 0, YamlEngine.marshal(jobItemProgress));
+        ConsistencyCheckJobConfiguration jobConfig = new 
ConsistencyCheckJobConfiguration(checkJobId, "", null, null);
+        YamlConsistencyCheckJobConfiguration yamlJobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToYamlConfiguration(jobConfig);
+        ShardingContext shardingContext = new ShardingContext(checkJobId, "", 
1, YamlEngine.marshal(yamlJobConfig), 0, "");
+        ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
+        ReflectionUtil.invokeMethodInParentClass(consistencyCheckJob, 
"setJobId", new Class[]{String.class}, new Object[]{checkJobId});
+        ConsistencyCheckJobItemContext actualItemContext = 
consistencyCheckJob.buildPipelineJobItemContext(shardingContext);
+        assertThat(actualItemContext.getTableCheckPositions(), 
is(expectTableCheckPosition));
+    }
+}
diff --git 
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
index 1b008f63e67..c6f494a0fb5 100644
--- 
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
+++ 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
@@ -67,7 +67,7 @@ public final class MigrationDataConsistencyCheckerTest {
     
     private ConsistencyCheckJobItemContext 
createConsistencyCheckJobItemConfig() {
         ConsistencyCheckJobConfiguration jobConfig = new 
ConsistencyCheckJobConfiguration("", "", "", new Properties());
-        return new ConsistencyCheckJobItemContext(jobConfig, 0, 
JobStatus.RUNNING);
+        return new ConsistencyCheckJobItemContext(jobConfig, 0, 
JobStatus.RUNNING, null);
     }
     
     private MigrationJobConfiguration createJobConfiguration() throws 
SQLException {

Reply via email to