This is an automated email from the ASF dual-hosted git repository.
yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 049dee5653e Remove unused algorithm config, add
dataConsistencyCalculateAlgorithm chooser (#20527)
049dee5653e is described below
commit 049dee5653e848c79d50c6bb65bcf8656b0c12de
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Aug 26 09:20:39 2022 +0800
Remove unused algorithm config, add dataConsistencyCalculateAlgorithm
chooser (#20527)
* Remove unused completionDetectAlgorithm
* Remove unused dataConsistencyCalculateAlgorithm, add algorithm chooser
* Remove unused isDataConsistencyCheckNeeded
* Move getType
* Move and rename DataConsistencyChecker
---
.../data/pipeline/api/MigrationJobPublicAPI.java | 8 ---
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 5 ++
.../DataConsistencyCalculateAlgorithmChooser.java | 50 +++++++++++++++++++
.../consistency}/DataConsistencyCheckUtils.java | 2 +-
...DataMatchDataConsistencyCalculateAlgorithm.java | 2 +-
.../MigrationDataConsistencyChecker.java} | 30 ++----------
.../scenario/migration/MigrationJobAPI.java | 8 ---
.../scenario/migration/MigrationJobAPIImpl.java | 50 +++----------------
.../migration/MigrationProcessContext.java | 10 ----
.../scaling/core/job/dumper/DumperFactory.java | 29 -----------
...taConsistencyCalculateAlgorithmChooserTest.java | 57 ++++++++++++++++++++++
.../DataConsistencyCheckUtilsTest.java | 2 +-
.../core/fixture/MigrationJobAPIFixture.java | 10 ----
.../core/api/impl/MigrationJobAPIImplTest.java | 7 ---
.../MigrationDataConsistencyCheckerTest.java} | 7 ++-
15 files changed, 128 insertions(+), 149 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index c77ea3e77ef..e2506a4cc66 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -61,14 +61,6 @@ public interface MigrationJobPublicAPI extends
PipelineJobPublicAPI, RequiredSPI
*/
Collection<DataConsistencyCheckAlgorithmInfo>
listDataConsistencyCheckAlgorithms();
- /**
- * Is data consistency check needed.
- *
- * @param jobId job id
- * @return data consistency check needed or not
- */
- boolean isDataConsistencyCheckNeeded(String jobId);
-
/**
* Do data consistency check.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 72d8d84ef97..4ba03f71e41 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -224,4 +224,9 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
throw new PipelineVerifyFailedException("Job is not stopped. You
could run `STOP MIGRATION {jobId}` to stop it.");
}
}
+
+ @Override
+ public String getType() {
+ return getJobType().getTypeName();
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
new file mode 100644
index 00000000000..5e50032e3eb
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+
+/**
+ * Data consistency calculate algorithm chooser.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DataConsistencyCalculateAlgorithmChooser {
+
+ /**
+ * Choose data consistency calculate algorithm when it's not defined.
+ *
+ * @param databaseType database type
+ * @param peerDatabaseType peer database type
+ * @return algorithm
+ */
+ public static DataConsistencyCalculateAlgorithm choose(final DatabaseType
databaseType, final DatabaseType peerDatabaseType) {
+ String algorithmType;
+ if
(!databaseType.getType().equalsIgnoreCase(peerDatabaseType.getType())) {
+ algorithmType = "DATA_MATCH";
+ } else if (databaseType instanceof MySQLDatabaseType) {
+ algorithmType = "CRC32_MATCH";
+ } else {
+ algorithmType = "DATA_MATCH";
+ }
+ return
DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, null);
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DataConsistencyCheckUtils.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckUtils.java
similarity index 96%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DataConsistencyCheckUtils.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckUtils.java
index c38ef4d9d61..a4c703989c6 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DataConsistencyCheckUtils.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckUtils.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.util;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 6e26385013a..849fa41fbab 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -28,7 +28,7 @@ import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.ColumnValueReaderFactory;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import
org.apache.shardingsphere.data.pipeline.core.util.DataConsistencyCheckUtils;
+import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCheckUtils;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
similarity index 88%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index dfb7d2da219..9b61a39e315 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -15,9 +15,8 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
-import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
@@ -30,7 +29,6 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import
org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
@@ -40,10 +38,6 @@ import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsist
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
-import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
-import org.apache.shardingsphere.mode.manager.ContextManager;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -66,12 +60,11 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
- * Data consistency checker.
+ * Data consistency checker for migration job.
*/
@Slf4j
-public final class DataConsistencyChecker {
+public final class MigrationDataConsistencyChecker {
- // TODO remove jobConfig for common usage
private final MigrationJobConfiguration jobConfig;
private final String sourceTableName;
@@ -80,7 +73,7 @@ public final class DataConsistencyChecker {
private final JobRateLimitAlgorithm readRateLimitAlgorithm;
- public DataConsistencyChecker(final MigrationJobConfiguration jobConfig,
final JobRateLimitAlgorithm readRateLimitAlgorithm) {
+ public MigrationDataConsistencyChecker(final MigrationJobConfiguration
jobConfig, final JobRateLimitAlgorithm readRateLimitAlgorithm) {
this.jobConfig = jobConfig;
this.sourceTableName = jobConfig.getSourceTableName();
tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(),
Collections.singletonList(jobConfig.getSourceTableName())));
@@ -216,21 +209,6 @@ public final class DataConsistencyChecker {
}
}
- private ShardingSphereTable getTableMetaData(final String databaseName,
final String logicTableName) {
- ContextManager contextManager = PipelineContext.getContextManager();
- Preconditions.checkNotNull(contextManager, "ContextManager null");
- ShardingSphereDatabase database =
contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName);
- if (null == database) {
- throw new RuntimeException("Can not get meta data by database name
" + databaseName);
- }
- String schemaName =
tableNameSchemaNameMapping.getSchemaName(logicTableName);
- ShardingSphereSchema schema = database.getSchema(schemaName);
- if (null == schema) {
- throw new RuntimeException("Can not get schema by schema name " +
schemaName + ", logicTableName=" + logicTableName);
- }
- return schema.get(logicTableName);
- }
-
private DataConsistencyCalculateParameter buildParameter(final
PipelineDataSourceWrapper sourceDataSource, final TableNameSchemaNameMapping
tableNameSchemaNameMapping,
final String
tableName, final Collection<String> columnNames,
final String
sourceDatabaseType, final String targetDatabaseType, final
PipelineColumnMetaData uniqueKey) {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
index 95cea7fe14f..a7ac132b82d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
@@ -63,14 +63,6 @@ public interface MigrationJobAPI extends PipelineJobAPI,
MigrationJobPublicAPI,
@Override
InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int
shardingItem);
- /**
- * Is data consistency check needed.
- *
- * @param jobConfig job configuration
- * @return data consistency check needed or not
- */
- boolean isDataConsistencyCheckNeeded(MigrationJobConfiguration jobConfig);
-
/**
* Do data consistency check.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index ff38a56e71a..9102c356ebe 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -53,12 +53,11 @@ import
org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
+import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
-import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSourceResourceException;
import
org.apache.shardingsphere.data.pipeline.core.exception.DropMigrationSourceResourceException;
-import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import
org.apache.shardingsphere.data.pipeline.core.util.PipelineSchemaTableUtil;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -68,6 +67,7 @@ import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcess
import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import
org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
@@ -267,13 +267,6 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
jobItemAPI.updateJobItemStatus(jobId, shardingItem, status);
}
- private void verifyManualMode(final MigrationJobConfiguration jobConfig) {
- MigrationProcessContext processContext =
buildPipelineProcessContext(jobConfig);
- if (null != processContext.getCompletionDetectAlgorithm()) {
- throw new PipelineVerifyFailedException("It's not necessary to do
it in auto mode.");
- }
- }
-
@Override
public Collection<DataConsistencyCheckAlgorithmInfo>
listDataConsistencyCheckAlgorithms() {
checkModeConfig();
@@ -286,40 +279,19 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
}).collect(Collectors.toList());
}
- @Override
- public boolean isDataConsistencyCheckNeeded(final String jobId) {
- log.info("isDataConsistencyCheckNeeded for job {}", jobId);
- MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
- return isDataConsistencyCheckNeeded(jobConfig);
- }
-
- @Override
- public boolean isDataConsistencyCheckNeeded(final
MigrationJobConfiguration jobConfig) {
- MigrationProcessContext processContext =
buildPipelineProcessContext(jobConfig);
- return isDataConsistencyCheckNeeded(processContext);
- }
-
- private boolean isDataConsistencyCheckNeeded(final MigrationProcessContext
processContext) {
- return null != processContext.getDataConsistencyCalculateAlgorithm();
- }
-
@Override
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
String jobId) {
checkModeConfig();
log.info("Data consistency check for job {}", jobId);
MigrationJobConfiguration jobConfig =
getJobConfiguration(getElasticJobConfigPOJO(jobId));
- verifyDataConsistencyCheck(jobConfig);
return dataConsistencyCheck(jobConfig);
}
@Override
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
MigrationJobConfiguration jobConfig) {
- MigrationProcessContext processContext =
buildPipelineProcessContext(jobConfig);
- if (!isDataConsistencyCheckNeeded(processContext)) {
- log.info("DataConsistencyCalculatorAlgorithm is not configured,
data consistency check is ignored.");
- return Collections.emptyMap();
- }
- return dataConsistencyCheck(jobConfig,
processContext.getDataConsistencyCalculateAlgorithm());
+ DataConsistencyCalculateAlgorithm algorithm =
DataConsistencyCalculateAlgorithmChooser.choose(
+
DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()),
DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()));
+ return dataConsistencyCheck(jobConfig, algorithm);
}
@Override
@@ -327,23 +299,18 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
checkModeConfig();
log.info("Data consistency check for job {}, algorithmType: {}",
jobId, algorithmType);
MigrationJobConfiguration jobConfig =
getJobConfiguration(getElasticJobConfigPOJO(jobId));
- verifyDataConsistencyCheck(jobConfig);
return dataConsistencyCheck(jobConfig,
DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType,
algorithmProps));
}
private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
MigrationJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm
calculator) {
String jobId = jobConfig.getJobId();
JobRateLimitAlgorithm readRateLimitAlgorithm =
buildPipelineProcessContext(jobConfig).getReadRateLimitAlgorithm();
- Map<String, DataConsistencyCheckResult> result = new
DataConsistencyChecker(jobConfig, readRateLimitAlgorithm).check(calculator);
+ Map<String, DataConsistencyCheckResult> result = new
MigrationDataConsistencyChecker(jobConfig,
readRateLimitAlgorithm).check(calculator);
log.info("Scaling job {} with check algorithm '{}' data consistency
checker result {}", jobId, calculator.getType(), result);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId,
aggregateDataConsistencyCheckResults(jobId, result));
return result;
}
- private void verifyDataConsistencyCheck(final MigrationJobConfiguration
jobConfig) {
- verifyManualMode(jobConfig);
- }
-
@Override
public boolean aggregateDataConsistencyCheckResults(final String jobId,
final Map<String, DataConsistencyCheckResult> checkResults) {
if (checkResults.isEmpty()) {
@@ -489,9 +456,4 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
result.setParameter(parameter);
return result;
}
-
- @Override
- public String getType() {
- return getJobType().getTypeName();
- }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
index ab70abf2231..058fae8e271 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
@@ -19,11 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
-import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import
org.apache.shardingsphere.data.pipeline.core.context.AbstractPipelineProcessContext;
-import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithm;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
/**
@@ -33,13 +29,7 @@ import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcess
@Slf4j
public final class MigrationProcessContext extends
AbstractPipelineProcessContext {
- private final
JobCompletionDetectAlgorithm<RuleAlteredJobAlmostCompletedParameter>
completionDetectAlgorithm;
-
- private final DataConsistencyCalculateAlgorithm
dataConsistencyCalculateAlgorithm;
-
public MigrationProcessContext(final String jobId, final
PipelineProcessConfiguration originalProcessConfig) {
super(jobId, originalProcessConfig);
- completionDetectAlgorithm = null;
- dataConsistencyCalculateAlgorithm =
DataConsistencyCalculateAlgorithmFactory.newInstance("DATA_MATCH", null);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
deleted file mode 100644
index dadfcb92fd6..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.dumper;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-
-/**
- * Dumper factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class DumperFactory {
-
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooserTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooserTest.java
new file mode 100644
index 00000000000..dc3ec6ec8c0
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooserTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+
+import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+public final class DataConsistencyCalculateAlgorithmChooserTest {
+
+ @Test
+ public void assertChooseOnDifferentDatabaseTypes() {
+ DatabaseType databaseType = DatabaseTypeFactory.getInstance("Oracle");
+ DatabaseType peerDatabaseType =
DatabaseTypeFactory.getInstance("PostgreSQL");
+ DataConsistencyCalculateAlgorithm actual =
DataConsistencyCalculateAlgorithmChooser.choose(databaseType, peerDatabaseType);
+ assertNotNull(actual);
+ assertThat(actual.getType(), is("DATA_MATCH"));
+ }
+
+ @Test
+ public void assertChooseOnMySQL() {
+ DatabaseType databaseType = DatabaseTypeFactory.getInstance("MySQL");
+ DatabaseType peerDatabaseType =
DatabaseTypeFactory.getInstance("MySQL");
+ DataConsistencyCalculateAlgorithm actual =
DataConsistencyCalculateAlgorithmChooser.choose(databaseType, peerDatabaseType);
+ assertNotNull(actual);
+ assertThat(actual.getType(), is("CRC32_MATCH"));
+ }
+
+ @Test
+ public void assertChooseOnPostgreSQL() {
+ DatabaseType databaseType =
DatabaseTypeFactory.getInstance("PostgreSQL");
+ DatabaseType peerDatabaseType =
DatabaseTypeFactory.getInstance("PostgreSQL");
+ DataConsistencyCalculateAlgorithm actual =
DataConsistencyCalculateAlgorithmChooser.choose(databaseType, peerDatabaseType);
+ assertNotNull(actual);
+ assertThat(actual.getType(), is("DATA_MATCH"));
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DataConsistencyCheckUtilsTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckUtilsTest.java
similarity index 94%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DataConsistencyCheckUtilsTest.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckUtilsTest.java
index f7c25a82bf8..b04217ffc8c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DataConsistencyCheckUtilsTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckUtilsTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.util;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency;
import org.junit.Test;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index c63d89abccb..154b9a703e9 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -105,16 +105,6 @@ public final class MigrationJobAPIFixture implements
MigrationJobAPI {
return null;
}
- @Override
- public boolean isDataConsistencyCheckNeeded(final String jobId) {
- return false;
- }
-
- @Override
- public boolean isDataConsistencyCheckNeeded(final
MigrationJobConfiguration jobConfig) {
- return false;
- }
-
@Override
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
String jobId) {
return null;
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index feff2a6b343..16816224621 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -139,13 +139,6 @@ public final class MigrationJobAPIImplTest {
assertThat(jobProgressMap.size(), is(1));
}
- @Test
- public void assertIsDataConsistencyCheckNeeded() {
- Optional<String> jobId =
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
- assertTrue(jobId.isPresent());
- assertTrue(jobAPI.isDataConsistencyCheckNeeded(jobId.get()));
- }
-
@Test
public void assertDataConsistencyCheck() {
Optional<String> jobId =
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
similarity index 89%
rename from
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
rename to
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
index 2f0e02f975a..7cb88d883df 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
@@ -24,7 +24,6 @@ import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDa
import
org.apache.shardingsphere.data.pipeline.core.fixture.DataConsistencyCalculateAlgorithmFixture;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -37,7 +36,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-public final class DataConsistencyCheckerTest {
+public final class MigrationDataConsistencyCheckerTest {
@BeforeClass
public static void beforeClass() {
@@ -46,7 +45,7 @@ public final class DataConsistencyCheckerTest {
@Test
public void assertCountAndDataCheck() throws SQLException {
- Map<String, DataConsistencyCheckResult> actual = new
DataConsistencyChecker(createJobConfiguration(), null).check(new
DataConsistencyCalculateAlgorithmFixture());
+ Map<String, DataConsistencyCheckResult> actual = new
MigrationDataConsistencyChecker(createJobConfiguration(), null).check(new
DataConsistencyCalculateAlgorithmFixture());
assertTrue(actual.get("t_order").getCountCheckResult().isMatched());
assertThat(actual.get("t_order").getCountCheckResult().getSourceRecordsCount(),
is(actual.get("t_order").getCountCheckResult().getTargetRecordsCount()));
assertTrue(actual.get("t_order").getContentCheckResult().isMatched());