This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0a62ba2670f Create new TableDataConsistencyChecker for every table data consistency check (#28121)
0a62ba2670f is described below
commit 0a62ba2670f3eee6becf1e2aebaf1817c60282ee
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Aug 16 17:51:31 2023 +0800
Create new TableDataConsistencyChecker for every table data consistency check (#28121)
* Add TableDataConsistencyCheckerFactory
* Remove InventoryIncrementalJobAPI.buildTableDataConsistencyChecker
* Refactor PipelineDataConsistencyChecker.check parameter;
Refactor InventoryIncrementalJobAPI consistency check methods;
Refactor PipelineDataConsistencyChecker cancellable;
* Fix spotless
* Update javadoc
---
.../PipelineDataConsistencyChecker.java | 9 +++--
.../TableDataConsistencyCheckerFactory.java} | 25 +++++++-----
.../job/service/InventoryIncrementalJobAPI.java | 26 +++++-------
.../AbstractInventoryIncrementalJobAPIImpl.java | 26 ------------
.../TableDataConsistencyCheckerFactoryTest.java | 46 ++++++++++++++++++++++
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 4 +-
.../task/ConsistencyCheckTasksRunner.java | 25 ++++++------
.../migration/api/impl/MigrationJobAPI.java | 4 +-
.../MigrationDataConsistencyChecker.java | 41 ++++++++++++++++---
.../migration/api/impl/MigrationJobAPITest.java | 12 +-----
.../MigrationDataConsistencyCheckerTest.java | 3 +-
11 files changed, 131 insertions(+), 90 deletions(-)
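
For readers following the API change: after this commit, callers no longer build a TableDataConsistencyChecker up front. The algorithm type and properties are passed into PipelineDataConsistencyChecker.check, and a fresh TableDataConsistencyChecker is created per table through the new TableDataConsistencyCheckerFactory. Below is a minimal sketch of the factory call only, assuming the data-pipeline core module is on the classpath; the factory method, the DATA_MATCH fallback, getType, and the one-instance-per-call behavior come from the diff below, while the class name FactoryUsageSketch and the main method are illustrative only.

    import java.util.Properties;

    import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
    import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;

    // Hypothetical class name, for illustration only.
    public final class FactoryUsageSketch {

        public static void main(final String[] args) {
            // A null algorithm type falls back to DATA_MATCH (see newInstance in the diff).
            TableDataConsistencyChecker defaultChecker = TableDataConsistencyCheckerFactory.newInstance(null, new Properties());
            TableDataConsistencyChecker crc32Checker = TableDataConsistencyCheckerFactory.newInstance("CRC32_MATCH", new Properties());
            // Each call returns a new SPI instance (see TableDataConsistencyCheckerFactoryTest),
            // so a separate checker can be created and cancelled per table.
            System.out.println(defaultChecker.getType());
            System.out.println(crc32Checker.getType());
        }
    }
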
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/PipelineDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/PipelineDataConsistencyChecker.java
index 34060e109bc..8fc4972c04a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/PipelineDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/PipelineDataConsistencyChecker.java
@@ -18,20 +18,21 @@
package org.apache.shardingsphere.data.pipeline.core.consistencycheck;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import java.util.Map;
+import java.util.Properties;
/**
* Pipeline data consistency checker.
*/
-public interface PipelineDataConsistencyChecker {
+public interface PipelineDataConsistencyChecker extends PipelineCancellable {
/**
* Data consistency check.
*
- * @param tableChecker table data consistency checker
+ * @param algorithmType algorithm type of {@link org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker}
+ * @param algorithmProps algorithm properties of {@link org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker}
* @return check results. key is logic table name, value is check result.
*/
- Map<String, TableDataConsistencyCheckResult> check(TableDataConsistencyChecker tableChecker);
+ Map<String, TableDataConsistencyCheckResult> check(String algorithmType, Properties algorithmProps);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/PipelineDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyCheckerFactory.java
similarity index 53%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/PipelineDataConsistencyChecker.java
copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyCheckerFactory.java
index 34060e109bc..42cec80ad6a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/PipelineDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyCheckerFactory.java
@@ -15,23 +15,28 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.consistencycheck;
+package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import java.util.Map;
+import java.util.Properties;
/**
- * Pipeline data consistency checker.
+ * Table data consistency checker factory.
*/
-public interface PipelineDataConsistencyChecker {
+@NoArgsConstructor(access = AccessLevel.NONE)
+public final class TableDataConsistencyCheckerFactory {
/**
- * Data consistency check.
+ * Build table data consistency checker.
*
- * @param tableChecker table data consistency checker
- * @return check results. key is logic table name, value is check result.
+ * @param algorithmType algorithm type
+ * @param algorithmProps algorithm properties
+ * @return table data consistency checker
*/
- Map<String, TableDataConsistencyCheckResult> check(TableDataConsistencyChecker tableChecker);
+ public static TableDataConsistencyChecker newInstance(final String algorithmType, final Properties algorithmProps) {
+ return TypedSPILoader.getService(TableDataConsistencyChecker.class, null == algorithmType ? "DATA_MATCH" : algorithmType, algorithmProps);
+ }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
index 80e0d3818bc..523b9e89977 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
@@ -19,27 +19,30 @@ package org.apache.shardingsphere.data.pipeline.core.job.service;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Properties;
/**
* Inventory incremental job API.
*/
public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
+ @Override
+ InventoryIncrementalProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
+
/**
* Alter process configuration.
*
@@ -99,24 +102,15 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms();
/**
- * Build data consistency checker.
- *
- * @param algorithmType algorithm type
- * @param algorithmProps algorithm properties
- * @return calculate algorithm
- */
- TableDataConsistencyChecker buildTableDataConsistencyChecker(String algorithmType, Properties algorithmProps);
-
- /**
- * Do data consistency check.
+ * Build pipeline data consistency checker.
*
* @param pipelineJobConfig job configuration
- * @param tableChecker table data consistency checker
+ * @param processContext process context
* @param progressContext consistency check job item progress context
- * @return each logic table check result
+ * @return all logic tables check result
*/
- Map<String, TableDataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, TableDataConsistencyChecker tableChecker,
- ConsistencyCheckJobItemProgressContext progressContext);
+ PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, InventoryIncrementalProcessContext processContext,
+ ConsistencyCheckJobItemProgressContext progressContext);
/**
* Aggregate data consistency check results.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 7fdde595c69..8402612510c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -24,7 +24,6 @@ import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConf
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
-import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
@@ -41,8 +40,6 @@ import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJ
import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -54,7 +51,6 @@ import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.util.Collection;
@@ -65,7 +61,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.Properties;
import java.util.stream.IntStream;
/**
@@ -81,9 +76,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new YamlJobOffsetInfoSwapper();
- @Override
- public abstract InventoryIncrementalProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
-
@Override
public void alterProcessConfiguration(final PipelineContextKey contextKey, final PipelineProcessConfiguration processConfig) {
// TODO check rateLimiter type match or not
@@ -220,24 +212,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
return supportedDatabaseTypes.isEmpty() ? ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class) : supportedDatabaseTypes;
}
- @Override
- public TableDataConsistencyChecker buildTableDataConsistencyChecker(final String algorithmType, final Properties algorithmProps) {
- return TypedSPILoader.getService(TableDataConsistencyChecker.class, null == algorithmType ? "DATA_MATCH" : algorithmType, algorithmProps);
- }
-
- @Override
- public Map<String, TableDataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final TableDataConsistencyChecker tableChecker,
- final ConsistencyCheckJobItemProgressContext progressContext) {
- String jobId = jobConfig.getJobId();
- PipelineDataConsistencyChecker dataConsistencyChecker = buildPipelineDataConsistencyChecker(jobConfig, buildPipelineProcessContext(jobConfig), progressContext);
- Map<String, TableDataConsistencyCheckResult> result = dataConsistencyChecker.check(tableChecker);
- log.info("job {} with check algorithm '{}' data consistency checker result {}", jobId, tableChecker.getType(), result);
- return result;
- }
-
- protected abstract PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, InventoryIncrementalProcessContext processContext,
- ConsistencyCheckJobItemProgressContext progressContext);
-
@Override
public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, TableDataConsistencyCheckResult> checkResults) {
if (checkResults.isEmpty()) {
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/TableDataConsistencyCheckerFactoryTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/TableDataConsistencyCheckerFactoryTest.java
new file mode 100644
index 00000000000..0db13255e7f
--- /dev/null
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/TableDataConsistencyCheckerFactoryTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
+
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.CRC32MatchTableDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.DataMatchTableDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+class TableDataConsistencyCheckerFactoryTest {
+
+ @Test
+ void assertNewInstanceTypeMatched() {
+ assertInstanceOf(DataMatchTableDataConsistencyChecker.class, TableDataConsistencyCheckerFactory.newInstance(null, new Properties()));
+ assertInstanceOf(DataMatchTableDataConsistencyChecker.class, TableDataConsistencyCheckerFactory.newInstance("DATA_MATCH", new Properties()));
+ assertInstanceOf(CRC32MatchTableDataConsistencyChecker.class, TableDataConsistencyCheckerFactory.newInstance("CRC32_MATCH", new Properties()));
+ }
+
+ @Test
+ void assertNewInstancesDifferent() {
+ TableDataConsistencyChecker actual1 = TableDataConsistencyCheckerFactory.newInstance("DATA_MATCH", new Properties());
+ TableDataConsistencyChecker actual2 = TableDataConsistencyCheckerFactory.newInstance("DATA_MATCH", new Properties());
+ assertNotEquals(actual1, actual2);
+ }
+}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 67a9e9a0f78..48bd8cb27fa 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -368,8 +368,8 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
}
@Override
- protected PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
- final ConsistencyCheckJobItemProgressContext progressContext) {
+ public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
+ final ConsistencyCheckJobItemProgressContext progressContext) {
throw new UnsupportedOperationException();
}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 84a4c2454cb..bfb36635f80 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -26,8 +26,8 @@ import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
@@ -62,7 +62,7 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
private final LifecycleExecutor checkExecutor;
- private final AtomicReference<TableDataConsistencyChecker> tableDataConsistencyChecker = new AtomicReference<>();
+ private final AtomicReference<PipelineDataConsistencyChecker> consistencyChecker = new AtomicReference<>();
public ConsistencyCheckTasksRunner(final ConsistencyCheckJobItemContext jobItemContext) {
this.jobItemContext = jobItemContext;
@@ -96,12 +96,13 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
PipelineJobConfiguration parentJobConfig = jobAPI.getJobConfiguration(parentJobId);
- TableDataConsistencyChecker tableChecker = jobAPI.buildTableDataConsistencyChecker(checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
- ConsistencyCheckTasksRunner.this.tableDataConsistencyChecker.set(tableChecker);
- Map<String, TableDataConsistencyCheckResult> dataConsistencyCheckResult;
try {
- dataConsistencyCheckResult = jobAPI.dataConsistencyCheck(parentJobConfig, tableChecker, jobItemContext.getProgressContext());
- PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
+ PipelineDataConsistencyChecker checker = jobAPI.buildPipelineDataConsistencyChecker(
+ parentJobConfig, jobAPI.buildPipelineProcessContext(parentJobConfig), jobItemContext.getProgressContext());
+ consistencyChecker.set(checker);
+ Map<String, TableDataConsistencyCheckResult> checkResultMap = checker.check(checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
+ log.info("job {} with check algorithm '{}' data consistency checker result: {}", parentJobId, checkJobConfig.getAlgorithmTypeName(), checkResultMap);
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).persistCheckJobResult(parentJobId, checkJobId, checkResultMap);
} finally {
jobItemContext.getProgressContext().setCheckEndTimeMillis(System.currentTimeMillis());
}
@@ -109,9 +110,9 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
@Override
protected void doStop() {
- TableDataConsistencyChecker tableChecker = tableDataConsistencyChecker.get();
- if (null != tableChecker) {
- tableChecker.cancel();
+ PipelineDataConsistencyChecker checker = consistencyChecker.get();
+ if (null != checker) {
+ checker.cancel();
}
}
}
@@ -128,8 +129,8 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
@Override
public void onFailure(final Throwable throwable) {
- TableDataConsistencyChecker tableChecker = tableDataConsistencyChecker.get();
- if (null != tableChecker && tableChecker.isCanceling()) {
+ PipelineDataConsistencyChecker checker = consistencyChecker.get();
+ if (null != checker && checker.isCanceling()) {
log.info("onFailure, canceling, check job id: {}, parent job
id: {}", checkJobId, parentJobId);
checkJobAPI.stop(checkJobId);
return;
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 1995e6e13e6..134ca33febc 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -337,8 +337,8 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
}
@Override
- protected PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
- final ConsistencyCheckJobItemProgressContext progressContext) {
+ public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
+ final ConsistencyCheckJobItemProgressContext progressContext) {
return new MigrationDataConsistencyChecker((MigrationJobConfiguration) pipelineJobConfig, processContext, progressContext);
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 05bd0db0d81..c8dbbc1f6eb 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -39,20 +39,25 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDat
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckParameter;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Data consistency checker for migration job.
@@ -66,6 +71,8 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
private final ConsistencyCheckJobItemProgressContext progressContext;
+ private final AtomicReference<TableDataConsistencyChecker> currentTableChecker = new AtomicReference<>();
+
public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext,
final ConsistencyCheckJobItemProgressContext progressContext) {
this.jobConfig = jobConfig;
@@ -74,9 +81,10 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
}
@Override
- public Map<String, TableDataConsistencyCheckResult> check(final TableDataConsistencyChecker tableChecker) {
- verifyPipelineDatabaseType(tableChecker, jobConfig.getSources().values().iterator().next());
- verifyPipelineDatabaseType(tableChecker, jobConfig.getTarget());
+ public Map<String, TableDataConsistencyCheckResult> check(final String algorithmType, final Properties algorithmProps) {
+ Collection<DatabaseType> supportedDatabaseTypes = TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps).getSupportedDatabaseTypes();
+ verifyPipelineDatabaseType(supportedDatabaseTypes, jobConfig.getSources().values().iterator().next());
+ verifyPipelineDatabaseType(supportedDatabaseTypes, jobConfig.getTarget());
List<String> sourceTableNames = new LinkedList<>();
jobConfig.getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes()
.forEach(dataNode -> sourceTableNames.add(DataNodeUtils.formatWithSchema(dataNode)))));
@@ -87,7 +95,11 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
try (PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager()) {
AtomicBoolean checkFailed = new AtomicBoolean(false);
for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
- each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> check(tableChecker, result, dataSourceManager, checkFailed, each, entry, dataNode)));
+ each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> {
+ TableDataConsistencyChecker tableChecker = TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps);
+ currentTableChecker.set(tableChecker);
+ check(tableChecker, result, dataSourceManager, checkFailed, each, entry, dataNode);
+ }));
}
}
return result;
@@ -123,8 +135,8 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
return tableChecker.checkSingleTableInventoryData(param);
}
- private void verifyPipelineDatabaseType(final TableDataConsistencyChecker tableChecker, final PipelineDataSourceConfiguration dataSourceConfig) {
- ShardingSpherePreconditions.checkState(tableChecker.getSupportedDatabaseTypes().contains(dataSourceConfig.getDatabaseType()),
+ private void verifyPipelineDatabaseType(final Collection<DatabaseType> supportedDatabaseTypes, final PipelineDataSourceConfiguration dataSourceConfig) {
+ ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(dataSourceConfig.getDatabaseType()),
() -> new UnsupportedPipelineDatabaseTypeException(dataSourceConfig.getDatabaseType()));
}
@@ -132,4 +144,21 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = new MigrationJobAPI().getJobProgress(jobConfig);
return jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
}
+
+ @Override
+ public void cancel() {
+ TableDataConsistencyChecker tableChecker = currentTableChecker.get();
+ if (null != tableChecker) {
+ tableChecker.cancel();
+ }
+ }
+
+ @Override
+ public boolean isCanceling() {
+ TableDataConsistencyChecker tableChecker = currentTableChecker.get();
+ if (null != tableChecker) {
+ return tableChecker.isCanceling();
+ }
+ return false;
+ }
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 4598b363cc5..39418560e4f 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -32,7 +32,6 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.Consistency
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
@@ -79,7 +78,6 @@ import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -172,20 +170,14 @@ class MigrationJobAPITest {
assertThat(jobProgressMap.size(), is(1));
}
- @Test
- void assertBuildTableDataConsistencyCheckerWithNullType() {
- TableDataConsistencyChecker actual = jobAPI.buildTableDataConsistencyChecker(null, null);
- assertInstanceOf(TableDataConsistencyChecker.class, actual);
- }
-
@Test
void assertDataConsistencyCheck() {
MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
initTableData(jobConfig);
Optional<String> jobId = jobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
- TableDataConsistencyChecker actual = jobAPI.buildTableDataConsistencyChecker("FIXTURE", null);
- Map<String, TableDataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobConfig, actual, new ConsistencyCheckJobItemProgressContext(jobId.get(), 0));
+ Map<String, TableDataConsistencyCheckResult> checkResultMap = jobAPI.buildPipelineDataConsistencyChecker(
+ jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobId.get(), 0)).check("FIXTURE", null);
assertThat(checkResultMap.size(), is(1));
String checkKey = "ds_0.t_order";
assertTrue(checkResultMap.get(checkKey).getCountCheckResult().isMatched());
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 7d02e7b525e..5821727cf9c 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -32,7 +32,6 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.context.Migrat
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.TableDataConsistencyCheckerFixture;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.BeforeAll;
@@ -65,7 +64,7 @@ class MigrationDataConsistencyCheckerTest {
governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config",
jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(),
0, "");
Map<String, TableDataConsistencyCheckResult> actual = new
MigrationDataConsistencyChecker(jobConfig, new
MigrationProcessContext(jobConfig.getJobId(), null),
- createConsistencyCheckJobItemProgressContext()).check(new
TableDataConsistencyCheckerFixture());
+
createConsistencyCheckJobItemProgressContext()).check("FIXTURE", null);
String checkKey = "ds_0.t_order";
assertTrue(actual.get(checkKey).getCountCheckResult().isMatched());
assertThat(actual.get(checkKey).getCountCheckResult().getSourceRecordsCount(), is(actual.get(checkKey).getCountCheckResult().getTargetRecordsCount()));