This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a62ba2670f Create new TableDataConsistencyChecker for every table 
data consistency check (#28121)
0a62ba2670f is described below

commit 0a62ba2670f3eee6becf1e2aebaf1817c60282ee
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Aug 16 17:51:31 2023 +0800

    Create new TableDataConsistencyChecker for every table data consistency 
check (#28121)
    
    * Add TableDataConsistencyCheckerFactory
    
    * Remove InventoryIncrementalJobAPI.buildTableDataConsistencyChecker
    
    * Refactor PipelineDataConsistencyChecker.check parameter;
    Refactor InventoryIncrementalJobAPI consistency check methods;
    Refactor PipelineDataConsistencyChecker cancellable;
    
    * Fix spotless
    
    * Update javadoc
---
 .../PipelineDataConsistencyChecker.java            |  9 +++--
 .../TableDataConsistencyCheckerFactory.java}       | 25 +++++++-----
 .../job/service/InventoryIncrementalJobAPI.java    | 26 +++++-------
 .../AbstractInventoryIncrementalJobAPIImpl.java    | 26 ------------
 .../TableDataConsistencyCheckerFactoryTest.java    | 46 ++++++++++++++++++++++
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  4 +-
 .../task/ConsistencyCheckTasksRunner.java          | 25 ++++++------
 .../migration/api/impl/MigrationJobAPI.java        |  4 +-
 .../MigrationDataConsistencyChecker.java           | 41 ++++++++++++++++---
 .../migration/api/impl/MigrationJobAPITest.java    | 12 +-----
 .../MigrationDataConsistencyCheckerTest.java       |  3 +-
 11 files changed, 131 insertions(+), 90 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/PipelineDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/PipelineDataConsistencyChecker.java
index 34060e109bc..8fc4972c04a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/PipelineDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/PipelineDataConsistencyChecker.java
@@ -18,20 +18,21 @@
 package org.apache.shardingsphere.data.pipeline.core.consistencycheck;
 
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * Pipeline data consistency checker.
  */
-public interface PipelineDataConsistencyChecker {
+public interface PipelineDataConsistencyChecker extends PipelineCancellable {
     
     /**
      * Data consistency check.
      *
-     * @param tableChecker table data consistency checker
+     * @param algorithmType algorithm type of {@link 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker}
+     * @param algorithmProps algorithm properties of {@link 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker}
      * @return check results. key is logic table name, value is check result.
      */
-    Map<String, TableDataConsistencyCheckResult> 
check(TableDataConsistencyChecker tableChecker);
+    Map<String, TableDataConsistencyCheckResult> check(String algorithmType, 
Properties algorithmProps);
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/PipelineDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyCheckerFactory.java
similarity index 53%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/PipelineDataConsistencyChecker.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyCheckerFactory.java
index 34060e109bc..42cec80ad6a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/PipelineDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyCheckerFactory.java
@@ -15,23 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.consistencycheck;
+package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
 
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
-import java.util.Map;
+import java.util.Properties;
 
 /**
- * Pipeline data consistency checker.
+ * Table data consistency checker factory.
  */
-public interface PipelineDataConsistencyChecker {
+@NoArgsConstructor(access = AccessLevel.NONE)
+public final class TableDataConsistencyCheckerFactory {
     
     /**
-     * Data consistency check.
+     * Build table data consistency checker.
      *
-     * @param tableChecker table data consistency checker
-     * @return check results. key is logic table name, value is check result.
+     * @param algorithmType algorithm type
+     * @param algorithmProps algorithm properties
+     * @return table data consistency checker
      */
-    Map<String, TableDataConsistencyCheckResult> 
check(TableDataConsistencyChecker tableChecker);
+    public static TableDataConsistencyChecker newInstance(final String 
algorithmType, final Properties algorithmProps) {
+        return TypedSPILoader.getService(TableDataConsistencyChecker.class, 
null == algorithmType ? "DATA_MATCH" : algorithmType, algorithmProps);
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
index 80e0d3818bc..523b9e89977 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
@@ -19,27 +19,30 @@ package 
org.apache.shardingsphere.data.pipeline.core.job.service;
 
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
 
 /**
  * Inventory incremental job API.
  */
 public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
     
+    @Override
+    InventoryIncrementalProcessContext 
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
+    
     /**
      * Alter process configuration.
      *
@@ -99,24 +102,15 @@ public interface InventoryIncrementalJobAPI extends 
PipelineJobAPI {
     Collection<DataConsistencyCheckAlgorithmInfo> 
listDataConsistencyCheckAlgorithms();
     
     /**
-     * Build data consistency checker.
-     *
-     * @param algorithmType algorithm type
-     * @param algorithmProps algorithm properties
-     * @return calculate algorithm
-     */
-    TableDataConsistencyChecker buildTableDataConsistencyChecker(String 
algorithmType, Properties algorithmProps);
-    
-    /**
-     * Do data consistency check.
+     * Build pipeline data consistency checker.
      *
      * @param pipelineJobConfig job configuration
-     * @param tableChecker table data consistency checker
+     * @param processContext process context
      * @param progressContext consistency check job item progress context
-     * @return each logic table check result
+     * @return pipeline data consistency checker
      */
-    Map<String, TableDataConsistencyCheckResult> 
dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, 
TableDataConsistencyChecker tableChecker,
-                                                                      
ConsistencyCheckJobItemProgressContext progressContext);
+    PipelineDataConsistencyChecker 
buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, 
InventoryIncrementalProcessContext processContext,
+                                                                       
ConsistencyCheckJobItemProgressContext progressContext);
     
     /**
      * Aggregate data consistency check results.
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 7fdde595c69..8402612510c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -24,7 +24,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConf
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
-import 
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
@@ -41,8 +40,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJ
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -54,7 +51,6 @@ import 
org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.util.Collection;
@@ -65,7 +61,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.stream.IntStream;
 
 /**
@@ -81,9 +76,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl 
extends AbstractPip
     
     private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new 
YamlJobOffsetInfoSwapper();
     
-    @Override
-    public abstract InventoryIncrementalProcessContext 
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
-    
     @Override
     public void alterProcessConfiguration(final PipelineContextKey contextKey, 
final PipelineProcessConfiguration processConfig) {
         // TODO check rateLimiter type match or not
@@ -220,24 +212,6 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         return supportedDatabaseTypes.isEmpty() ? 
ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class) : 
supportedDatabaseTypes;
     }
     
-    @Override
-    public TableDataConsistencyChecker buildTableDataConsistencyChecker(final 
String algorithmType, final Properties algorithmProps) {
-        return TypedSPILoader.getService(TableDataConsistencyChecker.class, 
null == algorithmType ? "DATA_MATCH" : algorithmType, algorithmProps);
-    }
-    
-    @Override
-    public Map<String, TableDataConsistencyCheckResult> 
dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final 
TableDataConsistencyChecker tableChecker,
-                                                                             
final ConsistencyCheckJobItemProgressContext progressContext) {
-        String jobId = jobConfig.getJobId();
-        PipelineDataConsistencyChecker dataConsistencyChecker = 
buildPipelineDataConsistencyChecker(jobConfig, 
buildPipelineProcessContext(jobConfig), progressContext);
-        Map<String, TableDataConsistencyCheckResult> result = 
dataConsistencyChecker.check(tableChecker);
-        log.info("job {} with check algorithm '{}' data consistency checker 
result {}", jobId, tableChecker.getType(), result);
-        return result;
-    }
-    
-    protected abstract PipelineDataConsistencyChecker 
buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, 
InventoryIncrementalProcessContext processContext,
-                                                                               
           ConsistencyCheckJobItemProgressContext progressContext);
-    
     @Override
     public boolean aggregateDataConsistencyCheckResults(final String jobId, 
final Map<String, TableDataConsistencyCheckResult> checkResults) {
         if (checkResults.isEmpty()) {
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/TableDataConsistencyCheckerFactoryTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/TableDataConsistencyCheckerFactoryTest.java
new file mode 100644
index 00000000000..0db13255e7f
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/TableDataConsistencyCheckerFactoryTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
+
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.CRC32MatchTableDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.DataMatchTableDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+class TableDataConsistencyCheckerFactoryTest {
+    
+    @Test
+    void assertNewInstanceTypeMatched() {
+        assertInstanceOf(DataMatchTableDataConsistencyChecker.class, 
TableDataConsistencyCheckerFactory.newInstance(null, new Properties()));
+        assertInstanceOf(DataMatchTableDataConsistencyChecker.class, 
TableDataConsistencyCheckerFactory.newInstance("DATA_MATCH", new Properties()));
+        assertInstanceOf(CRC32MatchTableDataConsistencyChecker.class, 
TableDataConsistencyCheckerFactory.newInstance("CRC32_MATCH", new 
Properties()));
+    }
+    
+    @Test
+    void assertNewInstancesDifferent() {
+        TableDataConsistencyChecker actual1 = 
TableDataConsistencyCheckerFactory.newInstance("DATA_MATCH", new Properties());
+        TableDataConsistencyChecker actual2 = 
TableDataConsistencyCheckerFactory.newInstance("DATA_MATCH", new Properties());
+        assertNotEquals(actual1, actual2);
+    }
+}
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 67a9e9a0f78..48bd8cb27fa 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -368,8 +368,8 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
     }
     
     @Override
-    protected PipelineDataConsistencyChecker 
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration 
pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
-                                                                               
  final ConsistencyCheckJobItemProgressContext progressContext) {
+    public PipelineDataConsistencyChecker 
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration 
pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
+                                                                              
final ConsistencyCheckJobItemProgressContext progressContext) {
         throw new UnsupportedOperationException();
     }
     
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 84a4c2454cb..bfb36635f80 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -26,8 +26,8 @@ import 
org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
@@ -62,7 +62,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
     
     private final LifecycleExecutor checkExecutor;
     
-    private final AtomicReference<TableDataConsistencyChecker> 
tableDataConsistencyChecker = new AtomicReference<>();
+    private final AtomicReference<PipelineDataConsistencyChecker> 
consistencyChecker = new AtomicReference<>();
     
     public ConsistencyCheckTasksRunner(final ConsistencyCheckJobItemContext 
jobItemContext) {
         this.jobItemContext = jobItemContext;
@@ -96,12 +96,13 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
             JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
             InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) 
TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
             PipelineJobConfiguration parentJobConfig = 
jobAPI.getJobConfiguration(parentJobId);
-            TableDataConsistencyChecker tableChecker = 
jobAPI.buildTableDataConsistencyChecker(checkJobConfig.getAlgorithmTypeName(), 
checkJobConfig.getAlgorithmProps());
-            
ConsistencyCheckTasksRunner.this.tableDataConsistencyChecker.set(tableChecker);
-            Map<String, TableDataConsistencyCheckResult> 
dataConsistencyCheckResult;
             try {
-                dataConsistencyCheckResult = 
jobAPI.dataConsistencyCheck(parentJobConfig, tableChecker, 
jobItemContext.getProgressContext());
-                
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).persistCheckJobResult(parentJobId,
 checkJobId, dataConsistencyCheckResult);
+                PipelineDataConsistencyChecker checker = 
jobAPI.buildPipelineDataConsistencyChecker(
+                        parentJobConfig, 
jobAPI.buildPipelineProcessContext(parentJobConfig), 
jobItemContext.getProgressContext());
+                consistencyChecker.set(checker);
+                Map<String, TableDataConsistencyCheckResult> checkResultMap = 
checker.check(checkJobConfig.getAlgorithmTypeName(), 
checkJobConfig.getAlgorithmProps());
+                log.info("job {} with check algorithm '{}' data consistency 
checker result: {}", parentJobId, checkJobConfig.getAlgorithmTypeName(), 
checkResultMap);
+                
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).persistCheckJobResult(parentJobId,
 checkJobId, checkResultMap);
             } finally {
                 
jobItemContext.getProgressContext().setCheckEndTimeMillis(System.currentTimeMillis());
             }
@@ -109,9 +110,9 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
         
         @Override
         protected void doStop() {
-            TableDataConsistencyChecker tableChecker = 
tableDataConsistencyChecker.get();
-            if (null != tableChecker) {
-                tableChecker.cancel();
+            PipelineDataConsistencyChecker checker = consistencyChecker.get();
+            if (null != checker) {
+                checker.cancel();
             }
         }
     }
@@ -128,8 +129,8 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
         
         @Override
         public void onFailure(final Throwable throwable) {
-            TableDataConsistencyChecker tableChecker = 
tableDataConsistencyChecker.get();
-            if (null != tableChecker && tableChecker.isCanceling()) {
+            PipelineDataConsistencyChecker checker = consistencyChecker.get();
+            if (null != checker && checker.isCanceling()) {
                 log.info("onFailure, canceling, check job id: {}, parent job 
id: {}", checkJobId, parentJobId);
                 checkJobAPI.stop(checkJobId);
                 return;
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 1995e6e13e6..134ca33febc 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -337,8 +337,8 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     }
     
     @Override
-    protected PipelineDataConsistencyChecker 
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration 
pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
-                                                                               
  final ConsistencyCheckJobItemProgressContext progressContext) {
+    public PipelineDataConsistencyChecker 
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration 
pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
+                                                                              
final ConsistencyCheckJobItemProgressContext progressContext) {
         return new MigrationDataConsistencyChecker((MigrationJobConfiguration) 
pipelineJobConfig, processContext, progressContext);
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 05bd0db0d81..c8dbbc1f6eb 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -39,20 +39,25 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDat
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 
+import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Data consistency checker for migration job.
@@ -66,6 +71,8 @@ public final class MigrationDataConsistencyChecker implements 
PipelineDataConsis
     
     private final ConsistencyCheckJobItemProgressContext progressContext;
     
+    private final AtomicReference<TableDataConsistencyChecker> 
currentTableChecker = new AtomicReference<>();
+    
     public MigrationDataConsistencyChecker(final MigrationJobConfiguration 
jobConfig, final InventoryIncrementalProcessContext processContext,
                                            final 
ConsistencyCheckJobItemProgressContext progressContext) {
         this.jobConfig = jobConfig;
@@ -74,9 +81,10 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
     }
     
     @Override
-    public Map<String, TableDataConsistencyCheckResult> check(final 
TableDataConsistencyChecker tableChecker) {
-        verifyPipelineDatabaseType(tableChecker, 
jobConfig.getSources().values().iterator().next());
-        verifyPipelineDatabaseType(tableChecker, jobConfig.getTarget());
+    public Map<String, TableDataConsistencyCheckResult> check(final String 
algorithmType, final Properties algorithmProps) {
+        Collection<DatabaseType> supportedDatabaseTypes = 
TableDataConsistencyCheckerFactory.newInstance(algorithmType, 
algorithmProps).getSupportedDatabaseTypes();
+        verifyPipelineDatabaseType(supportedDatabaseTypes, 
jobConfig.getSources().values().iterator().next());
+        verifyPipelineDatabaseType(supportedDatabaseTypes, 
jobConfig.getTarget());
         List<String> sourceTableNames = new LinkedList<>();
         jobConfig.getJobShardingDataNodes().forEach(each -> 
each.getEntries().forEach(entry -> entry.getDataNodes()
                 .forEach(dataNode -> 
sourceTableNames.add(DataNodeUtils.formatWithSchema(dataNode)))));
@@ -87,7 +95,11 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
         try (PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager()) {
             AtomicBoolean checkFailed = new AtomicBoolean(false);
             for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
-                each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> check(tableChecker, result, dataSourceManager, checkFailed, each, entry, dataNode)));
+                each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> {
+                    TableDataConsistencyChecker tableChecker = TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps);
+                    currentTableChecker.set(tableChecker);
+                    check(tableChecker, result, dataSourceManager, checkFailed, each, entry, dataNode);
+                }));
             }
         }
         return result;
@@ -123,8 +135,8 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
         return tableChecker.checkSingleTableInventoryData(param);
     }
     
-    private void verifyPipelineDatabaseType(final TableDataConsistencyChecker tableChecker, final PipelineDataSourceConfiguration dataSourceConfig) {
-        ShardingSpherePreconditions.checkState(tableChecker.getSupportedDatabaseTypes().contains(dataSourceConfig.getDatabaseType()),
+    private void verifyPipelineDatabaseType(final Collection<DatabaseType> supportedDatabaseTypes, final PipelineDataSourceConfiguration dataSourceConfig) {
+        ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(dataSourceConfig.getDatabaseType()),
                 () -> new UnsupportedPipelineDatabaseTypeException(dataSourceConfig.getDatabaseType()));
     }
     
@@ -132,4 +144,21 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
         Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = new MigrationJobAPI().getJobProgress(jobConfig);
         return jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
     }
+    
+    @Override
+    public void cancel() {
+        TableDataConsistencyChecker tableChecker = currentTableChecker.get();
+        if (null != tableChecker) {
+            tableChecker.cancel();
+        }
+    }
+    
+    @Override
+    public boolean isCanceling() {
+        TableDataConsistencyChecker tableChecker = currentTableChecker.get();
+        if (null != tableChecker) {
+            return tableChecker.isCanceling();
+        }
+        return false;
+    }
 }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 4598b363cc5..39418560e4f 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -32,7 +32,6 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.Consistency
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
@@ -79,7 +78,6 @@ import java.util.stream.Stream;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -172,20 +170,14 @@ class MigrationJobAPITest {
         assertThat(jobProgressMap.size(), is(1));
     }
     
-    @Test
-    void assertBuildTableDataConsistencyCheckerWithNullType() {
-        TableDataConsistencyChecker actual = jobAPI.buildTableDataConsistencyChecker(null, null);
-        assertInstanceOf(TableDataConsistencyChecker.class, actual);
-    }
-    
     @Test
     void assertDataConsistencyCheck() {
         MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
         initTableData(jobConfig);
         Optional<String> jobId = jobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
-        TableDataConsistencyChecker actual = jobAPI.buildTableDataConsistencyChecker("FIXTURE", null);
-        Map<String, TableDataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobConfig, actual, new ConsistencyCheckJobItemProgressContext(jobId.get(), 0));
+        Map<String, TableDataConsistencyCheckResult> checkResultMap = jobAPI.buildPipelineDataConsistencyChecker(
+                jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobId.get(), 0)).check("FIXTURE", null);
         assertThat(checkResultMap.size(), is(1));
         String checkKey = "ds_0.t_order";
         assertTrue(checkResultMap.get(checkKey).getCountCheckResult().isMatched());
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 7d02e7b525e..5821727cf9c 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -32,7 +32,6 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.context.Migrat
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.TableDataConsistencyCheckerFixture;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
 import org.junit.jupiter.api.BeforeAll;
@@ -65,7 +64,7 @@ class MigrationDataConsistencyCheckerTest {
         governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
         governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 0, "");
         Map<String, TableDataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null),
-                createConsistencyCheckJobItemProgressContext()).check(new TableDataConsistencyCheckerFixture());
+                createConsistencyCheckJobItemProgressContext()).check("FIXTURE", null);
         String checkKey = "ds_0.t_order";
         assertTrue(actual.get(checkKey).getCountCheckResult().isMatched());
         assertThat(actual.get(checkKey).getCountCheckResult().getSourceRecordsCount(), is(actual.get(checkKey).getCountCheckResult().getTargetRecordsCount()));


Reply via email to