This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dc8fbfebab Extract ConsistencyCheckJobItemProgressContext to decouple 
ConsistencyCheckJobItemContext (#22541)
5dc8fbfebab is described below

commit 5dc8fbfebab73b7fb368fc96f9a1a8d2f249b0b6
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Dec 1 09:47:28 2022 +0800

    Extract ConsistencyCheckJobItemProgressContext to decouple 
ConsistencyCheckJobItemContext (#22541)
    
    * Remove checkJobItemContext from SingleTableInventoryDataConsistencyChecker
    
    * Extract ConsistencyCheckJobItemProgressContext
    
    * Replace ConsistencyCheckJobItemContext to 
ConsistencyCheckJobItemProgressContext in API and impl
    
    * Move core classes from scenario module to core module
    
    * Clean unused mock
    
    * Clean unused methods
---
 .../progress/ConsistencyCheckJobItemProgress.java  |  1 +
 .../core/api/InventoryIncrementalJobAPI.java       |  7 ++--
 .../AbstractInventoryIncrementalJobAPIImpl.java    |  8 ++---
 .../ConsistencyCheckJobItemProgressContext.java}   | 41 ++++------------------
 ...SingleTableInventoryDataConsistencyChecker.java |  7 ++--
 .../job/progress/PipelineJobProgressDetector.java  | 26 --------------
 .../ConsistencyCheckJobAPIImpl.java                |  6 ++--
 .../ConsistencyCheckJobItemContext.java            | 37 ++++---------------
 .../ConsistencyCheckTasksRunner.java               |  4 +--
 .../migration/MigrationDataConsistencyChecker.java | 14 ++++----
 .../scenario/migration/MigrationJobAPIImpl.java    |  6 ++--
 .../core/api/impl/MigrationJobAPIImplTest.java     | 10 ++----
 .../consistencycheck/ConsistencyCheckJobTest.java  |  2 +-
 .../MigrationDataConsistencyCheckerTest.java       | 12 +++----
 14 files changed, 47 insertions(+), 134 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
index a559023fccd..fd580eea4ba 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
@@ -28,6 +28,7 @@ import java.util.Map;
 /**
  * Data consistency check job item progress.
  */
+// TODO move package
 @Getter
 @RequiredArgsConstructor
 @ToString
diff --git 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
similarity index 91%
rename from 
kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index adbf8429ca6..901360ef659 100644
--- 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.api;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 
 import java.util.Map;
@@ -57,12 +57,11 @@ public interface InventoryIncrementalJobAPI extends 
PipelineJobAPI {
      *
      * @param pipelineJobConfig job configuration
      * @param calculateAlgorithm calculate algorithm
-     * @param checkJobItemContext consistency check job item context
+     * @param progressContext consistency check job item progress context
      * @return each logic table check result
      */
-    // TODO do not depend on ConsistencyCheckJobItemContext
     Map<String, DataConsistencyCheckResult> 
dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, 
DataConsistencyCalculateAlgorithm calculateAlgorithm,
-                                                                 
ConsistencyCheckJobItemContext checkJobItemContext);
+                                                                 
ConsistencyCheckJobItemProgressContext progressContext);
     
     /**
      * Aggregate data consistency check results.
diff --git 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
similarity index 97%
rename from 
kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index d5f76459eb1..51d34cf1bbc 100644
--- 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -34,6 +34,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobI
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
 import 
org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
 import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
@@ -42,7 +43,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInvent
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -189,16 +189,16 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
     
     @Override
     public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final 
PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm 
calculateAlgorithm,
-                                                                        final 
ConsistencyCheckJobItemContext checkJobItemContext) {
+                                                                        final 
ConsistencyCheckJobItemProgressContext progressContext) {
         String jobId = jobConfig.getJobId();
-        PipelineDataConsistencyChecker dataConsistencyChecker = 
buildPipelineDataConsistencyChecker(jobConfig, 
buildPipelineProcessContext(jobConfig), checkJobItemContext);
+        PipelineDataConsistencyChecker dataConsistencyChecker = 
buildPipelineDataConsistencyChecker(jobConfig, 
buildPipelineProcessContext(jobConfig), progressContext);
         Map<String, DataConsistencyCheckResult> result = 
dataConsistencyChecker.check(calculateAlgorithm);
         log.info("job {} with check algorithm '{}' data consistency checker 
result {}", jobId, calculateAlgorithm.getType(), result);
         return result;
     }
     
     protected abstract PipelineDataConsistencyChecker 
buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, 
InventoryIncrementalProcessContext processContext,
-                                                                               
           ConsistencyCheckJobItemContext checkJobItemContext);
+                                                                               
           ConsistencyCheckJobItemProgressContext progressContext);
     
     @Override
     public boolean aggregateDataConsistencyCheckResults(final String jobId, 
final Map<String, DataConsistencyCheckResult> checkResults) {
diff --git 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/ConsistencyCheckJobItemProgressContext.java
similarity index 54%
copy from 
kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/ConsistencyCheckJobItemProgressContext.java
index dd39f9bbed8..733f9285cab 100644
--- 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/ConsistencyCheckJobItemProgressContext.java
@@ -15,74 +15,45 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.Setter;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 
 import java.util.Collection;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Consistency check job item context.
+ * Consistency check job item progress context.
  */
