This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4c778a7eab7 Support cancel data consistency check and refactoring
(#21429)
4c778a7eab7 is described below
commit 4c778a7eab7e1c867c1803568fc1ac7030774b1b
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Oct 10 13:53:13 2022 +0800
Support cancel data consistency check and refactoring (#21429)
* Move DataConsistencyCalculateAlgorithmFactory
* Add SingleTableInventoryDataConsistencyChecker
* Refactor DataConsistencyCalculateParameter.tableNameSchemaNameMapping to
schemaName
* Add DataConsistencyCalculatedResult, calculate count and content together
* Refactor SingleTableInventoryDataConsistencyChecker by
DataConsistencyCalculatedResult
* Remove buildCountSQL from PipelineSQLBuilder
* Unit test
* Revert "Remove buildCountSQL from PipelineSQLBuilder"
This reverts commit ea946fcce577fe2aee72592176657464f4952753.
* Add CRC32_MATCH in MySQLMigrationGeneralIT
* Add log when crc32 not match
* Add cancel() for DataConsistencyCalculateAlgorithm and abstract impl
* Recover PipelineTask start stop methods
* Add ConsistencyCheckTasksRunner and refactor ConsistencyCheckJob
* Refactor AbstractPipelineJob to use common JobBootstrap
* Cancel DataConsistencyCalculateAlgorithm on stopping
* Log
---
.../DataConsistencyCalculateParameter.java | 4 +-
.../DataConsistencyCalculatedResult.java | 21 +--
.../DataConsistencyCalculateAlgorithm.java | 12 +-
.../DataConsistencyCalculateAlgorithmFactory.java | 3 +-
.../spi/sqlbuilder/PipelineSQLBuilder.java | 1 +
.../core/api/InventoryIncrementalJobAPI.java | 15 +-
.../AbstractInventoryIncrementalJobAPIImpl.java | 31 ++--
.../DataConsistencyCalculateAlgorithmChooser.java | 1 +
...SingleTableInventoryDataConsistencyChecker.java | 148 ++++++++++++++++++
.../AbstractDataConsistencyCalculateAlgorithm.java | 63 ++++++++
...StreamingDataConsistencyCalculateAlgorithm.java | 20 +--
...RC32MatchDataConsistencyCalculateAlgorithm.java | 76 ++++++++--
...DataMatchDataConsistencyCalculateAlgorithm.java | 27 ++--
.../PipelineJobHasAlreadyFinishedException.java | 2 +-
.../pipeline/core/job/AbstractPipelineJob.java | 4 +-
.../data/pipeline/core/task/IncrementalTask.java | 10 +-
.../data/pipeline/core/task/InventoryTask.java | 10 +-
.../data/pipeline/core/task/PipelineTask.java | 14 ++
...tencyCheckChangedJobConfigurationProcessor.java | 2 +-
.../consistencycheck/ConsistencyCheckJob.java | 61 +++-----
.../ConsistencyCheckTasksRunner.java | 139 +++++++++++++++++
.../MigrationChangedJobConfigurationProcessor.java | 2 +-
.../migration/MigrationDataConsistencyChecker.java | 167 +++------------------
.../pipeline/scenario/migration/MigrationJob.java | 4 +-
...MatchDataConsistencyCalculateAlgorithmTest.java | 17 +--
.../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java | 2 +-
.../sqlbuilder/MySQLPipelineSQLBuilderTest.java | 2 +-
.../migration/general/MySQLMigrationGeneralIT.java | 9 +-
...taConsistencyCalculateAlgorithmFactoryTest.java | 1 +
.../DataConsistencyCalculateAlgorithmFixture.java | 9 +-
.../FixtureDataConsistencyCalculatedResult.java} | 19 +--
31 files changed, 598 insertions(+), 298 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
index 7463fadae10..154643bdd59 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
@@ -22,7 +22,6 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
-import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
@@ -43,8 +42,7 @@ public final class DataConsistencyCalculateParameter {
*/
private final PipelineDataSourceWrapper dataSource;
- // TODO replace to schemaName
- private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
+ private final String schemaName;
private final String logicTableName;
diff --git
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
similarity index 70%
copy from
test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
copy to
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
index 3d1c2827377..ea34f78d60e 100644
---
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.api.check.consistency;
-import org.junit.Test;
-
-import java.util.Properties;
-
-public final class DataConsistencyCalculateAlgorithmFactoryTest {
+/**
+ * Data consistency calculated result.
+ */
+public interface DataConsistencyCalculatedResult {
- @Test
- public void assertNewInstanceSuccess() {
- DataConsistencyCalculateAlgorithmFactory.newInstance("FIXTURE", new
Properties());
- }
+ /**
+ * Get records count.
+ *
+ * @return records count
+ */
+ int getRecordsCount();
}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
index 368b501ee5a..6ee50d96e10 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
@@ -18,9 +18,12 @@
package org.apache.shardingsphere.data.pipeline.spi.check.consistency;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
import org.apache.shardingsphere.infra.util.spi.aware.SPIMetadataAware;
+import java.sql.SQLException;
+
/**
* Data consistency calculate algorithm.
*/
@@ -32,5 +35,12 @@ public interface DataConsistencyCalculateAlgorithm extends
ShardingSphereAlgorit
* @param parameter data consistency calculate parameter
* @return calculated result
*/
- Iterable<Object> calculate(DataConsistencyCalculateParameter parameter);
+ Iterable<DataConsistencyCalculatedResult>
calculate(DataConsistencyCalculateParameter parameter);
+
+ /**
+ * Cancel calculation.
+ *
+ * @throws SQLException SQL exception if cancel underlying SQL execution
failure
+ */
+ void cancel() throws SQLException;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
similarity index 92%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
rename to
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
index a37556b792f..8db297edc3c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
@@ -15,9 +15,8 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.spi.check.consistency;
-import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 0a1299318a7..c5663ba699b 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -124,6 +124,7 @@ public interface PipelineSQLBuilder extends TypedSPI,
RequiredSPI {
* @param tableName table name
* @return count SQL
*/
+ // TODO keep it for now, it might be used later
String buildCountSQL(String schemaName, String tableName);
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index fdf355a8a4f..077d08c7281 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -20,8 +20,10 @@ package org.apache.shardingsphere.data.pipeline.core.api;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import java.util.Map;
+import java.util.Properties;
/**
* Inventory incremental job API.
@@ -39,13 +41,24 @@ public interface InventoryIncrementalJobAPI extends
PipelineJobAPI {
@Override
InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int
shardingItem);
+ /**
+ * Build data consistency calculate algorithm.
+ *
+ * @param jobConfig job configuration
+ * @param algorithmType algorithm type
+ * @param algorithmProps algorithm properties
+ * @return calculate algorithm
+ */
+ DataConsistencyCalculateAlgorithm
buildDataConsistencyCalculateAlgorithm(PipelineJobConfiguration jobConfig,
String algorithmType, Properties algorithmProps);
+
/**
* Do data consistency check.
*
* @param pipelineJobConfig job configuration
+ * @param calculateAlgorithm calculate algorithm
* @return each logic table check result
*/
- Map<String, DataConsistencyCheckResult>
dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig);
+ Map<String, DataConsistencyCheckResult>
dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig,
DataConsistencyCalculateAlgorithm calculateAlgorithm);
/**
* Aggregate data consistency check results.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index acb875a682a..b98eb85cf8f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -23,20 +23,20 @@ import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import
org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
-import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import
org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
+import
org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -150,19 +150,23 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
}).collect(Collectors.toList());
}
+ @Override
+ public DataConsistencyCalculateAlgorithm
buildDataConsistencyCalculateAlgorithm(final PipelineJobConfiguration
jobConfig, final String algorithmType, final Properties algorithmProps) {
+ ShardingSpherePreconditions.checkState(null != algorithmType || null
!= jobConfig, () -> new IllegalArgumentException("algorithmType and jobConfig
are null"));
+ if (null != algorithmType) {
+ return
DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType,
algorithmProps);
+ }
+ return DataConsistencyCalculateAlgorithmChooser.choose(
+
DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()),
DatabaseTypeFactory.getInstance(getTargetDatabaseType(jobConfig)));
+ }
+
@Override
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
String jobId) {
checkModeConfig();
log.info("Data consistency check for job {}", jobId);
PipelineJobConfiguration jobConfig =
getJobConfiguration(getElasticJobConfigPOJO(jobId));
- return dataConsistencyCheck(jobConfig);
- }
-
- @Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
PipelineJobConfiguration jobConfig) {
- DataConsistencyCalculateAlgorithm algorithm =
DataConsistencyCalculateAlgorithmChooser.choose(
-
DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()),
DatabaseTypeFactory.getInstance(getTargetDatabaseType(jobConfig)));
- return dataConsistencyCheck(jobConfig, algorithm);
+ DataConsistencyCalculateAlgorithm calculateAlgorithm =
buildDataConsistencyCalculateAlgorithm(jobConfig, null, null);
+ return dataConsistencyCheck(jobConfig, calculateAlgorithm);
}
@Override
@@ -170,10 +174,11 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
checkModeConfig();
log.info("Data consistency check for job {}, algorithmType: {}",
jobId, algorithmType);
PipelineJobConfiguration jobConfig =
getJobConfiguration(getElasticJobConfigPOJO(jobId));
- return dataConsistencyCheck(jobConfig,
DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType,
algorithmProps));
+ return dataConsistencyCheck(jobConfig,
buildDataConsistencyCalculateAlgorithm(jobConfig, algorithmType,
algorithmProps));
}
- protected Map<String, DataConsistencyCheckResult>
dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final
DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+ @Override
+ public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm
calculateAlgorithm) {
String jobId = jobConfig.getJobId();
Map<String, DataConsistencyCheckResult> result =
buildPipelineDataConsistencyChecker(jobConfig,
buildPipelineProcessContext(jobConfig)).check(calculateAlgorithm);
log.info("job {} with check algorithm '{}' data consistency checker
result {}", jobId, calculateAlgorithm.getType(), result);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
index 5e50032e3eb..499dfa006c5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.core.check.consistency;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
new file mode 100644
index 00000000000..3125fc778fc
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
+import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+import
org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Single table inventory data consistency checker.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public final class SingleTableInventoryDataConsistencyChecker {
+
+ private final String jobId;
+
+ private final PipelineDataSourceWrapper sourceDataSource;
+
+ private final PipelineDataSourceWrapper targetDataSource;
+
+ private final SchemaTableName sourceTable;
+
+ private final SchemaTableName targetTable;
+
+ private final PipelineColumnMetaData uniqueKey;
+
+ private final PipelineTableMetaDataLoader metaDataLoader;
+
+ private final JobRateLimitAlgorithm readRateLimitAlgorithm;
+
+ /**
+ * Data consistency check.
+ *
+ * @param calculateAlgorithm calculate algorithm
+ * @return data consistency check result
+ */
+ public DataConsistencyCheckResult check(final
DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+ ThreadFactory threadFactory =
ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobId) +
"-check-%d");
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
+ try {
+ return check(calculateAlgorithm, executor);
+ } finally {
+ executor.shutdown();
+ executor.shutdownNow();
+ }
+ }
+
+ private DataConsistencyCheckResult check(final
DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor
executor) {
+ String sourceDatabaseType =
sourceDataSource.getDatabaseType().getType();
+ String targetDatabaseType =
targetDataSource.getDatabaseType().getType();
+ String sourceTableName = sourceTable.getTableName().getOriginal();
+ PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(sourceTable.getSchemaName().getOriginal(),
sourceTableName);
+ ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new
PipelineTableDataConsistencyCheckLoadingFailedException(sourceTableName));
+ Collection<String> columnNames = tableMetaData.getColumnNames();
+ DataConsistencyCalculateParameter sourceParameter = buildParameter(
+ sourceDataSource, sourceTable.getSchemaName().getOriginal(),
sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType,
uniqueKey);
+ DataConsistencyCalculateParameter targetParameter = buildParameter(
+ targetDataSource, targetTable.getSchemaName().getOriginal(),
targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType,
sourceDatabaseType, uniqueKey);
+ Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults =
calculateAlgorithm.calculate(sourceParameter).iterator();
+ Iterator<DataConsistencyCalculatedResult> targetCalculatedResults =
calculateAlgorithm.calculate(targetParameter).iterator();
+ long sourceRecordsCount = 0;
+ long targetRecordsCount = 0;
+ boolean contentMatched = true;
+ while (sourceCalculatedResults.hasNext() &&
targetCalculatedResults.hasNext()) {
+ if (null != readRateLimitAlgorithm) {
+ readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
+ }
+ Future<DataConsistencyCalculatedResult> sourceFuture =
executor.submit(sourceCalculatedResults::next);
+ Future<DataConsistencyCalculatedResult> targetFuture =
executor.submit(targetCalculatedResults::next);
+ DataConsistencyCalculatedResult sourceCalculatedResult =
waitFuture(sourceFuture);
+ DataConsistencyCalculatedResult targetCalculatedResult =
waitFuture(targetFuture);
+ sourceRecordsCount += sourceCalculatedResult.getRecordsCount();
+ targetRecordsCount += targetCalculatedResult.getRecordsCount();
+ contentMatched = Objects.equals(sourceCalculatedResult,
targetCalculatedResult);
+ if (!contentMatched) {
+ log.info("content matched false, jobId={}, sourceTable={},
targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
+ break;
+ }
+ }
+ return new DataConsistencyCheckResult(new
DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new
DataConsistencyContentCheckResult(contentMatched));
+ }
+
+ // TODO use digest (crc32, murmurhash)
+ private String getJobIdDigest(final String jobId) {
+ return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
+ }
+
+ private DataConsistencyCalculateParameter buildParameter(final
PipelineDataSourceWrapper sourceDataSource,
+ final String
schemaName, final String tableName, final Collection<String> columnNames,
+ final String
sourceDatabaseType, final String targetDatabaseType, final
PipelineColumnMetaData uniqueKey) {
+ return new DataConsistencyCalculateParameter(sourceDataSource,
schemaName, tableName, columnNames, sourceDatabaseType, targetDatabaseType,
uniqueKey);
+ }
+
+ private <T> T waitFuture(final Future<T> future) {
+ try {
+ return future.get();
+ } catch (final InterruptedException | ExecutionException ex) {
+ if (ex.getCause() instanceof PipelineSQLException) {
+ throw (PipelineSQLException) ex.getCause();
+ }
+ throw new SQLWrapperException(new SQLException(ex));
+ }
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java
new file mode 100644
index 00000000000..7e14ced4342
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Statement;
+
+/**
+ * Abstract data consistency calculate algorithm.
+ */
+@Slf4j
+public abstract class AbstractDataConsistencyCalculateAlgorithm implements
DataConsistencyCalculateAlgorithm {
+
+ @Getter(AccessLevel.PROTECTED)
+ private volatile boolean canceling;
+
+ private volatile Statement currentStatement;
+
+ protected <T extends Statement> T setCurrentStatement(final T statement) {
+ this.currentStatement = statement;
+ return statement;
+ }
+
+ @Override
+ public void cancel() throws SQLException {
+ canceling = true;
+ Statement statement = currentStatement;
+ if (null == statement || statement.isClosed()) {
+ log.info("cancel, statement is null or closed");
+ return;
+ }
+ long startTimeMillis = System.currentTimeMillis();
+ try {
+ statement.cancel();
+ } catch (final SQLFeatureNotSupportedException ex) {
+ log.info("cancel is not supported: {}", ex.getMessage());
+ } catch (final SQLException ex) {
+ log.info("cancel failed: {}", ex.getMessage());
+ }
+ log.info("cancel cost {} ms", System.currentTimeMillis() -
startTimeMillis);
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
index e35afffba1c..c1c18347e88 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
@@ -21,7 +21,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
-import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import java.util.Iterator;
import java.util.Optional;
@@ -33,10 +33,10 @@ import java.util.concurrent.atomic.AtomicInteger;
@RequiredArgsConstructor
@Getter
@Slf4j
-public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm
implements DataConsistencyCalculateAlgorithm {
+public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm
extends AbstractDataConsistencyCalculateAlgorithm {
@Override
- public final Iterable<Object> calculate(final
DataConsistencyCalculateParameter parameter) {
+ public final Iterable<DataConsistencyCalculatedResult> calculate(final
DataConsistencyCalculateParameter parameter) {
return new ResultIterable(parameter);
}
@@ -46,30 +46,30 @@ public abstract class
AbstractStreamingDataConsistencyCalculateAlgorithm impleme
* @param parameter data consistency calculate parameter
* @return optional calculated result, empty means there's no more result
*/
- protected abstract Optional<Object>
calculateChunk(DataConsistencyCalculateParameter parameter);
+ protected abstract Optional<DataConsistencyCalculatedResult>
calculateChunk(DataConsistencyCalculateParameter parameter);
/**
* It's not thread-safe, it should be executed in only one thread at the
same time.
*/
@RequiredArgsConstructor
- final class ResultIterable implements Iterable<Object> {
+ final class ResultIterable implements
Iterable<DataConsistencyCalculatedResult> {
private final DataConsistencyCalculateParameter parameter;
@Override
- public Iterator<Object> iterator() {
+ public Iterator<DataConsistencyCalculatedResult> iterator() {
return new ResultIterator(parameter);
}
}
@RequiredArgsConstructor
- final class ResultIterator implements Iterator<Object> {
+ final class ResultIterator implements
Iterator<DataConsistencyCalculatedResult> {
private final DataConsistencyCalculateParameter parameter;
private final AtomicInteger calculationCount = new AtomicInteger(0);
- private volatile Optional<Object> nextResult;
+ private volatile Optional<DataConsistencyCalculatedResult> nextResult;
@Override
public boolean hasNext() {
@@ -78,9 +78,9 @@ public abstract class
AbstractStreamingDataConsistencyCalculateAlgorithm impleme
}
@Override
- public Object next() {
+ public DataConsistencyCalculatedResult next() {
calculateIfNecessary();
- Optional<Object> nextResult = this.nextResult;
+ Optional<DataConsistencyCalculatedResult> nextResult =
this.nextResult;
parameter.setPreviousCalculatedResult(nextResult.orElse(null));
this.nextResult = null;
return nextResult.orElse(null);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index b2eeac94959..230298808c9 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -18,11 +18,14 @@
package
org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedCRC32DataConsistencyCalculateAlgorithmException;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -34,6 +37,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -41,11 +45,12 @@ import java.util.stream.Collectors;
/**
* CRC32 match data consistency calculate algorithm.
*/
-@Getter
-public final class CRC32MatchDataConsistencyCalculateAlgorithm implements
DataConsistencyCalculateAlgorithm {
+@Slf4j
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends
AbstractDataConsistencyCalculateAlgorithm {
private static final Collection<String> SUPPORTED_DATABASE_TYPES =
Collections.singletonList(new MySQLDatabaseType().getType());
+ @Getter
private Properties props;
@Override
@@ -54,26 +59,29 @@ public final class
CRC32MatchDataConsistencyCalculateAlgorithm implements DataCo
}
@Override
- public Iterable<Object> calculate(final DataConsistencyCalculateParameter
parameter) {
+ public Iterable<DataConsistencyCalculatedResult> calculate(final
DataConsistencyCalculateParameter parameter) {
PipelineSQLBuilder sqlBuilder =
PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
- return
Collections.unmodifiableList(parameter.getColumnNames().stream().map(each ->
calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList()));
+ List<CalculatedItem> calculatedItems =
parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder,
parameter, each)).collect(Collectors.toList());
+ return Collections.singletonList(new
CalculatedResult(calculatedItems.get(0).getRecordsCount(),
calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
}
- private long calculateCRC32(final PipelineSQLBuilder sqlBuilder, final
DataConsistencyCalculateParameter parameter, final String columnName) {
+ private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder,
final DataConsistencyCalculateParameter parameter, final String columnName) {
String logicTableName = parameter.getLogicTableName();
- String schemaName =
parameter.getTableNameSchemaNameMapping().getSchemaName(logicTableName);
+ String schemaName = parameter.getSchemaName();
Optional<String> sql = sqlBuilder.buildCRC32SQL(schemaName,
logicTableName, columnName);
ShardingSpherePreconditions.checkState(sql.isPresent(), () -> new
UnsupportedCRC32DataConsistencyCalculateAlgorithmException(parameter.getDatabaseType()));
return calculateCRC32(parameter.getDataSource(), logicTableName,
sql.get());
}
- private long calculateCRC32(final DataSource dataSource, final String
logicTableName, final String sql) {
+ private CalculatedItem calculateCRC32(final DataSource dataSource, final
String logicTableName, final String sql) {
try (
Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement =
connection.prepareStatement(sql);
+ PreparedStatement preparedStatement =
setCurrentStatement(connection.prepareStatement(sql));
ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
- return resultSet.getLong(1);
+ long crc32 = resultSet.getLong(1);
+ int recordsCount = resultSet.getInt(2);
+ return new CalculatedItem(crc32, recordsCount);
} catch (final SQLException ex) {
throw new
PipelineTableDataConsistencyCheckLoadingFailedException(logicTableName);
}
@@ -93,4 +101,52 @@ public final class
CRC32MatchDataConsistencyCalculateAlgorithm implements DataCo
public String getDescription() {
return "Match CRC32 of records.";
}
+
+ @RequiredArgsConstructor
+ @Getter
+ private static final class CalculatedItem {
+
+ private final long crc32;
+
+ private final int recordsCount;
+ }
+
+ @RequiredArgsConstructor
+ @Getter
+ private static final class CalculatedResult implements
DataConsistencyCalculatedResult {
+
+ private final int recordsCount;
+
+ @NonNull
+ private final Collection<Long> columnsCrc32;
+
+ @Override
+ public boolean equals(final @NonNull Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (getClass() != o.getClass()) {
+ log.warn("CalculatedResult type not match, o.className={}",
o.getClass().getName());
+ return false;
+ }
+ final CalculatedResult that = (CalculatedResult) o;
+ if (recordsCount != that.recordsCount) {
+ log.info("recordsCount not match, recordsCount={},
that.recordsCount={}", recordsCount, that.recordsCount);
+ return false;
+ }
+ if (!columnsCrc32.equals(that.columnsCrc32)) {
+ log.info("columnsCrc32 not match, columnsCrc32={},
that.columnsCrc32={}", columnsCrc32, that.columnsCrc32);
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ int result = recordsCount;
+ result = 31 * result + columnsCrc32.hashCode();
+ return result;
+ }
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index da0dbf28569..a2df6b70b79 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCheckUtils;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.ColumnValueReaderFactory;
@@ -87,12 +88,12 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
}
@Override
- protected Optional<Object> calculateChunk(final
DataConsistencyCalculateParameter parameter) {
+ protected Optional<DataConsistencyCalculatedResult> calculateChunk(final
DataConsistencyCalculateParameter parameter) {
CalculatedResult previousCalculatedResult = (CalculatedResult)
parameter.getPreviousCalculatedResult();
String sql = getQuerySQL(parameter);
try (
Connection connection =
parameter.getDataSource().getConnection();
- PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
+ PreparedStatement preparedStatement =
setCurrentStatement(connection.prepareStatement(sql))) {
preparedStatement.setFetchSize(chunkSize);
if (null == previousCalculatedResult) {
preparedStatement.setInt(1, chunkSize);
@@ -105,6 +106,10 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
try (ResultSet resultSet = preparedStatement.executeQuery()) {
ColumnValueReader columnValueReader =
ColumnValueReaderFactory.getInstance(parameter.getDatabaseType());
while (resultSet.next()) {
+ if (isCanceling()) {
+ log.info("canceling, schemaName={}, tableName={}",
parameter.getSchemaName(), parameter.getLogicTableName());
+ throw new
PipelineTableDataConsistencyCheckLoadingFailedException(parameter.getLogicTableName());
+ }
ResultSetMetaData resultSetMetaData =
resultSet.getMetaData();
int columnCount = resultSetMetaData.getColumnCount();
Collection<Object> record = new LinkedList<>();
@@ -124,11 +129,11 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
private String getQuerySQL(final DataConsistencyCalculateParameter
parameter) {
PipelineSQLBuilder sqlBuilder =
PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
String logicTableName = parameter.getLogicTableName();
- String schemaName =
parameter.getTableNameSchemaNameMapping().getSchemaName(logicTableName);
+ String schemaName = parameter.getSchemaName();
String uniqueKey = parameter.getUniqueKey().getName();
- String cacheKey = parameter.getDatabaseType() + "-" +
(DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
- ? schemaName.toLowerCase() + "." + logicTableName.toLowerCase()
- : logicTableName.toLowerCase());
+ String cacheKey = parameter.getDatabaseType() + "-" + (null !=
schemaName &&
DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+ ? schemaName + "." + logicTableName
+ : logicTableName);
if (null == parameter.getPreviousCalculatedResult()) {
return firstSQLCache.computeIfAbsent(cacheKey, s ->
sqlBuilder.buildChunkedQuerySQL(schemaName, logicTableName, uniqueKey, true));
}
@@ -152,12 +157,12 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
@RequiredArgsConstructor
@Getter
- private static final class CalculatedResult {
+ private static final class CalculatedResult implements
DataConsistencyCalculatedResult {
@NonNull
private final Object maxUniqueKeyValue;
- private final int recordCount;
+ private final int recordsCount;
private final Collection<Collection<Object>> records;
@@ -172,10 +177,10 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
return false;
}
final CalculatedResult that = (CalculatedResult) o;
- boolean equalsFirst = new EqualsBuilder().append(getRecordCount(),
that.getRecordCount()).append(getMaxUniqueKeyValue(),
that.getMaxUniqueKeyValue()).isEquals();
+ boolean equalsFirst = new
EqualsBuilder().append(getRecordsCount(),
that.getRecordsCount()).append(getMaxUniqueKeyValue(),
that.getMaxUniqueKeyValue()).isEquals();
if (!equalsFirst) {
log.warn("recordCount or maxUniqueKeyValue not match,
recordCount1={}, recordCount2={}, maxUniqueKeyValue1={}, maxUniqueKeyValue2={}",
- getRecordCount(), that.getRecordCount(),
getMaxUniqueKeyValue(), that.getMaxUniqueKeyValue());
+ getRecordsCount(), that.getRecordsCount(),
getMaxUniqueKeyValue(), that.getMaxUniqueKeyValue());
return false;
}
Iterator<Collection<Object>> thisIterator =
this.records.iterator();
@@ -218,7 +223,7 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
@Override
public int hashCode() {
- return new HashCodeBuilder(17,
37).append(getMaxUniqueKeyValue()).append(getRecordCount()).append(getRecords()).toHashCode();
+ return new HashCodeBuilder(17,
37).append(getMaxUniqueKeyValue()).append(getRecordsCount()).append(getRecords()).toHashCode();
}
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
index edbd4afb2f7..afdbcce0856 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
@@ -28,6 +28,6 @@ public final class PipelineJobHasAlreadyFinishedException
extends PipelineSQLExc
private static final long serialVersionUID = 6881217592831423520L;
public PipelineJobHasAlreadyFinishedException(final String jobId) {
- super(XOpenSQLState.GENERAL_ERROR, 95, "Job has already finished,
please run `CHECK MIGRATION %s` to start a new data consistency check job.",
jobId);
+ super(XOpenSQLState.GENERAL_ERROR, 95, "Data consistency check job has
already finished.", jobId);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 7d14d2707e0..d48f4467a0c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -22,7 +22,7 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
import java.util.Map;
import java.util.Optional;
@@ -42,7 +42,7 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
private volatile boolean stopping;
@Setter
- private volatile OneOffJobBootstrap oneOffJobBootstrap;
+ private volatile JobBootstrap jobBootstrap;
private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new
ConcurrentHashMap<>();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 9e7363970a0..8b435383e69 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -104,11 +104,7 @@ public final class IncrementalTask implements
PipelineTask, AutoCloseable {
});
}
- /**
- * Start.
- *
- * @return future
- */
+ @Override
public CompletableFuture<?> start() {
taskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
CompletableFuture<?> dumperFuture =
incrementalExecuteEngine.submit(dumper, new ExecuteCallback() {
@@ -140,9 +136,7 @@ public final class IncrementalTask implements PipelineTask,
AutoCloseable {
return CompletableFuture.allOf(dumperFuture, importerFuture);
}
- /**
- * Stop.
- */
+ @Override
public void stop() {
dumper.stop();
for (Importer each : importers) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index ca90aeb859e..a69ee2d121d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -83,11 +83,7 @@ public final class InventoryTask implements PipelineTask,
AutoCloseable {
return null == inventoryDumperConfig.getShardingItem() ? result :
result + "#" + inventoryDumperConfig.getShardingItem();
}
- /**
- * Start.
- *
- * @return future
- */
+ @Override
public CompletableFuture<?> start() {
CompletableFuture<?> dumperFuture =
inventoryDumperExecuteEngine.submit(dumper, new ExecuteCallback() {
@@ -138,9 +134,7 @@ public final class InventoryTask implements PipelineTask,
AutoCloseable {
return null;
}
- /**
- * Stop.
- */
+ @Override
public void stop() {
dumper.stop();
importer.stop();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
index ebc5a7e0e33..45dabe9ad68 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
@@ -19,11 +19,25 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import org.apache.shardingsphere.data.pipeline.api.task.progress.TaskProgress;
+import java.util.concurrent.CompletableFuture;
+
/**
* Pipeline task interface.
*/
public interface PipelineTask {
+ /**
+ * Start task.
+ *
+ * @return future
+ */
+ CompletableFuture<?> start();
+
+ /**
+ * Stop task.
+ */
+ void stop();
+
/**
* Get task id.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
index 8d71eae5a7c..406fe64310c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -70,7 +70,7 @@ public final class
ConsistencyCheckChangedJobConfigurationProcessor implements P
ConsistencyCheckJob job = new ConsistencyCheckJob();
PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job,
jobConfigPOJO.toJobConfiguration());
- job.setOneOffJobBootstrap(oneOffJobBootstrap);
+ job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 77355f5281d..b86955e252d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -18,26 +18,16 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import
org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import
org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
-
-import java.util.Collections;
-import java.util.Map;
/**
* Consistency check job.
@@ -45,50 +35,43 @@ import java.util.Map;
@Slf4j
public final class ConsistencyCheckJob extends AbstractPipelineJob implements
SimpleJob, PipelineJob {
- private final ConsistencyCheckJobAPI jobAPI =
ConsistencyCheckJobAPIFactory.getInstance();
-
private final PipelineDistributedBarrier pipelineDistributedBarrier =
PipelineDistributedBarrier.getInstance();
@Override
public void execute(final ShardingContext shardingContext) {
String checkJobId = shardingContext.getJobName();
+ int shardingItem = shardingContext.getShardingItem();
+ log.info("Execute job {}-{}", checkJobId, shardingItem);
+ if (isStopping()) {
+ log.info("stopping true, ignore");
+ return;
+ }
setJobId(checkJobId);
- ConsistencyCheckJobConfiguration consistencyCheckJobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- JobStatus status = JobStatus.RUNNING;
- ConsistencyCheckJobItemContext jobItemContext = new
ConsistencyCheckJobItemContext(consistencyCheckJobConfig, 0, status);
- jobAPI.persistJobItemProgress(jobItemContext);
- String parentJobId = consistencyCheckJobConfig.getParentJobId();
- log.info("execute consistency check, job id:{}, referred job id:{}",
checkJobId, parentJobId);
- JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
- InventoryIncrementalJobPublicAPI jobPublicAPI =
PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(jobType.getTypeName());
- Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult =
Collections.emptyMap();
- try {
- dataConsistencyCheckResult =
StringUtils.isBlank(consistencyCheckJobConfig.getAlgorithmTypeName())
- ? jobPublicAPI.dataConsistencyCheck(parentJobId)
- : jobPublicAPI.dataConsistencyCheck(parentJobId,
consistencyCheckJobConfig.getAlgorithmTypeName(),
consistencyCheckJobConfig.getAlgorithmProps());
- status = JobStatus.FINISHED;
- } catch (final SQLWrapperException ex) {
- log.error("data consistency check failed", ex);
- status = JobStatus.CONSISTENCY_CHECK_FAILURE;
- jobAPI.persistJobItemErrorMessage(checkJobId, 0, ex);
+ ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+ ConsistencyCheckJobItemContext jobItemContext = new
ConsistencyCheckJobItemContext(jobConfig, shardingItem, JobStatus.RUNNING);
+ if (getTasksRunnerMap().containsKey(shardingItem)) {
+ log.warn("tasksRunnerMap contains shardingItem {}, ignore",
shardingItem);
+ return;
}
-
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId,
checkJobId, dataConsistencyCheckResult);
- jobItemContext.setStatus(status);
- jobAPI.persistJobItemProgress(jobItemContext);
- jobAPI.stop(checkJobId);
- log.info("execute consistency check job finished, job id:{}, parent
job id:{}", checkJobId, parentJobId);
+ ConsistencyCheckTasksRunner tasksRunner = new
ConsistencyCheckTasksRunner(jobItemContext);
+ tasksRunner.start();
+ getTasksRunnerMap().put(shardingItem, tasksRunner);
}
@Override
public void stop() {
setStopping(true);
- if (null != getOneOffJobBootstrap()) {
- getOneOffJobBootstrap().shutdown();
+ if (null != getJobBootstrap()) {
+ getJobBootstrap().shutdown();
}
if (null == getJobId()) {
log.info("stop consistency check job, jobId is null, ignore");
return;
}
+ for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
+ each.stop();
+ }
+ getTasksRunnerMap().clear();
String jobBarrierDisablePath =
PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath,
0);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
new file mode 100644
index 00000000000..69055b5a85d
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import
org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+
+import java.sql.SQLException;
+import java.util.Map;
+
+/**
+ * Consistency check tasks runner.
+ */
+@Slf4j
+public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
+
+ private final ConsistencyCheckJobAPI checkJobAPI =
ConsistencyCheckJobAPIFactory.getInstance();
+
+ @Getter
+ private final ConsistencyCheckJobItemContext jobItemContext;
+
+ private final ConsistencyCheckJobConfiguration checkJobConfig;
+
+ private final String checkJobId;
+
+ private final String parentJobId;
+
+ private final LifecycleExecutor checkExecutor;
+
+ private final ExecuteCallback checkExecuteCallback;
+
+ public ConsistencyCheckTasksRunner(final ConsistencyCheckJobItemContext
jobItemContext) {
+ this.jobItemContext = jobItemContext;
+ checkJobConfig = jobItemContext.getJobConfig();
+ checkJobId = checkJobConfig.getJobId();
+ parentJobId = checkJobConfig.getParentJobId();
+ checkExecutor = new CheckLifecycleExecutor();
+ checkExecuteCallback = new CheckExecuteCallback();
+ }
+
+ @Override
+ public void start() {
+ if (jobItemContext.isStopping()) {
+ log.info("job stopping, ignore consistency check");
+ return;
+ }
+
PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).persistJobItemProgress(jobItemContext);
+ ExecuteEngine executeEngine = ExecuteEngine.newFixedThreadInstance(1,
checkJobId + "-check");
+ executeEngine.submit(checkExecutor, checkExecuteCallback);
+ }
+
+ @Override
+ public void stop() {
+ jobItemContext.setStopping(true);
+ log.info("stop, jobId={}, shardingItem={}", jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+ checkExecutor.stop();
+ }
+
+ private final class CheckLifecycleExecutor extends
AbstractLifecycleExecutor {
+
+ private volatile DataConsistencyCalculateAlgorithm calculateAlgorithm;
+
+ @Override
+ protected void runBlocking() {
+ log.info("execute consistency check, check job id: {}, parent job
id: {}", checkJobId, parentJobId);
+ checkJobAPI.persistJobItemProgress(jobItemContext);
+ JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
+ InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI)
PipelineAPIFactory.getPipelineJobAPI(jobType);
+ PipelineJobConfiguration parentJobConfig =
jobAPI.getJobConfiguration(parentJobId);
+ DataConsistencyCalculateAlgorithm calculateAlgorithm =
jobAPI.buildDataConsistencyCalculateAlgorithm(
+ parentJobConfig, checkJobConfig.getAlgorithmTypeName(),
checkJobConfig.getAlgorithmProps());
+ this.calculateAlgorithm = calculateAlgorithm;
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult
= jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm);
+
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId,
checkJobId, dataConsistencyCheckResult);
+ }
+
+ @Override
+ protected void doStop() {
+ DataConsistencyCalculateAlgorithm algorithm = calculateAlgorithm;
+ log.info("doStop, algorithm={}", algorithm);
+ if (null != algorithm) {
+ try {
+ algorithm.cancel();
+ } catch (final SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+ }
+
+ private final class CheckExecuteCallback implements ExecuteCallback {
+
+ @Override
+ public void onSuccess() {
+ log.info("onSuccess, check job id: {}, parent job id: {}",
checkJobId, parentJobId);
+ jobItemContext.setStatus(JobStatus.FINISHED);
+ checkJobAPI.persistJobItemProgress(jobItemContext);
+ checkJobAPI.stop(checkJobId);
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ log.info("onFailure, check job id: {}, parent job id: {}",
checkJobId, parentJobId);
+ checkJobAPI.persistJobItemErrorMessage(checkJobId, 0, throwable);
+ jobItemContext.setStatus(JobStatus.CONSISTENCY_CHECK_FAILURE);
+ checkJobAPI.persistJobItemProgress(jobItemContext);
+ checkJobAPI.stop(checkJobId);
+ }
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index 404bf1edc72..b0429f58cd2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -72,7 +72,7 @@ public final class MigrationChangedJobConfigurationProcessor
implements Pipeline
MigrationJob job = new MigrationJob();
PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job,
jobConfigPOJO.toJobConfiguration());
- job.setOneOffJobBootstrap(oneOffJobBootstrap);
+ job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index ea68ca1cd0b..50ab3a39afb 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -18,54 +18,32 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
-import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
+import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import
org.apache.shardingsphere.data.pipeline.core.check.consistency.SingleTableInventoryDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
-import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
-import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
/**
* Data consistency checker for migration job.
@@ -81,144 +59,35 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
public MigrationDataConsistencyChecker(final MigrationJobConfiguration
jobConfig, final InventoryIncrementalProcessContext processContext) {
this.jobConfig = jobConfig;
- this.readRateLimitAlgorithm = null != processContext ?
processContext.getReadRateLimitAlgorithm() : null;
+ readRateLimitAlgorithm = null != processContext ?
processContext.getReadRateLimitAlgorithm() : null;
tableNameSchemaNameMapping = new TableNameSchemaNameMapping(
TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(), new
HashSet<>(Arrays.asList(jobConfig.getSourceTableName(),
jobConfig.getTargetTableName()))));
}
@Override
- public Map<String, DataConsistencyCheckResult> check(final
DataConsistencyCalculateAlgorithm calculator) {
- Map<String, DataConsistencyCountCheckResult> countCheckResult =
checkCount();
- Map<String, DataConsistencyContentCheckResult> contentCheckResult =
countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched)
- ? checkData(calculator)
- : Collections.emptyMap();
- Map<String, DataConsistencyCheckResult> result = new
LinkedHashMap<>(countCheckResult.size());
- for (Entry<String, DataConsistencyCountCheckResult> entry :
countCheckResult.entrySet()) {
- result.put(entry.getKey(), new
DataConsistencyCheckResult(entry.getValue(),
contentCheckResult.getOrDefault(entry.getKey(), new
DataConsistencyContentCheckResult(false))));
- }
- return result;
- }
-
- private Map<String, DataConsistencyCountCheckResult> checkCount() {
- ThreadFactory threadFactory =
ExecutorThreadFactoryBuilder.build("job-" +
getJobIdDigest(jobConfig.getJobId()) + "-count-check-%d");
- ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
+ public Map<String, DataConsistencyCheckResult> check(final
DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+ verifyPipelineDatabaseType(calculateAlgorithm, jobConfig.getSource());
+ verifyPipelineDatabaseType(calculateAlgorithm, jobConfig.getTarget());
+ SchemaTableName sourceTable = new SchemaTableName(new
SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getSourceTableName())),
new TableName(jobConfig.getSourceTableName()));
+ SchemaTableName targetTable = new SchemaTableName(new
SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getTargetTableName())),
new TableName(jobConfig.getTargetTableName()));
PipelineDataSourceConfiguration sourceDataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(),
jobConfig.getSource().getParameter());
PipelineDataSourceConfiguration targetDataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(),
jobConfig.getTarget().getParameter());
- Map<String, DataConsistencyCountCheckResult> result = new
LinkedHashMap<>(1, 1);
+ Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>();
try (
PipelineDataSourceWrapper sourceDataSource =
PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
PipelineDataSourceWrapper targetDataSource =
PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
- result.put(jobConfig.getSourceTableName(),
checkCount(sourceDataSource, targetDataSource, executor));
- return result;
+ PipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(sourceDataSource);
+ SingleTableInventoryDataConsistencyChecker
singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(
+ jobConfig.getJobId(), sourceDataSource, targetDataSource,
sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader,
readRateLimitAlgorithm);
+ result.put(sourceTable.getTableName().getOriginal(),
singleTableInventoryChecker.check(calculateAlgorithm));
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
- } finally {
- executor.shutdown();
- executor.shutdownNow();
- }
- }
-
- private DataConsistencyCountCheckResult checkCount(final
PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper
targetDataSource, final ThreadPoolExecutor executor) {
- Future<Long> sourceFuture = executor.submit(() ->
count(sourceDataSource, jobConfig.getSourceTableName(),
sourceDataSource.getDatabaseType()));
- Future<Long> targetFuture = executor.submit(() ->
count(targetDataSource, jobConfig.getTargetTableName(),
targetDataSource.getDatabaseType()));
- long sourceCount;
- long targetCount;
- try {
- sourceCount = sourceFuture.get();
- } catch (final InterruptedException | ExecutionException ex) {
- if (ex.getCause() instanceof PipelineSQLException) {
- throw (PipelineSQLException) ex.getCause();
- }
- throw new SQLWrapperException(new SQLException(ex));
- }
- try {
- targetCount = targetFuture.get();
- } catch (final InterruptedException | ExecutionException ex) {
- if (ex.getCause() instanceof PipelineSQLException) {
- throw (PipelineSQLException) ex.getCause();
- }
- throw new SQLWrapperException(new SQLException(ex));
- }
- return new DataConsistencyCountCheckResult(sourceCount, targetCount);
- }
-
- // TODO use digest (crc32, murmurhash)
- private String getJobIdDigest(final String jobId) {
- return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
- }
-
- private long count(final DataSource dataSource, final String tableName,
final DatabaseType databaseType) {
- String sql =
PipelineSQLBuilderFactory.getInstance(databaseType.getType()).buildCountSQL(tableNameSchemaNameMapping.getSchemaName(tableName),
tableName);
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement =
connection.prepareStatement(sql);
- ResultSet resultSet = preparedStatement.executeQuery()) {
- resultSet.next();
- return resultSet.getLong(1);
- } catch (final SQLException ex) {
- throw new
PipelineTableDataConsistencyCheckLoadingFailedException(tableName);
- }
- }
-
- private Map<String, DataConsistencyContentCheckResult> checkData(final
DataConsistencyCalculateAlgorithm calculator) {
- checkPipelineDatabaseType(calculator, jobConfig.getSource());
- PipelineDataSourceConfiguration sourceDataSourceConfig =
jobConfig.getSource();
- checkPipelineDatabaseType(calculator, jobConfig.getTarget());
- PipelineDataSourceConfiguration targetDataSourceConfig =
jobConfig.getTarget();
- ThreadFactory threadFactory =
ExecutorThreadFactoryBuilder.build("job-" +
getJobIdDigest(jobConfig.getJobId()) + "-data-check-%d");
- ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
- Map<String, DataConsistencyContentCheckResult> result = new
HashMap<>(1, 1);
- try (
- PipelineDataSourceWrapper sourceDataSource =
PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
- PipelineDataSourceWrapper targetDataSource =
PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
- String sourceDatabaseType =
sourceDataSourceConfig.getDatabaseType().getType();
- String targetDatabaseType =
targetDataSourceConfig.getDatabaseType().getType();
- StandardPipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(sourceDataSource);
- for (String each :
Collections.singleton(jobConfig.getSourceTableName())) {
- PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each),
each);
- ShardingSpherePreconditions.checkNotNull(tableMetaData, () ->
new PipelineTableDataConsistencyCheckLoadingFailedException(each));
- Collection<String> columnNames =
tableMetaData.getColumnNames();
- PipelineColumnMetaData uniqueKey =
jobConfig.getUniqueKeyColumn();
- DataConsistencyCalculateParameter sourceParameter =
buildParameter(sourceDataSource, tableNameSchemaNameMapping, each, columnNames,
sourceDatabaseType, targetDatabaseType, uniqueKey);
- DataConsistencyCalculateParameter targetParameter =
buildParameter(
- targetDataSource, tableNameSchemaNameMapping,
jobConfig.getTargetTableName(), columnNames, targetDatabaseType,
sourceDatabaseType, uniqueKey);
- Iterator<Object> sourceCalculatedResults =
calculator.calculate(sourceParameter).iterator();
- Iterator<Object> targetCalculatedResults =
calculator.calculate(targetParameter).iterator();
- boolean contentMatched = true;
- while (sourceCalculatedResults.hasNext() &&
targetCalculatedResults.hasNext()) {
- if (null != readRateLimitAlgorithm) {
-
readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
- }
- Future<Object> sourceFuture =
executor.submit(sourceCalculatedResults::next);
- Future<Object> targetFuture =
executor.submit(targetCalculatedResults::next);
- Object sourceCalculatedResult = sourceFuture.get();
- Object targetCalculatedResult = targetFuture.get();
- contentMatched = Objects.equals(sourceCalculatedResult,
targetCalculatedResult);
- if (!contentMatched) {
- break;
- }
- }
- result.put(each, new
DataConsistencyContentCheckResult(contentMatched));
- }
- } catch (final SQLException ex) {
- throw new SQLWrapperException(ex);
- } catch (final ExecutionException | InterruptedException ex) {
- throw new SQLWrapperException(new SQLException(ex.getCause()));
- } finally {
- executor.shutdown();
- executor.shutdownNow();
}
return result;
}
- private void checkPipelineDatabaseType(final
DataConsistencyCalculateAlgorithm calculator, final
PipelineDataSourceConfiguration dataSourceConfig) {
-
ShardingSpherePreconditions.checkState(calculator.getSupportedDatabaseTypes().contains(dataSourceConfig.getDatabaseType().getType()),
+ private void verifyPipelineDatabaseType(final
DataConsistencyCalculateAlgorithm calculateAlgorithm, final
PipelineDataSourceConfiguration dataSourceConfig) {
+
ShardingSpherePreconditions.checkState(calculateAlgorithm.getSupportedDatabaseTypes().contains(dataSourceConfig.getDatabaseType().getType()),
() -> new
UnsupportedPipelineDatabaseTypeException(dataSourceConfig.getDatabaseType()));
}
-
- private DataConsistencyCalculateParameter buildParameter(final
PipelineDataSourceWrapper sourceDataSource, final TableNameSchemaNameMapping
tableNameSchemaNameMapping,
- final String
tableName, final Collection<String> columnNames,
- final String
sourceDatabaseType, final String targetDatabaseType, final
PipelineColumnMetaData uniqueKey) {
- return new DataConsistencyCalculateParameter(sourceDataSource,
tableNameSchemaNameMapping, tableName, columnNames, sourceDatabaseType,
targetDatabaseType, uniqueKey);
- }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 018570e184c..244dc46efcf 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -112,8 +112,8 @@ public final class MigrationJob extends AbstractPipelineJob
implements SimpleJob
public void stop() {
setStopping(true);
dataSourceManager.close();
- if (null != getOneOffJobBootstrap()) {
- getOneOffJobBootstrap().shutdown();
+ if (null != getJobBootstrap()) {
+ getJobBootstrap().shutdown();
}
String jobId = getJobId();
if (null == jobId) {
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
index b0d0f42a10d..e5ae0a9635e 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
@@ -18,7 +18,7 @@
package
org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
-import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
@@ -35,7 +35,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Iterator;
import static org.hamcrest.CoreMatchers.is;
@@ -60,28 +59,28 @@ public final class
CRC32MatchDataConsistencyCalculateAlgorithmTest {
@Before
public void setUp() throws SQLException {
PipelineColumnMetaData uniqueKey = new PipelineColumnMetaData(1, "id",
Types.INTEGER, "integer", false, true, true);
- parameter = new DataConsistencyCalculateParameter(pipelineDataSource,
new TableNameSchemaNameMapping(Collections.emptyMap()),
+ parameter = new DataConsistencyCalculateParameter(pipelineDataSource,
null,
"foo_tbl", Arrays.asList("foo_col", "bar_col"), "FIXTURE",
"FIXTURE", uniqueKey);
when(pipelineDataSource.getConnection()).thenReturn(connection);
}
@Test
public void assertCalculateSuccess() throws SQLException {
- PreparedStatement preparedStatement0 = mockPreparedStatement(0L);
+ PreparedStatement preparedStatement0 = mockPreparedStatement(123L, 10);
when(connection.prepareStatement("SELECT CRC32(foo_col) FROM
foo_tbl")).thenReturn(preparedStatement0);
- PreparedStatement preparedStatement1 = mockPreparedStatement(1L);
+ PreparedStatement preparedStatement1 = mockPreparedStatement(456L, 10);
when(connection.prepareStatement("SELECT CRC32(bar_col) FROM
foo_tbl")).thenReturn(preparedStatement1);
- Iterator<Object> actual = new
CRC32MatchDataConsistencyCalculateAlgorithm().calculate(parameter).iterator();
- assertThat(actual.next(), is(0L));
- assertThat(actual.next(), is(1L));
+ Iterator<DataConsistencyCalculatedResult> actual = new
CRC32MatchDataConsistencyCalculateAlgorithm().calculate(parameter).iterator();
+ assertThat(actual.next().getRecordsCount(), is(10));
assertFalse(actual.hasNext());
}
- private PreparedStatement mockPreparedStatement(final long
expectedCRC32Result) throws SQLException {
+ private PreparedStatement mockPreparedStatement(final long
expectedCRC32Result, final int expectedRecordsCount) throws SQLException {
ResultSet resultSet = mock(ResultSet.class);
PreparedStatement result = mock(PreparedStatement.class,
RETURNS_DEEP_STUBS);
when(result.executeQuery()).thenReturn(resultSet);
when(resultSet.getLong(1)).thenReturn(expectedCRC32Result);
+ when(resultSet.getInt(2)).thenReturn(expectedRecordsCount);
return result;
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 14440f0344d..286de508afe 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -51,7 +51,7 @@ public final class MySQLPipelineSQLBuilder extends
AbstractPipelineSQLBuilder {
@Override
public Optional<String> buildCRC32SQL(final String schemaName, final
String tableName, final String column) {
- return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS
UNSIGNED)) AS checksum FROM %s", quote(column), quote(tableName)));
+ return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS
UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM %s", quote(column),
quote(tableName)));
}
@Override
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index 3a2049c3487..f2b276fefde 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -55,7 +55,7 @@ public final class MySQLPipelineSQLBuilderTest {
public void assertBuildSumCrc32SQL() {
Optional<String> actual = sqlBuilder.buildCRC32SQL(null, "t2", "id");
assertTrue(actual.isPresent());
- assertThat(actual.get(), is("SELECT BIT_XOR(CAST(CRC32(id) AS
UNSIGNED)) AS checksum FROM t2"));
+ assertThat(actual.get(), is("SELECT BIT_XOR(CAST(CRC32(id) AS
UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM t2"));
}
private DataRecord mockDataRecord(final String tableName) {
diff --git
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 0c4047c2286..2929e143128 100644
---
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -99,8 +99,9 @@ public final class MySQLMigrationGeneralIT extends
AbstractMigrationITCase {
startIncrementTask(new MySQLIncrementTask(jdbcTemplate,
getSourceTableOrderName(), keyGenerateAlgorithm, 30));
String orderJobId = getJobIdByTableName(getSourceTableOrderName());
String orderItemJobId = getJobIdByTableName("t_order_item");
- assertMigrationSuccessById(orderJobId);
- assertMigrationSuccessById(orderItemJobId);
+ assertMigrationSuccessById(orderJobId, "DATA_MATCH");
+ assertMigrationSuccessById(orderItemJobId, "DATA_MATCH");
+ assertMigrationSuccessById(orderItemJobId, "CRC32_MATCH");
if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
for (String each : listJobId()) {
commitMigrationByJobId(each);
@@ -111,12 +112,12 @@ public final class MySQLMigrationGeneralIT extends
AbstractMigrationITCase {
assertGreaterThanOrderTableInitRows(TABLE_INIT_ROW_COUNT, "");
}
- private void assertMigrationSuccessById(final String jobId) throws
SQLException, InterruptedException {
+ private void assertMigrationSuccessById(final String jobId, final String
algorithmType) throws SQLException, InterruptedException {
List<Map<String, Object>> jobStatus =
waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
for (Map<String, Object> each : jobStatus) {
assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) >
0);
}
- assertCheckMigrationSuccess(jobId, "DATA_MATCH");
+ assertCheckMigrationSuccess(jobId, algorithmType);
stopMigrationByJobId(jobId);
}
}
diff --git
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
index 3d1c2827377..01fd3394a42 100644
---
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
+++
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import org.junit.Test;
import java.util.Properties;
diff --git
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
index f6ec7327507..051ce468505 100644
---
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
+++
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.fixture;
import lombok.Getter;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
@@ -39,8 +40,12 @@ public final class DataConsistencyCalculateAlgorithmFixture
implements DataConsi
}
@Override
- public Iterable<Object> calculate(final DataConsistencyCalculateParameter
parameter) {
- return Collections.singletonList(true);
+ public Iterable<DataConsistencyCalculatedResult> calculate(final
DataConsistencyCalculateParameter parameter) {
+ return Collections.singletonList(new
FixtureDataConsistencyCalculatedResult(2));
+ }
+
+ @Override
+ public void cancel() {
}
@Override
diff --git
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
similarity index 64%
copy from
test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
copy to
test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
index 3d1c2827377..b61af786c4a 100644
---
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
+++
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.fixture;
-import org.junit.Test;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
-import java.util.Properties;
-
-public final class DataConsistencyCalculateAlgorithmFactoryTest {
+@RequiredArgsConstructor
+@EqualsAndHashCode
+@Getter
+public final class FixtureDataConsistencyCalculatedResult implements
DataConsistencyCalculatedResult {
- @Test
- public void assertNewInstanceSuccess() {
- DataConsistencyCalculateAlgorithmFactory.newInstance("FIXTURE", new
Properties());
- }
+ private final int recordsCount;
}