This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0574a476e84 Extract InventoryIncrementalJobAPI and extend
InventoryIncrementalJobPublicAPI for common usage (#21152)
0574a476e84 is described below
commit 0574a476e84a8c7dbea13355f3b941f1d657a165
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Sep 23 16:17:47 2022 +0800
Extract InventoryIncrementalJobAPI and extend
InventoryIncrementalJobPublicAPI for common usage (#21152)
* Extract more methods to InventoryIncrementalJobPublicAPI; Add
InventoryIncrementalJobAPI
* Add PipelineDataConsistencyChecker
* Rename AbstractInventoryIncrementalJobAPIImpl
---
.../api/InventoryIncrementalJobPublicAPI.java | 43 ++++-
.../data/pipeline/api/MigrationJobPublicAPI.java | 38 ----
.../PipelineDataConsistencyChecker.java} | 25 +--
.../api/InventoryIncrementalJobAPI.java} | 30 +--
.../AbstractInventoryIncrementalJobAPIImpl.java | 203 +++++++++++++++++++++
.../impl/InventoryIncrementalJobPublicAPIImpl.java | 74 --------
.../MigrationChangedJobConfigurationProcessor.java | 7 +-
.../migration/MigrationDataConsistencyChecker.java | 16 +-
.../scenario/migration/MigrationJobAPI.java | 36 +---
.../scenario/migration/MigrationJobAPIImpl.java | 122 ++-----------
.../migration/MigrationProcessContext.java | 6 +
.../MigrationDataConsistencyCheckerTest.java | 4 +-
12 files changed, 299 insertions(+), 305 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
index 53b89e72f35..5750e196aca 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
@@ -17,14 +17,22 @@
package org.apache.shardingsphere.data.pipeline.api;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
/**
- * Inventory incremental job API.
+ * Inventory incremental job public API.
*/
+@SingletonSPI
public interface InventoryIncrementalJobPublicAPI extends
PipelineJobPublicAPI, TypedSPI {
/**
@@ -70,4 +78,37 @@ public interface InventoryIncrementalJobPublicAPI extends
PipelineJobPublicAPI,
* @throws SQLException when commit underlying database data
*/
void commit(String jobId) throws SQLException;
+
+ /**
+ * Get job progress.
+ *
+ * @param jobId job id
+ * @return each sharding item progress
+ */
+ Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(String
jobId);
+
+ /**
+ * List all data consistency check algorithms from SPI.
+ *
+ * @return data consistency check algorithms
+ */
+ Collection<DataConsistencyCheckAlgorithmInfo>
listDataConsistencyCheckAlgorithms();
+
+ /**
+ * Do data consistency check.
+ *
+ * @param jobId job id
+ * @return each logic table check result
+ */
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
+
+ /**
+ * Do data consistency check.
+ *
+ * @param jobId job id
+ * @param algorithmType algorithm type
+ * @param algorithmProps algorithm props. Nullable
+ * @return each logic table check result
+ */
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId,
String algorithmType, Properties algorithmProps);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index e60b48bfe92..fb8fe96405e 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -17,10 +17,7 @@
package org.apache.shardingsphere.data.pipeline.api;
-import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
-import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
@@ -29,7 +26,6 @@ import
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
/**
* Migration job public API.
@@ -45,40 +41,6 @@ public interface MigrationJobPublicAPI extends
InventoryIncrementalJobPublicAPI,
@Override
List<MigrationJobInfo> list();
- /**
- * Get job progress.
- *
- * @param jobId job id
- * @return each sharding item progress
- */
- // TODO add JobProgress
- Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(String
jobId);
-
- /**
- * List all data consistency check algorithms from SPI.
- *
- * @return data consistency check algorithms
- */
- Collection<DataConsistencyCheckAlgorithmInfo>
listDataConsistencyCheckAlgorithms();
-
- /**
- * Do data consistency check.
- *
- * @param jobId job id
- * @return each logic table check result
- */
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
-
- /**
- * Do data consistency check.
- *
- * @param jobId job id
- * @param algorithmType algorithm type
- * @param algorithmProps algorithm props. Nullable
- * @return each logic table check result
- */
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId,
String algorithmType, Properties algorithmProps);
-
/**
* Add migration source resources.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/PipelineDataConsistencyChecker.java
similarity index 56%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/PipelineDataConsistencyChecker.java
index b2e11b30c15..92420d82c63 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/PipelineDataConsistencyChecker.java
@@ -15,21 +15,22 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.api.check.consistency;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.context.AbstractInventoryIncrementalProcessContext;
+import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+
+import java.util.Map;
/**
- * Migration process context.
+ * Pipeline data consistency checker.
*/
-@Getter
-@Slf4j
-public final class MigrationProcessContext extends
AbstractInventoryIncrementalProcessContext {
+public interface PipelineDataConsistencyChecker {
- public MigrationProcessContext(final String jobId, final
PipelineProcessConfiguration originalProcessConfig) {
- super(jobId, originalProcessConfig);
- }
+ /**
+ * Data consistency check.
+ *
+ * @param calculateAlgorithm calculate algorithm
+ * @return check results. key is logic table name, value is check result.
+ */
+ Map<String, DataConsistencyCheckResult>
check(DataConsistencyCalculateAlgorithm calculateAlgorithm);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
similarity index 60%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index 982a99f2e9e..fdf355a8a4f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -15,42 +15,26 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.core.api;
-import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
-import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
import java.util.Map;
/**
- * Migration job API.
+ * Inventory incremental job API.
*/
-@SingletonSPI
-public interface MigrationJobAPI extends PipelineJobAPI,
MigrationJobPublicAPI, RequiredSPI {
-
- @Override
- MigrationJobConfiguration getJobConfiguration(String jobId);
-
- @Override
- MigrationTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration
pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration
pipelineProcessConfig);
-
- @Override
- MigrationProcessContext
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
+public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
/**
* Get job progress.
*
- * @param jobConfig job configuration
+ * @param pipelineJobConfig job configuration
* @return each sharding item progress
*/
- Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(MigrationJobConfiguration jobConfig);
+ Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(PipelineJobConfiguration pipelineJobConfig);
@Override
InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int
shardingItem);
@@ -58,10 +42,10 @@ public interface MigrationJobAPI extends PipelineJobAPI,
MigrationJobPublicAPI,
/**
* Do data consistency check.
*
- * @param jobConfig job configuration
+ * @param pipelineJobConfig job configuration
* @return each logic table check result
*/
- Map<String, DataConsistencyCheckResult>
dataConsistencyCheck(MigrationJobConfiguration jobConfig);
+ Map<String, DataConsistencyCheckResult>
dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig);
/**
* Aggregate data consistency check results.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
new file mode 100644
index 00000000000..d0be204a7d0
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
+import
org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
+import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
+import
org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
+import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
+import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
+import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
+import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
+import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Abstract inventory incremental job API implementation.
+ */
+@Slf4j
+public abstract class AbstractInventoryIncrementalJobAPIImpl extends
AbstractPipelineJobAPIImpl implements InventoryIncrementalJobAPI,
InventoryIncrementalJobPublicAPI {
+
+ private static final YamlPipelineProcessConfigurationSwapper
PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+
+ private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
+
+ private final InventoryIncrementalJobItemAPIImpl jobItemAPI = new
InventoryIncrementalJobItemAPIImpl();
+
+ protected abstract String getTargetDatabaseType(PipelineJobConfiguration
pipelineJobConfig);
+
+ @Override
+ public abstract InventoryIncrementalProcessContext
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
+
+ @Override
+ public void createProcessConfiguration(final PipelineProcessConfiguration
processConfig) {
+ PipelineProcessConfiguration existingProcessConfig =
processConfigPersistService.load(getJobType());
+ ShardingSpherePreconditions.checkState(null == existingProcessConfig,
CreateExistsProcessConfigurationException::new);
+ processConfigPersistService.persist(getJobType(), processConfig);
+ }
+
+ @Override
+ public void alterProcessConfiguration(final PipelineProcessConfiguration
processConfig) {
+ // TODO check rateLimiter type match or not
+ YamlPipelineProcessConfiguration targetYamlProcessConfig =
getTargetYamlProcessConfiguration();
+
targetYamlProcessConfig.copyNonNullFields(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
+ processConfigPersistService.persist(getJobType(),
PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+ }
+
+ private YamlPipelineProcessConfiguration
getTargetYamlProcessConfiguration() {
+ PipelineProcessConfiguration existingProcessConfig =
processConfigPersistService.load(getJobType());
+ ShardingSpherePreconditions.checkNotNull(existingProcessConfig,
AlterNotExistProcessConfigurationException::new);
+ return
PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(existingProcessConfig);
+ }
+
+ @Override
+ public void dropProcessConfiguration(final String confPath) {
+ String finalConfPath = confPath.trim();
+ PipelineProcessConfigurationUtil.verifyConfPath(confPath);
+ YamlPipelineProcessConfiguration targetYamlProcessConfig =
getTargetYamlProcessConfiguration();
+
PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig,
finalConfPath);
+ processConfigPersistService.persist(getJobType(),
PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+ }
+
+ @Override
+ public PipelineProcessConfiguration showProcessConfiguration() {
+ PipelineProcessConfiguration result =
processConfigPersistService.load(getJobType());
+ result =
PipelineProcessConfigurationUtil.convertWithDefaultValue(result);
+ return result;
+ }
+
+ @Override
+ public Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(final String jobId) {
+ checkModeConfig();
+ return getJobProgress(getJobConfiguration(jobId));
+ }
+
+ @Override
+ public Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(final PipelineJobConfiguration jobConfig) {
+ String jobId = jobConfig.getJobId();
+ JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ return IntStream.range(0,
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map,
each) -> {
+ InventoryIncrementalJobItemProgress jobItemProgress =
getJobItemProgress(jobId, each);
+ if (null != jobItemProgress) {
+ jobItemProgress.setActive(!jobConfigPOJO.isDisabled());
+ jobItemProgress.setErrorMessage(getJobItemErrorMessage(jobId,
each));
+ }
+ map.put(each, jobItemProgress);
+ }, LinkedHashMap::putAll);
+ }
+
+ @Override
+ public InventoryIncrementalJobItemProgress getJobItemProgress(final String
jobId, final int shardingItem) {
+ return jobItemAPI.getJobItemProgress(jobId, shardingItem);
+ }
+
+ @Override
+ public void persistJobItemProgress(final PipelineJobItemContext
jobItemContext) {
+ jobItemAPI.persistJobItemProgress(jobItemContext);
+ }
+
+ @Override
+ public void updateJobItemStatus(final String jobId, final int
shardingItem, final JobStatus status) {
+ jobItemAPI.updateJobItemStatus(jobId, shardingItem, status);
+ }
+
+ @Override
+ public Collection<DataConsistencyCheckAlgorithmInfo>
listDataConsistencyCheckAlgorithms() {
+ checkModeConfig();
+ return
DataConsistencyCalculateAlgorithmFactory.getAllInstances().stream().map(each ->
{
+ DataConsistencyCheckAlgorithmInfo result = new
DataConsistencyCheckAlgorithmInfo();
+ result.setType(each.getType());
+ result.setDescription(each.getDescription());
+ result.setSupportedDatabaseTypes(each.getSupportedDatabaseTypes());
+ return result;
+ }).collect(Collectors.toList());
+ }
+
+ @Override
+ public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
String jobId) {
+ checkModeConfig();
+ log.info("Data consistency check for job {}", jobId);
+ PipelineJobConfiguration jobConfig =
getJobConfiguration(getElasticJobConfigPOJO(jobId));
+ return dataConsistencyCheck(jobConfig);
+ }
+
+ @Override
+ public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
PipelineJobConfiguration jobConfig) {
+ DataConsistencyCalculateAlgorithm algorithm =
DataConsistencyCalculateAlgorithmChooser.choose(
+
DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()),
DatabaseTypeFactory.getInstance(getTargetDatabaseType(jobConfig)));
+ return dataConsistencyCheck(jobConfig, algorithm);
+ }
+
+ @Override
+ public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
String jobId, final String algorithmType, final Properties algorithmProps) {
+ checkModeConfig();
+ log.info("Data consistency check for job {}, algorithmType: {}",
jobId, algorithmType);
+ PipelineJobConfiguration jobConfig =
getJobConfiguration(getElasticJobConfigPOJO(jobId));
+ return dataConsistencyCheck(jobConfig,
DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType,
algorithmProps));
+ }
+
+ private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm
calculateAlgorithm) {
+ String jobId = jobConfig.getJobId();
+ Map<String, DataConsistencyCheckResult> result =
buildPipelineDataConsistencyChecker(jobConfig,
buildPipelineProcessContext(jobConfig)).check(calculateAlgorithm);
+ log.info("job {} with check algorithm '{}' data consistency checker
result {}", jobId, calculateAlgorithm.getType(), result);
+
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestResult(jobId,
aggregateDataConsistencyCheckResults(jobId, result));
+ return result;
+ }
+
+ protected abstract PipelineDataConsistencyChecker
buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig,
InventoryIncrementalProcessContext processContext);
+
+ @Override
+ public boolean aggregateDataConsistencyCheckResults(final String jobId,
final Map<String, DataConsistencyCheckResult> checkResults) {
+ if (checkResults.isEmpty()) {
+ return false;
+ }
+ for (Entry<String, DataConsistencyCheckResult> entry :
checkResults.entrySet()) {
+ DataConsistencyCheckResult checkResult = entry.getValue();
+ boolean isCountMatched =
checkResult.getCountCheckResult().isMatched();
+ boolean isContentMatched =
checkResult.getContentCheckResult().isMatched();
+ if (!isCountMatched || !isContentMatched) {
+ log.error("job: {}, table: {} data consistency check failed,
count matched: {}, content matched: {}", jobId, entry.getKey(), isCountMatched,
isContentMatched);
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java
deleted file mode 100644
index 44f48883545..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.api.impl;
-
-import
org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
-import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfigurationSwapper;
-import
org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
-import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
-import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
-import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-
-/**
- * Inventory incremental job API implementation.
- */
-public abstract class InventoryIncrementalJobPublicAPIImpl extends
AbstractPipelineJobAPIImpl implements InventoryIncrementalJobPublicAPI {
-
- private static final YamlPipelineProcessConfigurationSwapper
PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
-
- private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
-
- @Override
- public void createProcessConfiguration(final PipelineProcessConfiguration
processConfig) {
- PipelineProcessConfiguration existingProcessConfig =
processConfigPersistService.load(getJobType());
- ShardingSpherePreconditions.checkState(null == existingProcessConfig,
CreateExistsProcessConfigurationException::new);
- processConfigPersistService.persist(getJobType(), processConfig);
- }
-
- @Override
- public void alterProcessConfiguration(final PipelineProcessConfiguration
processConfig) {
- // TODO check rateLimiter type match or not
- YamlPipelineProcessConfiguration targetYamlProcessConfig =
getTargetYamlProcessConfiguration();
-
targetYamlProcessConfig.copyNonNullFields(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
- processConfigPersistService.persist(getJobType(),
PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
- }
-
- private YamlPipelineProcessConfiguration
getTargetYamlProcessConfiguration() {
- PipelineProcessConfiguration existingProcessConfig =
processConfigPersistService.load(getJobType());
- ShardingSpherePreconditions.checkNotNull(existingProcessConfig,
AlterNotExistProcessConfigurationException::new);
- return
PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(existingProcessConfig);
- }
-
- @Override
- public void dropProcessConfiguration(final String confPath) {
- String finalConfPath = confPath.trim();
- PipelineProcessConfigurationUtil.verifyConfPath(confPath);
- YamlPipelineProcessConfiguration targetYamlProcessConfig =
getTargetYamlProcessConfiguration();
-
PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig,
finalConfPath);
- processConfigPersistService.persist(getJobType(),
PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
- }
-
- @Override
- public PipelineProcessConfiguration showProcessConfiguration() {
- PipelineProcessConfiguration result =
processConfigPersistService.load(getJobType());
- result =
PipelineProcessConfigurationUtil.convertWithDefaultValue(result);
- return result;
- }
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index 1333121ec89..8f5b30e7eef 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -52,8 +52,11 @@ public final class MigrationChangedJobConfigurationProcessor
implements Pipeline
log.info("{} added to executing jobs failed since it
already exists", jobId);
} else {
log.info("{} executing jobs", jobId);
- CompletableFuture.runAsync(() -> execute(jobConfigPOJO),
PipelineContext.getEventListenerExecutor())
- .whenComplete((unused, throwable) ->
log.error("execute failed, jobId={}", jobId, throwable));
+ CompletableFuture.runAsync(() -> execute(jobConfigPOJO),
PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) ->
{
+ if (null != throwable) {
+ log.error("execute failed, jobId={}", jobId,
throwable);
+ }
+ });
}
break;
case DELETED:
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index a196b56e552..a0fd5c4ae5a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -30,6 +31,7 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
@@ -69,7 +71,7 @@ import java.util.concurrent.TimeUnit;
* Data consistency checker for migration job.
*/
@Slf4j
-public final class MigrationDataConsistencyChecker {
+public final class MigrationDataConsistencyChecker implements
PipelineDataConsistencyChecker {
private final MigrationJobConfiguration jobConfig;
@@ -81,21 +83,16 @@ public final class MigrationDataConsistencyChecker {
private final JobRateLimitAlgorithm readRateLimitAlgorithm;
- public MigrationDataConsistencyChecker(final MigrationJobConfiguration
jobConfig, final JobRateLimitAlgorithm readRateLimitAlgorithm) {
+ public MigrationDataConsistencyChecker(final MigrationJobConfiguration
jobConfig, final InventoryIncrementalProcessContext processContext) {
this.jobConfig = jobConfig;
sourceTableName = jobConfig.getSourceTableName();
targetTableName = jobConfig.getTargetTableName();
tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(),
new HashSet<>(Arrays.asList(jobConfig.getSourceTableName(),
jobConfig.getTargetTableName()))));
- this.readRateLimitAlgorithm = readRateLimitAlgorithm;
+ this.readRateLimitAlgorithm = null != processContext ?
processContext.getReadRateLimitAlgorithm() : null;
}
- /**
- * Check data consistency.
- *
- * @param calculator data consistency calculate algorithm
- * @return checked result. key is logic table name, value is check result.
- */
+ @Override
public Map<String, DataConsistencyCheckResult> check(final
DataConsistencyCalculateAlgorithm calculator) {
Map<String, DataConsistencyCountCheckResult> countCheckResult =
checkCount();
Map<String, DataConsistencyContentCheckResult> contentCheckResult =
countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched)
@@ -113,6 +110,7 @@ public final class MigrationDataConsistencyChecker {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
PipelineDataSourceConfiguration sourceDataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(),
jobConfig.getSource().getParameter());
PipelineDataSourceConfiguration targetDataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(),
jobConfig.getTarget().getParameter());
+ // TODO migration might support multiple tables
Map<String, DataConsistencyCountCheckResult> result = new
LinkedHashMap<>(1, 1);
try (
PipelineDataSourceWrapper sourceDataSource =
PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
index 982a99f2e9e..efa627cb5e4 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
@@ -18,22 +18,18 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
-import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
-import java.util.Map;
-
/**
* Migration job API.
*/
@SingletonSPI
-public interface MigrationJobAPI extends PipelineJobAPI,
MigrationJobPublicAPI, RequiredSPI {
+public interface MigrationJobAPI extends InventoryIncrementalJobAPI,
MigrationJobPublicAPI, RequiredSPI {
@Override
MigrationJobConfiguration getJobConfiguration(String jobId);
@@ -43,32 +39,4 @@ public interface MigrationJobAPI extends PipelineJobAPI,
MigrationJobPublicAPI,
@Override
MigrationProcessContext
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
-
- /**
- * Get job progress.
- *
- * @param jobConfig job configuration
- * @return each sharding item progress
- */
- Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(MigrationJobConfiguration jobConfig);
-
- @Override
- InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int
shardingItem);
-
- /**
- * Do data consistency check.
- *
- * @param jobConfig job configuration
- * @return each logic table check result
- */
- Map<String, DataConsistencyCheckResult>
dataConsistencyCheck(MigrationJobConfiguration jobConfig);
-
- /**
- * Aggregate data consistency check results.
- *
- * @param jobId job id
- * @param checkResults check results
- * @return check success or not
- */
- boolean aggregateDataConsistencyCheckResults(String jobId, Map<String,
DataConsistencyCheckResult> checkResults);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index b4419dafa05..027c13f1a40 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -22,7 +22,7 @@ import com.google.common.base.Strings;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
-import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
import
org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
@@ -34,7 +34,6 @@ import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigration
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -43,10 +42,8 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
@@ -55,15 +52,10 @@ import
org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
import
org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaDataSwapper;
import
org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
-import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
-import
org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
-import
org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobPublicAPIImpl;
+import
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
-import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
-import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
+import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.connection.AddMigrationSourceResourceException;
@@ -72,8 +64,6 @@ import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSche
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
@@ -107,23 +97,19 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
/**
* Migration job API impl.
*/
@Slf4j
-public final class MigrationJobAPIImpl extends
InventoryIncrementalJobPublicAPIImpl implements MigrationJobAPI {
+public final class MigrationJobAPIImpl extends
AbstractInventoryIncrementalJobAPIImpl implements MigrationJobAPI {
private static final YamlRuleConfigurationSwapperEngine
RULE_CONFIG_SWAPPER_ENGINE = new YamlRuleConfigurationSwapperEngine();
private static final YamlDataSourceConfigurationSwapper
DATA_SOURCE_CONFIG_SWAPPER = new YamlDataSourceConfigurationSwapper();
- private final PipelineJobItemAPI jobItemAPI = new
InventoryIncrementalJobItemAPIImpl();
-
private final PipelineDataSourcePersistService dataSourcePersistService =
new PipelineDataSourcePersistService();
@Override
@@ -203,6 +189,11 @@ public final class MigrationJobAPIImpl extends
InventoryIncrementalJobPublicAPII
return
YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
}
+ @Override
+ protected String getTargetDatabaseType(final PipelineJobConfiguration
pipelineJobConfig) {
+ return ((MigrationJobConfiguration)
pipelineJobConfig).getTargetDatabaseType();
+ }
+
@Override
public MigrationTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration)
pipelineJobConfig;
@@ -265,99 +256,8 @@ public final class MigrationJobAPIImpl extends
InventoryIncrementalJobPublicAPII
}
@Override
- public Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(final String jobId) {
- checkModeConfig();
- return getJobProgress(getJobConfiguration(jobId));
- }
-
- @Override
- public Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(final MigrationJobConfiguration jobConfig) {
- String jobId = jobConfig.getJobId();
- JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- return IntStream.range(0,
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map,
each) -> {
- InventoryIncrementalJobItemProgress jobItemProgress =
getJobItemProgress(jobId, each);
- if (null != jobItemProgress) {
- jobItemProgress.setActive(!jobConfigPOJO.isDisabled());
- jobItemProgress.setErrorMessage(getJobItemErrorMessage(jobId,
each));
- }
- map.put(each, jobItemProgress);
- }, LinkedHashMap::putAll);
- }
-
- @Override
- public InventoryIncrementalJobItemProgress getJobItemProgress(final String
jobId, final int shardingItem) {
- return (InventoryIncrementalJobItemProgress)
jobItemAPI.getJobItemProgress(jobId, shardingItem);
- }
-
- @Override
- public void persistJobItemProgress(final PipelineJobItemContext
jobItemContext) {
- jobItemAPI.persistJobItemProgress(jobItemContext);
- }
-
- @Override
- public void updateJobItemStatus(final String jobId, final int
shardingItem, final JobStatus status) {
- jobItemAPI.updateJobItemStatus(jobId, shardingItem, status);
- }
-
- @Override
- public Collection<DataConsistencyCheckAlgorithmInfo>
listDataConsistencyCheckAlgorithms() {
- checkModeConfig();
- return
DataConsistencyCalculateAlgorithmFactory.getAllInstances().stream().map(each ->
{
- DataConsistencyCheckAlgorithmInfo result = new
DataConsistencyCheckAlgorithmInfo();
- result.setType(each.getType());
- result.setDescription(each.getDescription());
- result.setSupportedDatabaseTypes(each.getSupportedDatabaseTypes());
- return result;
- }).collect(Collectors.toList());
- }
-
- @Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
String jobId) {
- checkModeConfig();
- log.info("Data consistency check for job {}", jobId);
- MigrationJobConfiguration jobConfig =
getJobConfiguration(getElasticJobConfigPOJO(jobId));
- return dataConsistencyCheck(jobConfig);
- }
-
- @Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
MigrationJobConfiguration jobConfig) {
- DataConsistencyCalculateAlgorithm algorithm =
DataConsistencyCalculateAlgorithmChooser.choose(
-
DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()),
DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()));
- return dataConsistencyCheck(jobConfig, algorithm);
- }
-
- @Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
String jobId, final String algorithmType, final Properties algorithmProps) {
- checkModeConfig();
- log.info("Data consistency check for job {}, algorithmType: {}",
jobId, algorithmType);
- MigrationJobConfiguration jobConfig =
getJobConfiguration(getElasticJobConfigPOJO(jobId));
- return dataConsistencyCheck(jobConfig,
DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType,
algorithmProps));
- }
-
- private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
MigrationJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm
calculator) {
- String jobId = jobConfig.getJobId();
- JobRateLimitAlgorithm readRateLimitAlgorithm =
buildPipelineProcessContext(jobConfig).getReadRateLimitAlgorithm();
- Map<String, DataConsistencyCheckResult> result = new
MigrationDataConsistencyChecker(jobConfig,
readRateLimitAlgorithm).check(calculator);
- log.info("job {} with check algorithm '{}' data consistency checker
result {}", jobId, calculator.getType(), result);
-
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestResult(jobId,
aggregateDataConsistencyCheckResults(jobId, result));
- return result;
- }
-
- @Override
- public boolean aggregateDataConsistencyCheckResults(final String jobId,
final Map<String, DataConsistencyCheckResult> checkResults) {
- if (checkResults.isEmpty()) {
- return false;
- }
- for (Entry<String, DataConsistencyCheckResult> entry :
checkResults.entrySet()) {
- DataConsistencyCheckResult checkResult = entry.getValue();
- boolean isCountMatched =
checkResult.getCountCheckResult().isMatched();
- boolean isContentMatched =
checkResult.getContentCheckResult().isMatched();
- if (!isCountMatched || !isContentMatched) {
- log.error("job: {}, table: {} data consistency check failed,
count matched: {}, content matched: {}", jobId, entry.getKey(), isCountMatched,
isContentMatched);
- return false;
- }
- }
- return true;
+ protected PipelineDataConsistencyChecker
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration
pipelineJobConfig, final InventoryIncrementalProcessContext processContext) {
+ return new MigrationDataConsistencyChecker((MigrationJobConfiguration)
pipelineJobConfig, processContext);
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
index b2e11b30c15..1122a40ecc0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
@@ -29,6 +29,12 @@ import
org.apache.shardingsphere.data.pipeline.core.context.AbstractInventoryInc
@Slf4j
public final class MigrationProcessContext extends
AbstractInventoryIncrementalProcessContext {
+ /**
+ * Constructor.
+ *
+ * @param jobId job id
+ * @param originalProcessConfig original process configuration, nullable
+ */
public MigrationProcessContext(final String jobId, final
PipelineProcessConfiguration originalProcessConfig) {
super(jobId, originalProcessConfig);
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
index bf492b724a2..fa646106a05 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
@@ -45,7 +45,9 @@ public final class MigrationDataConsistencyCheckerTest {
@Test
public void assertCountAndDataCheck() throws SQLException {
- Map<String, DataConsistencyCheckResult> actual = new
MigrationDataConsistencyChecker(createJobConfiguration(), null).check(new
DataConsistencyCalculateAlgorithmFixture());
+ MigrationJobConfiguration jobConfig = createJobConfiguration();
+ Map<String, DataConsistencyCheckResult> actual = new
MigrationDataConsistencyChecker(jobConfig, new
MigrationProcessContext(jobConfig.getJobId(), null))
+ .check(new DataConsistencyCalculateAlgorithmFixture());
assertTrue(actual.get("t_order").getCountCheckResult().isMatched());
assertThat(actual.get("t_order").getCountCheckResult().getSourceRecordsCount(),
is(actual.get("t_order").getCountCheckResult().getTargetRecordsCount()));
assertTrue(actual.get("t_order").getContentCheckResult().isMatched());