This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6b4c67f1aa4 Fix consistency check failed but status is Finished
(#30601)
6b4c67f1aa4 is described below
commit 6b4c67f1aa4fc9512447f89b2b46e4403c514767
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Mar 22 11:07:41 2024 +0800
Fix consistency check failed but status is Finished (#30601)
---
.../api/ConsistencyCheckJobAPI.java | 2 +-
.../task/ConsistencyCheckTasksRunner.java | 24 ++++++++++++++--------
2 files changed, 16 insertions(+), 10 deletions(-)
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
index 73bd8d1de98..e62fc84ce41 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
@@ -91,7 +91,7 @@ public final class ConsistencyCheckJobAPI {
Optional<String> latestCheckJobId =
governanceFacade.getJobFacade().getCheck().findLatestCheckJobId(parentJobId);
if (latestCheckJobId.isPresent()) {
Optional<ConsistencyCheckJobItemProgress> progress = new
PipelineJobItemManager<ConsistencyCheckJobItemProgress>(progressSwapper).getProgress(latestCheckJobId.get(),
0);
- ShardingSpherePreconditions.checkState(progress.isPresent() &&
JobStatus.FINISHED == progress.get().getStatus(),
+ ShardingSpherePreconditions.checkState(progress.isPresent() &&
(JobStatus.FINISHED == progress.get().getStatus() ||
JobStatus.CONSISTENCY_CHECK_FAILURE == progress.get().getStatus()),
() -> new
UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get(),
progress.orElse(null)));
}
checkPipelineDatabaseTypes(param);
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 2799ada3325..6a0fea10d15 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -19,24 +19,24 @@ package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.core.execute.PipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
@@ -144,7 +144,13 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
return;
}
log.info("onSuccess, check job id: {}, parent job id: {}",
checkJobId, parentJobId);
- jobItemContext.setStatus(JobStatus.FINISHED);
+ Map<String, TableDataConsistencyCheckResult> checkJobResult =
PipelineAPIFactory.getPipelineGovernanceFacade(
+
PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().getCheckJobResult(parentJobId,
checkJobId);
+ if
(checkJobResult.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched))
{
+ jobItemContext.setStatus(JobStatus.FINISHED);
+ } else {
+ jobItemContext.setStatus(JobStatus.CONSISTENCY_CHECK_FAILURE);
+ }
jobItemManager.persistProgress(jobItemContext);
jobManager.stop(checkJobId);
}