This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 18ce0c4b69b Review and improve pipeline code (#28706)
18ce0c4b69b is described below
commit 18ce0c4b69b35fd5a0783fd1d0e5f4fabbccef35
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Oct 10 18:29:49 2023 +0800
Review and improve pipeline code (#28706)
* Handle PipelineJobNotFoundException on stopping job
* Release inventory checker resource ASAP on error
* Simplify TableDataConsistencyCheckResult
---
.../result/TableDataConsistencyCheckResult.java | 14 +++---
.../TableDataConsistencyContentCheckResult.java | 33 --------------
.../TableDataConsistencyCountCheckResult.java | 44 ------------------
.../yaml/YamlTableDataConsistencyCheckResult.java | 52 ++--------------------
...YamlTableDataConsistencyCheckResultSwapper.java | 17 +------
.../table/MatchingTableInventoryChecker.java | 19 +++-----
.../core/job/AbstractSimplePipelineJob.java | 8 +++-
.../runner/InventoryIncrementalTasksRunner.java | 11 ++++-
.../data/pipeline/cdc/core/job/CDCJob.java | 2 +-
.../core/fixture/FixtureTableInventoryChecker.java | 4 +-
.../service/GovernanceRepositoryAPIImplTest.java | 6 +--
.../api/impl/ConsistencyCheckJobAPITest.java | 10 ++---
.../migration/api/impl/MigrationJobAPITest.java | 33 +++-----------
.../MigrationDataConsistencyCheckerTest.java | 7 +--
14 files changed, 47 insertions(+), 213 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
index 7eec6445fbc..8d73495befa 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
@@ -27,21 +27,17 @@ import lombok.ToString;
@ToString
public final class TableDataConsistencyCheckResult {
- private final TableDataConsistencyCountCheckResult countCheckResult;
-
- private final TableDataConsistencyContentCheckResult contentCheckResult;
+ private final boolean matched;
private final TableDataConsistencyCheckIgnoredType ignoredType;
- public TableDataConsistencyCheckResult(final
TableDataConsistencyCountCheckResult countCheckResult, final
TableDataConsistencyContentCheckResult contentCheckResult) {
- this.countCheckResult = countCheckResult;
- this.contentCheckResult = contentCheckResult;
+ public TableDataConsistencyCheckResult(final boolean matched) {
+ this.matched = matched;
ignoredType = null;
}
public TableDataConsistencyCheckResult(final
TableDataConsistencyCheckIgnoredType ignoredType) {
- countCheckResult = new TableDataConsistencyCountCheckResult(-1, -1);
- contentCheckResult = new TableDataConsistencyContentCheckResult(false);
+ matched = false;
this.ignoredType = ignoredType;
}
@@ -63,6 +59,6 @@ public final class TableDataConsistencyCheckResult {
if (null != ignoredType) {
return false;
}
- return countCheckResult.isMatched() && contentCheckResult.isMatched();
+ return matched;
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyContentCheckResult.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyContentCheckResult.java
deleted file mode 100644
index 3995f5ad4b6..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyContentCheckResult.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.consistencycheck.result;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-
-/**
- * Table data consistency content check result.
- */
-@RequiredArgsConstructor
-@Getter
-@ToString
-public final class TableDataConsistencyContentCheckResult {
-
- private final boolean matched;
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCountCheckResult.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCountCheckResult.java
deleted file mode 100644
index aaf2b5ef02c..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCountCheckResult.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.consistencycheck.result;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-
-/**
- * Table data consistency count check result.
- */
-@RequiredArgsConstructor
-@Getter
-@ToString
-public final class TableDataConsistencyCountCheckResult {
-
- private final long sourceRecordsCount;
-
- private final long targetRecordsCount;
-
- /**
- * Is matched.
- *
- * @return true if records count equals between source and target
- */
- public boolean isMatched() {
- return sourceRecordsCount == targetRecordsCount && sourceRecordsCount
>= 0;
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
index 9a1e4174492..de1c849e345 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
@@ -17,7 +17,6 @@
package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml;
-import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@@ -31,56 +30,11 @@ import
org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
@Setter
public final class YamlTableDataConsistencyCheckResult implements
YamlConfiguration {
- private YamlTableDataConsistencyCountCheckResult countCheckResult;
-
- private YamlTableDataConsistencyContentCheckResult contentCheckResult;
+ private boolean matched;
private String ignoredType;
- public YamlTableDataConsistencyCheckResult(final
YamlTableDataConsistencyCountCheckResult countCheckResult, final
YamlTableDataConsistencyContentCheckResult contentCheckResult) {
- this.countCheckResult = countCheckResult;
- this.contentCheckResult = contentCheckResult;
- }
-
- /**
- * YAML table data consistency count result.
- */
- @Getter
- @Setter
- public static class YamlTableDataConsistencyCountCheckResult implements
YamlConfiguration {
-
- private long sourceRecordsCount;
-
- private long targetRecordsCount;
-
- /**
- * Add source records count.
- *
- * @param delta delta count
- */
- public void addSourceRecordsCount(final long delta) {
- sourceRecordsCount += delta;
- }
-
- /**
- * Add target records count.
- *
- * @param delta delta count
- */
- public void addTargetRecordsCount(final long delta) {
- targetRecordsCount += delta;
- }
- }
-
- /**
- * YAML table data consistency content result.
- */
- @NoArgsConstructor
- @AllArgsConstructor
- @Getter
- @Setter
- public static class YamlTableDataConsistencyContentCheckResult implements
YamlConfiguration {
-
- private boolean matched;
+ public YamlTableDataConsistencyCheckResult(final boolean matched) {
+ this.matched = matched;
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
index 4003f101813..cd07e8d48c5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
@@ -20,10 +20,6 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yam
import com.google.common.base.Strings;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckIgnoredType;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult.YamlTableDataConsistencyContentCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult.YamlTableDataConsistencyCountCheckResult;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
@@ -39,13 +35,7 @@ public final class
YamlTableDataConsistencyCheckResultSwapper implements YamlCon
result.setIgnoredType(data.getIgnoredType().name());
return result;
}
- YamlTableDataConsistencyCountCheckResult countCheckResult = new
YamlTableDataConsistencyCountCheckResult();
-
countCheckResult.setSourceRecordsCount(data.getCountCheckResult().getSourceRecordsCount());
-
countCheckResult.setTargetRecordsCount(data.getCountCheckResult().getTargetRecordsCount());
- result.setCountCheckResult(countCheckResult);
- YamlTableDataConsistencyContentCheckResult contentCheckResult = new
YamlTableDataConsistencyContentCheckResult();
-
contentCheckResult.setMatched(data.getContentCheckResult().isMatched());
- result.setContentCheckResult(contentCheckResult);
+ result.setMatched(data.isMatched());
return result;
}
@@ -57,10 +47,7 @@ public final class
YamlTableDataConsistencyCheckResultSwapper implements YamlCon
if (!Strings.isNullOrEmpty(yamlConfig.getIgnoredType())) {
return new
TableDataConsistencyCheckResult(TableDataConsistencyCheckIgnoredType.valueOf(yamlConfig.getIgnoredType()));
}
- YamlTableDataConsistencyCountCheckResult yamlCountCheck =
yamlConfig.getCountCheckResult();
- TableDataConsistencyCountCheckResult countCheckResult = new
TableDataConsistencyCountCheckResult(yamlCountCheck.getSourceRecordsCount(),
yamlCountCheck.getTargetRecordsCount());
- TableDataConsistencyContentCheckResult contentCheckResult = new
TableDataConsistencyContentCheckResult(yamlConfig.getContentCheckResult().isMatched());
- return new TableDataConsistencyCheckResult(countCheckResult,
contentCheckResult);
+ return new TableDataConsistencyCheckResult(yamlConfig.isMatched());
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index 8e4734aabc8..6717a17f209 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -24,8 +24,6 @@ import
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.Pipe
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult.YamlTableDataConsistencyContentCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult.YamlTableDataConsistencyCountCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
@@ -78,9 +76,9 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
calculators.add(sourceCalculator);
SingleTableInventoryCalculator targetCalculator =
buildSingleTableInventoryCalculator();
calculators.add(targetCalculator);
- Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults
= waitFuture(executor.submit(() ->
sourceCalculator.calculate(sourceParam))).iterator();
- Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults
= waitFuture(executor.submit(() ->
targetCalculator.calculate(targetParam))).iterator();
try {
+ Iterator<SingleTableInventoryCalculatedResult>
sourceCalculatedResults = waitFuture(executor.submit(() ->
sourceCalculator.calculate(sourceParam))).iterator();
+ Iterator<SingleTableInventoryCalculatedResult>
targetCalculatedResults = waitFuture(executor.submit(() ->
targetCalculator.calculate(targetParam))).iterator();
return checkSingleTableInventoryData(sourceCalculatedResults,
targetCalculatedResults, param, executor);
} finally {
QuietlyCloser.close(sourceParam.getCalculationContext());
@@ -93,17 +91,15 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
private TableDataConsistencyCheckResult
checkSingleTableInventoryData(final
Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults,
final Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults,
final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
- YamlTableDataConsistencyCheckResult checkResult = new
YamlTableDataConsistencyCheckResult(new
YamlTableDataConsistencyCountCheckResult(), new
YamlTableDataConsistencyContentCheckResult(true));
+ YamlTableDataConsistencyCheckResult checkResult = new
YamlTableDataConsistencyCheckResult(true);
while (sourceCalculatedResults.hasNext() &&
targetCalculatedResults.hasNext()) {
if (null != param.getReadRateLimitAlgorithm()) {
param.getReadRateLimitAlgorithm().intercept(JobOperationType.SELECT, 1);
}
SingleTableInventoryCalculatedResult sourceCalculatedResult =
waitFuture(executor.submit(sourceCalculatedResults::next));
SingleTableInventoryCalculatedResult targetCalculatedResult =
waitFuture(executor.submit(targetCalculatedResults::next));
-
checkResult.getCountCheckResult().addSourceRecordsCount(sourceCalculatedResult.getRecordsCount());
-
checkResult.getCountCheckResult().addTargetRecordsCount(targetCalculatedResult.getRecordsCount());
if (!Objects.equals(sourceCalculatedResult,
targetCalculatedResult)) {
- checkResult.getContentCheckResult().setMatched(false);
+ checkResult.setMatched(false);
log.info("content matched false, jobId={}, sourceTable={},
targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(),
param.getTargetTable(), param.getUniqueKeys());
break;
}
@@ -116,14 +112,11 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
param.getProgressContext().onProgressUpdated(new
PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
}
if (sourceCalculatedResults.hasNext()) {
- // TODO Refactor SingleTableInventoryCalculatedResult to represent
inaccurate number
- checkResult.getCountCheckResult().addSourceRecordsCount(1);
- checkResult.getContentCheckResult().setMatched(false);
+ checkResult.setMatched(false);
return new
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
}
if (targetCalculatedResults.hasNext()) {
- checkResult.getCountCheckResult().addTargetRecordsCount(1);
- checkResult.getContentCheckResult().setMatched(false);
+ checkResult.setMatched(false);
return new
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
}
return new
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 3491f3882b1..fae85d03ad3 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -77,8 +78,11 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
}
private void processFailed(final String jobId, final int shardingItem,
final Exception ex) {
- log.error("job prepare failed, {}-{}", jobId, shardingItem, ex);
+ log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
getJobAPI().updateJobItemErrorMessage(jobId, shardingItem, ex);
- getJobAPI().stop(jobId);
+ try {
+ getJobAPI().stop(jobId);
+ } catch (final PipelineJobNotFoundException ignored) {
+ }
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index 08415b7ecba..ff3e9308f58 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -25,6 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -142,7 +143,10 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
log.error("onFailure, inventory task execute failed.", throwable);
String jobId = jobItemContext.getJobId();
jobAPI.updateJobItemErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
- jobAPI.stop(jobId);
+ try {
+ jobAPI.stop(jobId);
+ } catch (final PipelineJobNotFoundException ignored) {
+ }
}
private final class InventoryTaskExecuteCallback implements
ExecuteCallback {
@@ -174,7 +178,10 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
log.error("onFailure, incremental task execute failed.",
throwable);
String jobId = jobItemContext.getJobId();
jobAPI.updateJobItemErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
- jobAPI.stop(jobId);
+ try {
+ jobAPI.stop(jobId);
+ } catch (final PipelineJobNotFoundException ignored) {
+ }
}
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 7e28a6b3787..9858e2932e3 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -121,7 +121,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
}
private void processFailed(final String jobId, final int shardingItem,
final Exception ex) {
- log.error("job prepare failed, {}-{}", jobId, shardingItem, ex);
+ log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
jobAPI.updateJobItemErrorMessage(jobId, shardingItem, ex);
PipelineJobCenter.stop(jobId);
jobAPI.updateJobConfigurationDisabled(jobId, true);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableInventoryChecker.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableInventoryChecker.java
index 0a9e1841220..18ea7820581 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableInventoryChecker.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableInventoryChecker.java
@@ -18,8 +18,6 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
public final class FixtureTableInventoryChecker implements
TableInventoryChecker {
@@ -35,6 +33,6 @@ public final class FixtureTableInventoryChecker implements
TableInventoryChecker
@Override
public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
- return new TableDataConsistencyCheckResult(new
TableDataConsistencyCountCheckResult(2, 2), new
TableDataConsistencyContentCheckResult(true));
+ return new TableDataConsistencyCheckResult(true);
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
index d2f8c19150f..9339b4ab88f 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
@@ -23,8 +23,6 @@ import
org.apache.shardingsphere.data.pipeline.common.constant.DataPipelineConst
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -98,11 +96,11 @@ class GovernanceRepositoryAPIImplTest {
void assertPersistJobCheckResult() {
MigrationJobItemContext jobItemContext = mockJobItemContext();
Map<String, TableDataConsistencyCheckResult> actual = new HashMap<>();
- actual.put("test", new TableDataConsistencyCheckResult(new
TableDataConsistencyCountCheckResult(1, 1), new
TableDataConsistencyContentCheckResult(true)));
+ actual.put("test", new TableDataConsistencyCheckResult(true));
governanceRepositoryAPI.persistCheckJobResult(jobItemContext.getJobId(),
"j02123", actual);
Map<String, TableDataConsistencyCheckResult> checkResult =
governanceRepositoryAPI.getCheckJobResult(jobItemContext.getJobId(), "j02123");
assertThat(checkResult.size(), is(1));
-
assertTrue(checkResult.get("test").getContentCheckResult().isMatched());
+ assertTrue(checkResult.get("test").isMatched());
}
@Test
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 962f3340821..2e49467926f 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -20,8 +20,6 @@ package
org.apache.shardingsphere.test.it.data.pipeline.scenario.consistencychec
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
@@ -83,12 +81,11 @@ class ConsistencyCheckJobAPITest {
parentJobConfig.getSourceDatabaseType(),
parentJobConfig.getTargetDatabaseType()));
GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
governanceRepositoryAPI.persistLatestCheckJobId(parentJobId,
checkJobId);
- Map<String, TableDataConsistencyCheckResult> expectedCheckResult =
Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(new
TableDataConsistencyCountCheckResult(1, 1),
- new TableDataConsistencyContentCheckResult(true)));
+ Map<String, TableDataConsistencyCheckResult> expectedCheckResult =
Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true));
governanceRepositoryAPI.persistCheckJobResult(parentJobId, checkJobId,
expectedCheckResult);
Map<String, TableDataConsistencyCheckResult> actualCheckResult =
checkJobAPI.getLatestDataConsistencyCheckResult(parentJobId);
assertThat(actualCheckResult.size(), is(expectedCheckResult.size()));
-
assertThat(actualCheckResult.get("t_order").getCountCheckResult().isMatched(),
is(expectedCheckResult.get("t_order").getContentCheckResult().isMatched()));
+ assertThat(actualCheckResult.get("t_order").isMatched(),
is(expectedCheckResult.get("t_order").isMatched()));
}
@Test
@@ -103,8 +100,7 @@ class ConsistencyCheckJobAPITest {
ConsistencyCheckJobItemContext checkJobItemContext = new
ConsistencyCheckJobItemContext(
new ConsistencyCheckJobConfiguration(checkJobId,
parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")),
0, JobStatus.FINISHED, null);
checkJobAPI.persistJobItemProgress(checkJobItemContext);
- Map<String, TableDataConsistencyCheckResult>
dataConsistencyCheckResult = Collections.singletonMap("t_order",
- new TableDataConsistencyCheckResult(new
TableDataConsistencyCountCheckResult(0, 0), new
TableDataConsistencyContentCheckResult(true)));
+ Map<String, TableDataConsistencyCheckResult>
dataConsistencyCheckResult = Collections.singletonMap("t_order", new
TableDataConsistencyCheckResult(true));
repositoryAPI.persistCheckJobResult(parentJobId, checkJobId,
dataConsistencyCheckResult);
Optional<String> latestCheckJobId =
repositoryAPI.getLatestCheckJobId(parentJobId);
assertTrue(latestCheckJobId.isPresent());
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 96fc9eba5c8..9e2f22d7a96 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -30,8 +30,6 @@ import
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJ
import
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
@@ -180,9 +178,8 @@ class MigrationJobAPITest {
jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new
ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE",
null);
assertThat(checkResultMap.size(), is(1));
String checkKey = "t_order";
-
assertTrue(checkResultMap.get(checkKey).getCountCheckResult().isMatched());
-
assertThat(checkResultMap.get(checkKey).getCountCheckResult().getTargetRecordsCount(),
is(2L));
-
assertTrue(checkResultMap.get(checkKey).getContentCheckResult().isMatched());
+ assertTrue(checkResultMap.get(checkKey).isMatched());
+ assertTrue(checkResultMap.get(checkKey).isMatched());
}
@Test
@@ -191,34 +188,18 @@ class MigrationJobAPITest {
}
@Test
- void assertAggregateDifferentCountDataConsistencyCheckResults() {
- TableDataConsistencyCountCheckResult equalCountCheckResult = new
TableDataConsistencyCountCheckResult(100, 100);
- TableDataConsistencyCountCheckResult notEqualCountCheckResult = new
TableDataConsistencyCountCheckResult(100, 95);
- TableDataConsistencyContentCheckResult equalContentCheckResult = new
TableDataConsistencyContentCheckResult(false);
+ void assertAggregateDifferentDataConsistencyCheckResults() {
Map<String, TableDataConsistencyCheckResult> checkResults = new
LinkedHashMap<>(2, 1F);
- checkResults.put("foo_tbl", new
TableDataConsistencyCheckResult(equalCountCheckResult,
equalContentCheckResult));
- checkResults.put("bar_tbl", new
TableDataConsistencyCheckResult(notEqualCountCheckResult,
equalContentCheckResult));
- assertFalse(jobAPI.aggregateDataConsistencyCheckResults("foo_job",
checkResults));
- }
-
- @Test
- void assertAggregateDifferentContentDataConsistencyCheckResults() {
- TableDataConsistencyCountCheckResult equalCountCheckResult = new
TableDataConsistencyCountCheckResult(100, 100);
- TableDataConsistencyContentCheckResult equalContentCheckResult = new
TableDataConsistencyContentCheckResult(true);
- TableDataConsistencyContentCheckResult notEqualContentCheckResult =
new TableDataConsistencyContentCheckResult(false);
- Map<String, TableDataConsistencyCheckResult> checkResults = new
LinkedHashMap<>(2, 1F);
- checkResults.put("foo_tbl", new
TableDataConsistencyCheckResult(equalCountCheckResult,
equalContentCheckResult));
- checkResults.put("bar_tbl", new
TableDataConsistencyCheckResult(equalCountCheckResult,
notEqualContentCheckResult));
+ checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true));
+ checkResults.put("bar_tbl", new
TableDataConsistencyCheckResult(false));
assertFalse(jobAPI.aggregateDataConsistencyCheckResults("foo_job",
checkResults));
}
@Test
void assertAggregateSameDataConsistencyCheckResults() {
- TableDataConsistencyCountCheckResult equalCountCheckResult = new
TableDataConsistencyCountCheckResult(100, 100);
- TableDataConsistencyContentCheckResult equalContentCheckResult = new
TableDataConsistencyContentCheckResult(true);
Map<String, TableDataConsistencyCheckResult> checkResults = new
LinkedHashMap<>(2, 1F);
- checkResults.put("foo_tbl", new
TableDataConsistencyCheckResult(equalCountCheckResult,
equalContentCheckResult));
- checkResults.put("bar_tbl", new
TableDataConsistencyCheckResult(equalCountCheckResult,
equalContentCheckResult));
+ checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true));
+ checkResults.put("bar_tbl", new TableDataConsistencyCheckResult(true));
assertTrue(jobAPI.aggregateDataConsistencyCheckResults("foo_job",
checkResults));
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index ccafd074e0f..b36513d3771 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -42,8 +42,6 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
class MigrationDataConsistencyCheckerTest {
@@ -66,9 +64,8 @@ class MigrationDataConsistencyCheckerTest {
Map<String, TableDataConsistencyCheckResult> actual = new
MigrationDataConsistencyChecker(jobConfig, new
MigrationProcessContext(jobConfig.getJobId(), null),
createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE",
null);
String checkKey = "t_order";
- assertTrue(actual.get(checkKey).getCountCheckResult().isMatched());
-
assertThat(actual.get(checkKey).getCountCheckResult().getSourceRecordsCount(),
is(actual.get(checkKey).getCountCheckResult().getTargetRecordsCount()));
- assertTrue(actual.get(checkKey).getContentCheckResult().isMatched());
+ assertTrue(actual.get(checkKey).isMatched());
+ assertTrue(actual.get(checkKey).isMatched());
}
private ConsistencyCheckJobItemProgressContext
createConsistencyCheckJobItemProgressContext(final String jobId) {