This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b4c67f1aa4 Fix consistency check failed but status is Finished (#30601)
6b4c67f1aa4 is described below

commit 6b4c67f1aa4fc9512447f89b2b46e4403c514767
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Mar 22 11:07:41 2024 +0800

    Fix consistency check failed but status is Finished (#30601)
---
 .../api/ConsistencyCheckJobAPI.java                |  2 +-
 .../task/ConsistencyCheckTasksRunner.java          | 24 ++++++++++++++--------
 2 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
index 73bd8d1de98..e62fc84ce41 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
@@ -91,7 +91,7 @@ public final class ConsistencyCheckJobAPI {
         Optional<String> latestCheckJobId = governanceFacade.getJobFacade().getCheck().findLatestCheckJobId(parentJobId);
         if (latestCheckJobId.isPresent()) {
             Optional<ConsistencyCheckJobItemProgress> progress = new PipelineJobItemManager<ConsistencyCheckJobItemProgress>(progressSwapper).getProgress(latestCheckJobId.get(), 0);
-            ShardingSpherePreconditions.checkState(progress.isPresent() && JobStatus.FINISHED == progress.get().getStatus(),
+            ShardingSpherePreconditions.checkState(progress.isPresent() && (JobStatus.FINISHED == progress.get().getStatus() || JobStatus.CONSISTENCY_CHECK_FAILURE == progress.get().getStatus()),
                     () -> new UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get(), progress.orElse(null)));
         }
         checkPipelineDatabaseTypes(param);
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 2799ada3325..6a0fea10d15 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -19,24 +19,24 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.execute.PipelineLifecycleRunnable;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
@@ -144,7 +144,13 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
                 return;
             }
             log.info("onSuccess, check job id: {}, parent job id: {}", checkJobId, parentJobId);
-            jobItemContext.setStatus(JobStatus.FINISHED);
+            Map<String, TableDataConsistencyCheckResult> checkJobResult = PipelineAPIFactory.getPipelineGovernanceFacade(
+                    PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().getCheckJobResult(parentJobId, checkJobId);
+            if (checkJobResult.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched)) {
+                jobItemContext.setStatus(JobStatus.FINISHED);
+            } else {
+                jobItemContext.setStatus(JobStatus.CONSISTENCY_CHECK_FAILURE);
+            }
             jobItemManager.persistProgress(jobItemContext);
             jobManager.stop(checkJobId);
         }

Reply via email to