+@RequiredArgsConstructor
 @Getter
 @Setter
-public final class ConsistencyCheckJobItemContext implements 
PipelineJobItemContext, PipelineJobProgressListener {
+public final class ConsistencyCheckJobItemProgressContext implements 
PipelineJobProgressListener {
     
     private final String jobId;
     
     private final int shardingItem;
     
-    private String dataSourceName;
-    
-    private volatile boolean stopping;
-    
-    private volatile JobStatus status;
-    
     private final Collection<String> tableNames = new CopyOnWriteArraySet<>();
     
     private volatile long recordsCount;
     
     private final AtomicLong checkedRecordsCount = new AtomicLong(0);
     
-    private final long checkBeginTimeMillis;
+    private final long checkBeginTimeMillis = System.currentTimeMillis();
     
     private volatile Long checkEndTimeMillis;
     
     private final Map<String, Object> tableCheckPositions = new 
ConcurrentHashMap<>();
     
-    private final ConsistencyCheckJobConfiguration jobConfig;
-    
-    public ConsistencyCheckJobItemContext(final 
ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final 
JobStatus status, final ConsistencyCheckJobItemProgress jobItemProgress) {
-        this.jobConfig = jobConfig;
-        jobId = jobConfig.getJobId();
-        this.shardingItem = shardingItem;
-        this.status = status;
-        checkBeginTimeMillis = System.currentTimeMillis();
-        if (null != jobItemProgress) {
-            
checkedRecordsCount.set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
-            
Optional.ofNullable(jobItemProgress.getTableCheckPositions()).ifPresent(tableCheckPositions::putAll);
-        }
-    }
-    
-    @Override
-    public PipelineProcessContext getJobProcessContext() {
-        throw new UnsupportedOperationException();
-    }
-    
     @Override
     public void onProgressUpdated(final PipelineJobProgressUpdatedParameter 
param) {
         checkedRecordsCount.addAndGet(param.getProcessedRecordsCount());
diff --git 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
similarity index 95%
rename from 
kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index ac296ecb225..060421536fd 100644
--- 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -33,7 +33,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -75,7 +74,7 @@ public final class SingleTableInventoryDataConsistencyChecker 
{
     
     private final JobRateLimitAlgorithm readRateLimitAlgorithm;
     
-    private final ConsistencyCheckJobItemContext jobItemContext;
+    private final ConsistencyCheckJobItemProgressContext progressContext;
     
     /**
      * Data consistency check.
@@ -102,7 +101,7 @@ public final class 
SingleTableInventoryDataConsistencyChecker {
         PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(schemaName, sourceTableName);
         ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new 
PipelineTableDataConsistencyCheckLoadingFailedException(schemaName, 
sourceTableName));
         Collection<String> columnNames = tableMetaData.getColumnNames();
-        Map<String, Object> tableCheckPositions = 
jobItemContext.getTableCheckPositions();
+        Map<String, Object> tableCheckPositions = 
progressContext.getTableCheckPositions();
         DataConsistencyCalculateParameter sourceParam = buildParameter(
                 sourceDataSource, schemaName, sourceTableName, columnNames, 
sourceDatabaseType, targetDatabaseType, uniqueKey, 
tableCheckPositions.get(sourceTableName));
         String targetTableName = targetTable.getTableName().getOriginal();
@@ -134,7 +133,7 @@ public final class 
SingleTableInventoryDataConsistencyChecker {
             if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
                 tableCheckPositions.put(targetTableName, 
targetCalculatedResult.getMaxUniqueKeyValue().get());
             }
-            jobItemContext.onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+            progressContext.onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
         }
         return new DataConsistencyCheckResult(new 
DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new 
DataConsistencyContentCheckResult(contentMatched));
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java
index 011821c2f07..f583b736789 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java
@@ -21,8 +21,6 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 
 import java.util.Collection;
@@ -46,28 +44,4 @@ public final class PipelineJobProgressDetector {
         }
         return inventoryTasks.stream().allMatch(each -> 
each.getTaskProgress().getPosition() instanceof FinishedPosition);
     }
-    
-    /**
-     * Whether job is completed (successful or failed).
-     *
-     * @param jobShardingCount job sharding count
-     * @param jobItemProgresses job item progresses
-     * @return completed or not
-     */
-    public static boolean isJobCompleted(final int jobShardingCount, final 
Collection<? extends PipelineJobItemProgress> jobItemProgresses) {
-        return jobShardingCount == jobItemProgresses.size()
-                && jobItemProgresses.stream().allMatch(each -> null != each && 
!each.getStatus().isRunning());
-    }
-    
-    /**
-     * Whether job is successful.
-     *
-     * @param jobShardingCount job sharding count
-     * @param jobItemProgresses job item progresses
-     * @return completed or not
-     */
-    public static boolean isJobSuccessful(final int jobShardingCount, final 
Collection<? extends PipelineJobItemProgress> jobItemProgresses) {
-        return jobShardingCount == jobItemProgresses.size()
-                && jobItemProgresses.stream().allMatch(each -> null != each && 
JobStatus.FINISHED == each.getStatus());
-    }
 }
diff --git 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
 
b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 3f17b5b8433..77635c72f06 100644
--- 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ 
b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -39,6 +39,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
+import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -114,8 +115,9 @@ public final class ConsistencyCheckJobAPIImpl extends 
AbstractPipelineJobAPIImpl
     @Override
     public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
         ConsistencyCheckJobItemContext context = 
(ConsistencyCheckJobItemContext) jobItemContext;
-        ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(String.join(",", context.getTableNames()),
-                context.getCheckedRecordsCount().get(), 
context.getRecordsCount(), context.getCheckBeginTimeMillis(), 
context.getCheckEndTimeMillis(), context.getTableCheckPositions());
+        ConsistencyCheckJobItemProgressContext progressContext = 
context.getProgressContext();
+        ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(String.join(",", 
progressContext.getTableNames()), 
progressContext.getCheckedRecordsCount().get(),
+                progressContext.getRecordsCount(), 
progressContext.getCheckBeginTimeMillis(), 
progressContext.getCheckEndTimeMillis(), 
progressContext.getTableCheckPositions());
         jobItemProgress.setStatus(context.getStatus());
         YamlConsistencyCheckJobItemProgress yamlJobProgress = 
swapper.swapToYamlConfiguration(jobItemProgress);
         
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(context.getJobId(),
 context.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
diff --git 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
 
b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
index dd39f9bbed8..501854a71ca 100644
--- 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
+++ 
b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
@@ -24,23 +24,16 @@ import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContex
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
 
-import java.util.Collection;
-import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Consistency check job item context.
  */
 @Getter
 @Setter
-public final class ConsistencyCheckJobItemContext implements 
PipelineJobItemContext, PipelineJobProgressListener {
+public final class ConsistencyCheckJobItemContext implements 
PipelineJobItemContext {
     
     private final String jobId;
     
@@ -52,29 +45,19 @@ public final class ConsistencyCheckJobItemContext 
implements PipelineJobItemCont
     
     private volatile JobStatus status;
     
-    private final Collection<String> tableNames = new CopyOnWriteArraySet<>();
-    
-    private volatile long recordsCount;
-    
-    private final AtomicLong checkedRecordsCount = new AtomicLong(0);
-    
-    private final long checkBeginTimeMillis;
-    
-    private volatile Long checkEndTimeMillis;
-    
-    private final Map<String, Object> tableCheckPositions = new 
ConcurrentHashMap<>();
-    
     private final ConsistencyCheckJobConfiguration jobConfig;
     
+    private final ConsistencyCheckJobItemProgressContext progressContext;
+    
     public ConsistencyCheckJobItemContext(final 
ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final 
JobStatus status, final ConsistencyCheckJobItemProgress jobItemProgress) {
         this.jobConfig = jobConfig;
         jobId = jobConfig.getJobId();
         this.shardingItem = shardingItem;
         this.status = status;
-        checkBeginTimeMillis = System.currentTimeMillis();
+        progressContext = new ConsistencyCheckJobItemProgressContext(jobId, 
shardingItem);
         if (null != jobItemProgress) {
-            
checkedRecordsCount.set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
-            
Optional.ofNullable(jobItemProgress.getTableCheckPositions()).ifPresent(tableCheckPositions::putAll);
+            
progressContext.getCheckedRecordsCount().set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
+            
Optional.ofNullable(jobItemProgress.getTableCheckPositions()).ifPresent(progressContext.getTableCheckPositions()::putAll);
         }
     }
     
@@ -82,10 +65,4 @@ public final class ConsistencyCheckJobItemContext implements 
PipelineJobItemCont
     public PipelineProcessContext getJobProcessContext() {
         throw new UnsupportedOperationException();
     }
-    
-    @Override
-    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter 
param) {
-        checkedRecordsCount.addAndGet(param.getProcessedRecordsCount());
-        PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
-    }
 }
diff --git 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
index 9859ff5bb12..dfab5bf2383 100644
--- 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
@@ -101,10 +101,10 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
             setCalculateAlgorithm(calculateAlgorithm);
             Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult;
             try {
-                dataConsistencyCheckResult = 
jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm, 
jobItemContext);
+                dataConsistencyCheckResult = 
jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm, 
jobItemContext.getProgressContext());
                 
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId,
 checkJobId, dataConsistencyCheckResult);
             } finally {
-                
jobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
+                
jobItemContext.getProgressContext().setCheckEndTimeMillis(System.currentTimeMillis());
             }
         }
         
