This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 18ce0c4b69b Review and improve pipeline code (#28706)
18ce0c4b69b is described below

commit 18ce0c4b69b35fd5a0783fd1d0e5f4fabbccef35
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Oct 10 18:29:49 2023 +0800

    Review and improve pipeline code (#28706)
    
    * Handle PipelineJobNotFoundException on stopping job
    
    * Release inventory checker resource ASAP on error
    
    * Simplify TableDataConsistencyCheckResult
---
 .../result/TableDataConsistencyCheckResult.java    | 14 +++---
 .../TableDataConsistencyContentCheckResult.java    | 33 --------------
 .../TableDataConsistencyCountCheckResult.java      | 44 ------------------
 .../yaml/YamlTableDataConsistencyCheckResult.java  | 52 ++--------------------
 ...YamlTableDataConsistencyCheckResultSwapper.java | 17 +------
 .../table/MatchingTableInventoryChecker.java       | 19 +++-----
 .../core/job/AbstractSimplePipelineJob.java        |  8 +++-
 .../runner/InventoryIncrementalTasksRunner.java    | 11 ++++-
 .../data/pipeline/cdc/core/job/CDCJob.java         |  2 +-
 .../core/fixture/FixtureTableInventoryChecker.java |  4 +-
 .../service/GovernanceRepositoryAPIImplTest.java   |  6 +--
 .../api/impl/ConsistencyCheckJobAPITest.java       | 10 ++---
 .../migration/api/impl/MigrationJobAPITest.java    | 33 +++-----------
 .../MigrationDataConsistencyCheckerTest.java       |  7 +--
 14 files changed, 47 insertions(+), 213 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
index 7eec6445fbc..8d73495befa 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
@@ -27,21 +27,17 @@ import lombok.ToString;
 @ToString
 public final class TableDataConsistencyCheckResult {
     
-    private final TableDataConsistencyCountCheckResult countCheckResult;
-    
-    private final TableDataConsistencyContentCheckResult contentCheckResult;
+    private final boolean matched;
     
     private final TableDataConsistencyCheckIgnoredType ignoredType;
     
-    public TableDataConsistencyCheckResult(final 
TableDataConsistencyCountCheckResult countCheckResult, final 
TableDataConsistencyContentCheckResult contentCheckResult) {
-        this.countCheckResult = countCheckResult;
-        this.contentCheckResult = contentCheckResult;
+    public TableDataConsistencyCheckResult(final boolean matched) {
+        this.matched = matched;
         ignoredType = null;
     }
     
     public TableDataConsistencyCheckResult(final 
TableDataConsistencyCheckIgnoredType ignoredType) {
-        countCheckResult = new TableDataConsistencyCountCheckResult(-1, -1);
-        contentCheckResult = new TableDataConsistencyContentCheckResult(false);
+        matched = false;
         this.ignoredType = ignoredType;
     }
     
@@ -63,6 +59,6 @@ public final class TableDataConsistencyCheckResult {
         if (null != ignoredType) {
             return false;
         }
-        return countCheckResult.isMatched() && contentCheckResult.isMatched();
+        return matched;
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyContentCheckResult.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyContentCheckResult.java
deleted file mode 100644
index 3995f5ad4b6..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyContentCheckResult.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.consistencycheck.result;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-
-/**
- * Table data consistency content check result.
- */
-@RequiredArgsConstructor
-@Getter
-@ToString
-public final class TableDataConsistencyContentCheckResult {
-    
-    private final boolean matched;
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCountCheckResult.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCountCheckResult.java
deleted file mode 100644
index aaf2b5ef02c..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCountCheckResult.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.consistencycheck.result;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-
-/**
- * Table data consistency count check result.
- */
-@RequiredArgsConstructor
-@Getter
-@ToString
-public final class TableDataConsistencyCountCheckResult {
-    
-    private final long sourceRecordsCount;
-    
-    private final long targetRecordsCount;
-    
-    /**
-     * Is matched.
-     *
-     * @return true if records count equals between source and target
-     */
-    public boolean isMatched() {
-        return sourceRecordsCount == targetRecordsCount && sourceRecordsCount 
>= 0;
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
index 9a1e4174492..de1c849e345 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
@@ -17,7 +17,6 @@
 
 package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml;
 
-import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
@@ -31,56 +30,11 @@ import 
org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 @Setter
 public final class YamlTableDataConsistencyCheckResult implements 
YamlConfiguration {
     
-    private YamlTableDataConsistencyCountCheckResult countCheckResult;
-    
-    private YamlTableDataConsistencyContentCheckResult contentCheckResult;
+    private boolean matched;
     
     private String ignoredType;
     
-    public YamlTableDataConsistencyCheckResult(final 
YamlTableDataConsistencyCountCheckResult countCheckResult, final 
YamlTableDataConsistencyContentCheckResult contentCheckResult) {
-        this.countCheckResult = countCheckResult;
-        this.contentCheckResult = contentCheckResult;
-    }
-    
-    /**
-     * YAML table data consistency count result.
-     */
-    @Getter
-    @Setter
-    public static class YamlTableDataConsistencyCountCheckResult implements 
YamlConfiguration {
-        
-        private long sourceRecordsCount;
-        
-        private long targetRecordsCount;
-        
-        /**
-         * Add source records count.
-         *
-         * @param delta delta count
-         */
-        public void addSourceRecordsCount(final long delta) {
-            sourceRecordsCount += delta;
-        }
-        
-        /**
-         * Add target records count.
-         *
-         * @param delta delta count
-         */
-        public void addTargetRecordsCount(final long delta) {
-            targetRecordsCount += delta;
-        }
-    }
-    
-    /**
-     * YAML table data consistency content result.
-     */
-    @NoArgsConstructor
-    @AllArgsConstructor
-    @Getter
-    @Setter
-    public static class YamlTableDataConsistencyContentCheckResult implements 
YamlConfiguration {
-        
-        private boolean matched;
+    public YamlTableDataConsistencyCheckResult(final boolean matched) {
+        this.matched = matched;
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
index 4003f101813..cd07e8d48c5 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
@@ -20,10 +20,6 @@ package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yam
 import com.google.common.base.Strings;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckIgnoredType;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult.YamlTableDataConsistencyContentCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult.YamlTableDataConsistencyCountCheckResult;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
@@ -39,13 +35,7 @@ public final class 
YamlTableDataConsistencyCheckResultSwapper implements YamlCon
             result.setIgnoredType(data.getIgnoredType().name());
             return result;
         }
-        YamlTableDataConsistencyCountCheckResult countCheckResult = new 
YamlTableDataConsistencyCountCheckResult();
-        
countCheckResult.setSourceRecordsCount(data.getCountCheckResult().getSourceRecordsCount());
-        
countCheckResult.setTargetRecordsCount(data.getCountCheckResult().getTargetRecordsCount());
-        result.setCountCheckResult(countCheckResult);
-        YamlTableDataConsistencyContentCheckResult contentCheckResult = new 
YamlTableDataConsistencyContentCheckResult();
-        
contentCheckResult.setMatched(data.getContentCheckResult().isMatched());
-        result.setContentCheckResult(contentCheckResult);
+        result.setMatched(data.isMatched());
         return result;
     }
     
@@ -57,10 +47,7 @@ public final class 
YamlTableDataConsistencyCheckResultSwapper implements YamlCon
         if (!Strings.isNullOrEmpty(yamlConfig.getIgnoredType())) {
             return new 
TableDataConsistencyCheckResult(TableDataConsistencyCheckIgnoredType.valueOf(yamlConfig.getIgnoredType()));
         }
-        YamlTableDataConsistencyCountCheckResult yamlCountCheck = 
yamlConfig.getCountCheckResult();
-        TableDataConsistencyCountCheckResult countCheckResult = new 
TableDataConsistencyCountCheckResult(yamlCountCheck.getSourceRecordsCount(), 
yamlCountCheck.getTargetRecordsCount());
-        TableDataConsistencyContentCheckResult contentCheckResult = new 
TableDataConsistencyContentCheckResult(yamlConfig.getContentCheckResult().isMatched());
-        return new TableDataConsistencyCheckResult(countCheckResult, 
contentCheckResult);
+        return new TableDataConsistencyCheckResult(yamlConfig.isMatched());
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index 8e4734aabc8..6717a17f209 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -24,8 +24,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.Pipe
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult.YamlTableDataConsistencyContentCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult.YamlTableDataConsistencyCountCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
@@ -78,9 +76,9 @@ public abstract class MatchingTableInventoryChecker 
implements TableInventoryChe
         calculators.add(sourceCalculator);
         SingleTableInventoryCalculator targetCalculator = 
buildSingleTableInventoryCalculator();
         calculators.add(targetCalculator);
-        Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults 
= waitFuture(executor.submit(() -> 
sourceCalculator.calculate(sourceParam))).iterator();
-        Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults 
= waitFuture(executor.submit(() -> 
targetCalculator.calculate(targetParam))).iterator();
         try {
+            Iterator<SingleTableInventoryCalculatedResult> 
sourceCalculatedResults = waitFuture(executor.submit(() -> 
sourceCalculator.calculate(sourceParam))).iterator();
+            Iterator<SingleTableInventoryCalculatedResult> 
targetCalculatedResults = waitFuture(executor.submit(() -> 
targetCalculator.calculate(targetParam))).iterator();
             return checkSingleTableInventoryData(sourceCalculatedResults, 
targetCalculatedResults, param, executor);
         } finally {
             QuietlyCloser.close(sourceParam.getCalculationContext());
@@ -93,17 +91,15 @@ public abstract class MatchingTableInventoryChecker 
implements TableInventoryChe
     private TableDataConsistencyCheckResult 
checkSingleTableInventoryData(final 
Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults,
                                                                           
final Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults,
                                                                           
final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
-        YamlTableDataConsistencyCheckResult checkResult = new 
YamlTableDataConsistencyCheckResult(new 
YamlTableDataConsistencyCountCheckResult(), new 
YamlTableDataConsistencyContentCheckResult(true));
+        YamlTableDataConsistencyCheckResult checkResult = new 
YamlTableDataConsistencyCheckResult(true);
         while (sourceCalculatedResults.hasNext() && 
targetCalculatedResults.hasNext()) {
             if (null != param.getReadRateLimitAlgorithm()) {
                 
param.getReadRateLimitAlgorithm().intercept(JobOperationType.SELECT, 1);
             }
             SingleTableInventoryCalculatedResult sourceCalculatedResult = 
waitFuture(executor.submit(sourceCalculatedResults::next));
             SingleTableInventoryCalculatedResult targetCalculatedResult = 
waitFuture(executor.submit(targetCalculatedResults::next));
-            
checkResult.getCountCheckResult().addSourceRecordsCount(sourceCalculatedResult.getRecordsCount());
-            
checkResult.getCountCheckResult().addTargetRecordsCount(targetCalculatedResult.getRecordsCount());
             if (!Objects.equals(sourceCalculatedResult, 
targetCalculatedResult)) {
-                checkResult.getContentCheckResult().setMatched(false);
+                checkResult.setMatched(false);
                 log.info("content matched false, jobId={}, sourceTable={}, 
targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(), 
param.getTargetTable(), param.getUniqueKeys());
                 break;
             }
@@ -116,14 +112,11 @@ public abstract class MatchingTableInventoryChecker 
implements TableInventoryChe
             param.getProgressContext().onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
         }
         if (sourceCalculatedResults.hasNext()) {
-            // TODO Refactor SingleTableInventoryCalculatedResult to represent 
inaccurate number
-            checkResult.getCountCheckResult().addSourceRecordsCount(1);
-            checkResult.getContentCheckResult().setMatched(false);
+            checkResult.setMatched(false);
             return new 
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
         }
         if (targetCalculatedResults.hasNext()) {
-            checkResult.getCountCheckResult().addTargetRecordsCount(1);
-            checkResult.getContentCheckResult().setMatched(false);
+            checkResult.setMatched(false);
             return new 
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
         }
         return new 
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 3491f3882b1..fae85d03ad3 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -77,8 +78,11 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
     }
     
     private void processFailed(final String jobId, final int shardingItem, 
final Exception ex) {
-        log.error("job prepare failed, {}-{}", jobId, shardingItem, ex);
+        log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
         getJobAPI().updateJobItemErrorMessage(jobId, shardingItem, ex);
-        getJobAPI().stop(jobId);
+        try {
+            getJobAPI().stop(jobId);
+        } catch (final PipelineJobNotFoundException ignored) {
+        }
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index 08415b7ecba..ff3e9308f58 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -25,6 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -142,7 +143,10 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
         log.error("onFailure, inventory task execute failed.", throwable);
         String jobId = jobItemContext.getJobId();
         jobAPI.updateJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
-        jobAPI.stop(jobId);
+        try {
+            jobAPI.stop(jobId);
+        } catch (final PipelineJobNotFoundException ignored) {
+        }
     }
     
     private final class InventoryTaskExecuteCallback implements 
ExecuteCallback {
@@ -174,7 +178,10 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
             log.error("onFailure, incremental task execute failed.", 
throwable);
             String jobId = jobItemContext.getJobId();
             jobAPI.updateJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
-            jobAPI.stop(jobId);
+            try {
+                jobAPI.stop(jobId);
+            } catch (final PipelineJobNotFoundException ignored) {
+            }
         }
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 7e28a6b3787..9858e2932e3 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -121,7 +121,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     }
     
     private void processFailed(final String jobId, final int shardingItem, 
final Exception ex) {
-        log.error("job prepare failed, {}-{}", jobId, shardingItem, ex);
+        log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
         jobAPI.updateJobItemErrorMessage(jobId, shardingItem, ex);
         PipelineJobCenter.stop(jobId);
         jobAPI.updateJobConfigurationDisabled(jobId, true);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableInventoryChecker.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableInventoryChecker.java
index 0a9e1841220..18ea7820581 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableInventoryChecker.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableInventoryChecker.java
@@ -18,8 +18,6 @@
 package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
 
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
 
 public final class FixtureTableInventoryChecker implements 
TableInventoryChecker {
@@ -35,6 +33,6 @@ public final class FixtureTableInventoryChecker implements 
TableInventoryChecker
     
     @Override
     public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
-        return new TableDataConsistencyCheckResult(new 
TableDataConsistencyCountCheckResult(2, 2), new 
TableDataConsistencyContentCheckResult(true));
+        return new TableDataConsistencyCheckResult(true);
     }
 }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
index d2f8c19150f..9339b4ab88f 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
@@ -23,8 +23,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.constant.DataPipelineConst
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
 import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -98,11 +96,11 @@ class GovernanceRepositoryAPIImplTest {
     void assertPersistJobCheckResult() {
         MigrationJobItemContext jobItemContext = mockJobItemContext();
         Map<String, TableDataConsistencyCheckResult> actual = new HashMap<>();
-        actual.put("test", new TableDataConsistencyCheckResult(new 
TableDataConsistencyCountCheckResult(1, 1), new 
TableDataConsistencyContentCheckResult(true)));
+        actual.put("test", new TableDataConsistencyCheckResult(true));
         
governanceRepositoryAPI.persistCheckJobResult(jobItemContext.getJobId(), 
"j02123", actual);
         Map<String, TableDataConsistencyCheckResult> checkResult = 
governanceRepositoryAPI.getCheckJobResult(jobItemContext.getJobId(), "j02123");
         assertThat(checkResult.size(), is(1));
-        
assertTrue(checkResult.get("test").getContentCheckResult().isMatched());
+        assertTrue(checkResult.get("test").isMatched());
     }
     
     @Test
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 962f3340821..2e49467926f 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -20,8 +20,6 @@ package 
org.apache.shardingsphere.test.it.data.pipeline.scenario.consistencychec
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
@@ -83,12 +81,11 @@ class ConsistencyCheckJobAPITest {
                 parentJobConfig.getSourceDatabaseType(), 
parentJobConfig.getTargetDatabaseType()));
         GovernanceRepositoryAPI governanceRepositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
         governanceRepositoryAPI.persistLatestCheckJobId(parentJobId, 
checkJobId);
-        Map<String, TableDataConsistencyCheckResult> expectedCheckResult = 
Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(new 
TableDataConsistencyCountCheckResult(1, 1),
-                new TableDataConsistencyContentCheckResult(true)));
+        Map<String, TableDataConsistencyCheckResult> expectedCheckResult = 
Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true));
         governanceRepositoryAPI.persistCheckJobResult(parentJobId, checkJobId, 
expectedCheckResult);
         Map<String, TableDataConsistencyCheckResult> actualCheckResult = 
checkJobAPI.getLatestDataConsistencyCheckResult(parentJobId);
         assertThat(actualCheckResult.size(), is(expectedCheckResult.size()));
-        
assertThat(actualCheckResult.get("t_order").getCountCheckResult().isMatched(), 
is(expectedCheckResult.get("t_order").getContentCheckResult().isMatched()));
+        assertThat(actualCheckResult.get("t_order").isMatched(), 
is(expectedCheckResult.get("t_order").isMatched()));
     }
     
     @Test
@@ -103,8 +100,7 @@ class ConsistencyCheckJobAPITest {
             ConsistencyCheckJobItemContext checkJobItemContext = new 
ConsistencyCheckJobItemContext(
                     new ConsistencyCheckJobConfiguration(checkJobId, 
parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")), 
0, JobStatus.FINISHED, null);
             checkJobAPI.persistJobItemProgress(checkJobItemContext);
-            Map<String, TableDataConsistencyCheckResult> 
dataConsistencyCheckResult = Collections.singletonMap("t_order",
-                    new TableDataConsistencyCheckResult(new 
TableDataConsistencyCountCheckResult(0, 0), new 
TableDataConsistencyContentCheckResult(true)));
+            Map<String, TableDataConsistencyCheckResult> 
dataConsistencyCheckResult = Collections.singletonMap("t_order", new 
TableDataConsistencyCheckResult(true));
             repositoryAPI.persistCheckJobResult(parentJobId, checkJobId, 
dataConsistencyCheckResult);
             Optional<String> latestCheckJobId = 
repositoryAPI.getLatestCheckJobId(parentJobId);
             assertTrue(latestCheckJobId.isPresent());
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 96fc9eba5c8..9e2f22d7a96 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -30,8 +30,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJ
 import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
@@ -180,9 +178,8 @@ class MigrationJobAPITest {
                 jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new 
ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", 
null);
         assertThat(checkResultMap.size(), is(1));
         String checkKey = "t_order";
-        
assertTrue(checkResultMap.get(checkKey).getCountCheckResult().isMatched());
-        
assertThat(checkResultMap.get(checkKey).getCountCheckResult().getTargetRecordsCount(),
 is(2L));
-        
assertTrue(checkResultMap.get(checkKey).getContentCheckResult().isMatched());
+        assertTrue(checkResultMap.get(checkKey).isMatched());
+        assertTrue(checkResultMap.get(checkKey).isMatched());
     }
     
     @Test
@@ -191,34 +188,18 @@ class MigrationJobAPITest {
     }
     
     @Test
-    void assertAggregateDifferentCountDataConsistencyCheckResults() {
-        TableDataConsistencyCountCheckResult equalCountCheckResult = new 
TableDataConsistencyCountCheckResult(100, 100);
-        TableDataConsistencyCountCheckResult notEqualCountCheckResult = new 
TableDataConsistencyCountCheckResult(100, 95);
-        TableDataConsistencyContentCheckResult equalContentCheckResult = new 
TableDataConsistencyContentCheckResult(false);
+    void assertAggregateDifferentDataConsistencyCheckResults() {
         Map<String, TableDataConsistencyCheckResult> checkResults = new 
LinkedHashMap<>(2, 1F);
-        checkResults.put("foo_tbl", new 
TableDataConsistencyCheckResult(equalCountCheckResult, 
equalContentCheckResult));
-        checkResults.put("bar_tbl", new 
TableDataConsistencyCheckResult(notEqualCountCheckResult, 
equalContentCheckResult));
-        assertFalse(jobAPI.aggregateDataConsistencyCheckResults("foo_job", 
checkResults));
-    }
-    
-    @Test
-    void assertAggregateDifferentContentDataConsistencyCheckResults() {
-        TableDataConsistencyCountCheckResult equalCountCheckResult = new 
TableDataConsistencyCountCheckResult(100, 100);
-        TableDataConsistencyContentCheckResult equalContentCheckResult = new 
TableDataConsistencyContentCheckResult(true);
-        TableDataConsistencyContentCheckResult notEqualContentCheckResult = 
new TableDataConsistencyContentCheckResult(false);
-        Map<String, TableDataConsistencyCheckResult> checkResults = new 
LinkedHashMap<>(2, 1F);
-        checkResults.put("foo_tbl", new 
TableDataConsistencyCheckResult(equalCountCheckResult, 
equalContentCheckResult));
-        checkResults.put("bar_tbl", new 
TableDataConsistencyCheckResult(equalCountCheckResult, 
notEqualContentCheckResult));
+        checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true));
+        checkResults.put("bar_tbl", new 
TableDataConsistencyCheckResult(false));
         assertFalse(jobAPI.aggregateDataConsistencyCheckResults("foo_job", 
checkResults));
     }
     
     @Test
     void assertAggregateSameDataConsistencyCheckResults() {
-        TableDataConsistencyCountCheckResult equalCountCheckResult = new 
TableDataConsistencyCountCheckResult(100, 100);
-        TableDataConsistencyContentCheckResult equalContentCheckResult = new 
TableDataConsistencyContentCheckResult(true);
         Map<String, TableDataConsistencyCheckResult> checkResults = new 
LinkedHashMap<>(2, 1F);
-        checkResults.put("foo_tbl", new 
TableDataConsistencyCheckResult(equalCountCheckResult, 
equalContentCheckResult));
-        checkResults.put("bar_tbl", new 
TableDataConsistencyCheckResult(equalCountCheckResult, 
equalContentCheckResult));
+        checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true));
+        checkResults.put("bar_tbl", new TableDataConsistencyCheckResult(true));
         assertTrue(jobAPI.aggregateDataConsistencyCheckResults("foo_job", 
checkResults));
     }
     
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index ccafd074e0f..b36513d3771 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -42,8 +42,6 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Map;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class MigrationDataConsistencyCheckerTest {
@@ -66,9 +64,8 @@ class MigrationDataConsistencyCheckerTest {
         Map<String, TableDataConsistencyCheckResult> actual = new 
MigrationDataConsistencyChecker(jobConfig, new 
MigrationProcessContext(jobConfig.getJobId(), null),
                 
createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE",
 null);
         String checkKey = "t_order";
-        assertTrue(actual.get(checkKey).getCountCheckResult().isMatched());
-        
assertThat(actual.get(checkKey).getCountCheckResult().getSourceRecordsCount(), 
is(actual.get(checkKey).getCountCheckResult().getTargetRecordsCount()));
-        assertTrue(actual.get(checkKey).getContentCheckResult().isMatched());
+        assertTrue(actual.get(checkKey).isMatched());
+        assertTrue(actual.get(checkKey).isMatched());
     }
     
     private ConsistencyCheckJobItemProgressContext 
createConsistencyCheckJobItemProgressContext(final String jobId) {


Reply via email to