diff --git 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index 24b0eb902c9..506774046f5 100644
--- 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -29,12 +29,12 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.SingleTableInventoryDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -59,15 +59,15 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
     
     private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
     
-    private final ConsistencyCheckJobItemContext checkJobItemContext;
+    private final ConsistencyCheckJobItemProgressContext progressContext;
     
     public MigrationDataConsistencyChecker(final MigrationJobConfiguration 
jobConfig, final InventoryIncrementalProcessContext processContext,
-                                           final 
ConsistencyCheckJobItemContext checkJobItemContext) {
+                                           final 
ConsistencyCheckJobItemProgressContext progressContext) {
         this.jobConfig = jobConfig;
         readRateLimitAlgorithm = null != processContext ? 
processContext.getReadRateLimitAlgorithm() : null;
         tableNameSchemaNameMapping = new TableNameSchemaNameMapping(
                 
TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(), new 
HashSet<>(Arrays.asList(jobConfig.getSourceTableName(), 
jobConfig.getTargetTableName()))));
-        this.checkJobItemContext = checkJobItemContext;
+        this.progressContext = progressContext;
     }
     
     @Override
@@ -80,11 +80,11 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
         try (
                 PipelineDataSourceWrapper sourceDataSource = 
PipelineDataSourceFactory.newInstance(jobConfig.getSource());
                 PipelineDataSourceWrapper targetDataSource = 
PipelineDataSourceFactory.newInstance(jobConfig.getTarget())) {
-            checkJobItemContext.setRecordsCount(getRecordsCount());
-            
checkJobItemContext.getTableNames().add(jobConfig.getSourceTableName());
+            progressContext.setRecordsCount(getRecordsCount());
+            
progressContext.getTableNames().add(jobConfig.getSourceTableName());
             PipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(sourceDataSource);
             SingleTableInventoryDataConsistencyChecker 
singleTableInventoryChecker = new 
SingleTableInventoryDataConsistencyChecker(jobConfig.getJobId(), 
sourceDataSource, targetDataSource,
-                    sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), 
metaDataLoader, readRateLimitAlgorithm, checkJobItemContext);
+                    sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), 
metaDataLoader, readRateLimitAlgorithm, progressContext);
             result.put(sourceTable.getTableName().getOriginal(), 
singleTableInventoryChecker.check(calculateAlgorithm));
         } catch (final SQLException ex) {
             throw new SQLWrapperException(ex);
diff --git 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
 
b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index a9dd585ecc8..2a9c508c921 100644
--- 
a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ 
b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -53,6 +53,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInf
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
 import 
org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
+import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
@@ -63,7 +64,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTabl
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractorFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
@@ -242,8 +242,8 @@ public final class MigrationJobAPIImpl extends 
AbstractInventoryIncrementalJobAP
     
     @Override
     protected PipelineDataConsistencyChecker 
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration 
pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
-                                                                               
  final ConsistencyCheckJobItemContext checkJobItemContext) {
-        return new MigrationDataConsistencyChecker((MigrationJobConfiguration) 
pipelineJobConfig, processContext, checkJobItemContext);
+                                                                               
  final ConsistencyCheckJobItemProgressContext progressContext) {
+        return new MigrationDataConsistencyChecker((MigrationJobConfiguration) 
pipelineJobConfig, processContext, progressContext);
     }
     
     @Override
diff --git 
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 4f5de0fcd58..75fe65e125b 100644
--- 
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -21,7 +21,6 @@ import lombok.SneakyThrows;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
@@ -31,11 +30,11 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
@@ -70,8 +69,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public final class MigrationJobAPIImplTest {
@@ -156,10 +153,7 @@ public final class MigrationJobAPIImplTest {
         Optional<String> jobId = jobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
         DataConsistencyCalculateAlgorithm calculateAlgorithm = 
jobAPI.buildDataConsistencyCalculateAlgorithm(jobConfig, "FIXTURE", null);
-        ConsistencyCheckJobConfiguration checkJobConfig = 
mock(ConsistencyCheckJobConfiguration.class);
-        when(checkJobConfig.getJobId()).thenReturn(jobConfig.getJobId() + "1");
-        ConsistencyCheckJobItemContext checkJobItemContext = new 
ConsistencyCheckJobItemContext(checkJobConfig, 0, JobStatus.RUNNING, null);
-        Map<String, DataConsistencyCheckResult> checkResultMap = 
jobAPI.dataConsistencyCheck(jobConfig, calculateAlgorithm, checkJobItemContext);
+        Map<String, DataConsistencyCheckResult> checkResultMap = 
jobAPI.dataConsistencyCheck(jobConfig, calculateAlgorithm, new 
ConsistencyCheckJobItemProgressContext(jobId.get(), 0));
         assertThat(checkResultMap.size(), is(1));
         
assertTrue(checkResultMap.get("t_order").getCountCheckResult().isMatched());
         
assertThat(checkResultMap.get("t_order").getCountCheckResult().getTargetRecordsCount(),
 is(2L));
diff --git 
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 22c07c2caf0..539254e74e7 100644
--- 
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -59,6 +59,6 @@ public final class ConsistencyCheckJobTest {
         ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
         ReflectionUtil.invokeMethodInParentClass(consistencyCheckJob, 
"setJobId", new Class[]{String.class}, new Object[]{checkJobId});
         ConsistencyCheckJobItemContext actualItemContext = 
consistencyCheckJob.buildPipelineJobItemContext(shardingContext);
-        assertThat(actualItemContext.getTableCheckPositions(), 
is(expectTableCheckPosition));
+        
assertThat(actualItemContext.getProgressContext().getTableCheckPositions(), 
is(expectTableCheckPosition));
     }
 }
diff --git 
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
index c6f494a0fb5..8fa1b6f2f24 100644
--- 
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
+++ 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
@@ -18,16 +18,14 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.DataConsistencyCalculateAlgorithmFixture;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -38,7 +36,6 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Map;
-import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -59,15 +56,14 @@ public final class MigrationDataConsistencyCheckerTest {
         
PipelineAPIFactory.getGovernanceRepositoryAPI().persist(String.format("/pipeline/jobs/%s/config",
 jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
         
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobConfig.getJobId(),
 0, "");
         Map<String, DataConsistencyCheckResult> actual = new 
MigrationDataConsistencyChecker(jobConfig, new 
MigrationProcessContext(jobConfig.getJobId(), null),
-                createConsistencyCheckJobItemConfig()).check(new 
DataConsistencyCalculateAlgorithmFixture());
+                createConsistencyCheckJobItemProgressContext()).check(new 
DataConsistencyCalculateAlgorithmFixture());
         assertTrue(actual.get("t_order").getCountCheckResult().isMatched());
         
assertThat(actual.get("t_order").getCountCheckResult().getSourceRecordsCount(), 
is(actual.get("t_order").getCountCheckResult().getTargetRecordsCount()));
         assertTrue(actual.get("t_order").getContentCheckResult().isMatched());
     }
     
-    private ConsistencyCheckJobItemContext 
createConsistencyCheckJobItemConfig() {
-        ConsistencyCheckJobConfiguration jobConfig = new 
ConsistencyCheckJobConfiguration("", "", "", new Properties());
-        return new ConsistencyCheckJobItemContext(jobConfig, 0, 
JobStatus.RUNNING, null);
+    private ConsistencyCheckJobItemProgressContext 
createConsistencyCheckJobItemProgressContext() {
+        return new ConsistencyCheckJobItemProgressContext("", 0);
     }
     
     private MigrationJobConfiguration createJobConfiguration() throws 
SQLException {


Reply via email to