This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7d80ca4da9d Add buildPipelineProcessContext method in pipeline job API
for common usage (#20306)
7d80ca4da9d is described below
commit 7d80ca4da9d21570ed0dc8dd48c7d81106fcab7b
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Aug 19 20:16:48 2022 +0800
Add buildPipelineProcessContext method in pipeline job API for common usage
(#20306)
---
.../data/pipeline/core/api/PipelineJobAPI.java | 9 ++
.../check/consistency/DataConsistencyChecker.java | 7 +-
.../data/pipeline/core/job/FinishedCheckJob.java | 3 +-
.../scenario/migration/MigrationJobAPI.java | 4 +
.../scenario/migration/MigrationJobAPIImpl.java | 21 ++--
.../migration/MigrationJobItemContext.java | 5 +-
.../migration/MigrationProcessContext.java | 6 ++
.../core/fixture/MigrationJobAPIFixture.java | 6 ++
.../consistency/DataConsistencyCheckerTest.java | 2 +-
.../rulealtered/RuleAlteredJobWorkerTest.java | 113 ---------------------
10 files changed, 48 insertions(+), 128 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index adbe0df3bde..eea116b6662 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.api;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
@@ -47,6 +48,14 @@ public interface PipelineJobAPI extends
PipelineJobPublicAPI, PipelineJobItemAPI
*/
void extendYamlJobConfiguration(YamlPipelineJobConfiguration
yamlJobConfig);
+ /**
+ * Build pipeline process context.
+ *
+ * @param pipelineJobConfig pipeline job configuration
+ * @return pipeline process context
+ */
+ PipelineProcessContext
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
+
/**
* Start job.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 08f2e2a0631..98987fcc9f5 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -32,7 +32,6 @@ import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -74,10 +73,13 @@ public final class DataConsistencyChecker {
private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
- public DataConsistencyChecker(final MigrationJobConfiguration jobConfig) {
+ private final JobRateLimitAlgorithm readRateLimitAlgorithm;
+
+ public DataConsistencyChecker(final MigrationJobConfiguration jobConfig,
final JobRateLimitAlgorithm readRateLimitAlgorithm) {
this.jobConfig = jobConfig;
logicTableNames = jobConfig.splitLogicTableNames();
tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap()));
+ this.readRateLimitAlgorithm = readRateLimitAlgorithm;
}
/**
@@ -157,7 +159,6 @@ public final class DataConsistencyChecker {
PipelineDataSourceConfiguration targetDataSourceConfig =
jobConfig.getTarget();
ThreadFactory threadFactory =
ExecutorThreadFactoryBuilder.build("job-" +
getJobIdDigest(jobConfig.getJobId()) + "-data-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
- JobRateLimitAlgorithm readRateLimitAlgorithm =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getReadRateLimitAlgorithm();
Map<String, DataConsistencyContentCheckResult> result = new
HashMap<>(logicTableNames.size(), 1);
try (
PipelineDataSourceWrapper sourceDataSource =
PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 79347621bc7..c491cfa11ef 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -27,7 +27,6 @@ import
org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -64,7 +63,7 @@ public final class FinishedCheckJob implements SimpleJob {
try {
// TODO refactor: dispatch to different job types
MigrationJobConfiguration jobConfig =
YamlMigrationJobConfigurationSwapper.swapToObject(jobInfo.getJobParameter());
- MigrationProcessContext processContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ MigrationProcessContext processContext =
jobAPI.buildPipelineProcessContext(jobConfig);
if (null == processContext.getCompletionDetectAlgorithm()) {
log.info("completionDetector not configured, auto switch
will not be enabled. You could query job progress and switch config manually
with DistSQL.");
continue;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
index 05cc6397c99..72d3a03a5b8 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
@@ -21,6 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
@@ -48,6 +49,9 @@ public interface MigrationJobAPI extends PipelineJobAPI,
MigrationJobPublicAPI,
*/
TaskConfiguration buildTaskConfiguration(MigrationJobConfiguration
jobConfig, int jobShardingItem, PipelineProcessConfiguration
pipelineProcessConfig);
+ @Override
+ MigrationProcessContext
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
+
/**
* Get job progress.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 5a7ca4c97c1..cce56aa9a0a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -51,8 +51,8 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSource
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
@@ -143,6 +143,14 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
return
RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig,
jobShardingItem, pipelineProcessConfig);
}
+ @Override
+ public MigrationProcessContext buildPipelineProcessContext(final
PipelineJobConfiguration pipelineJobConfig) {
+ // TODO add jobType
+ // TODO read process config from registry center
+ PipelineProcessConfiguration processConfig = new
PipelineProcessConfiguration(null, null, null);
+ return new MigrationProcessContext(pipelineJobConfig.getJobId(),
processConfig);
+ }
+
@Override
public List<JobInfo> list() {
checkModeConfig();
@@ -207,7 +215,7 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
}
private void verifyManualMode(final MigrationJobConfiguration jobConfig) {
- MigrationProcessContext processContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ MigrationProcessContext processContext =
buildPipelineProcessContext(jobConfig);
if (null != processContext.getCompletionDetectAlgorithm()) {
throw new PipelineVerifyFailedException("It's not necessary to do
it in auto mode.");
}
@@ -291,7 +299,7 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
@Override
public boolean isDataConsistencyCheckNeeded(final
MigrationJobConfiguration jobConfig) {
- MigrationProcessContext processContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ MigrationProcessContext processContext =
buildPipelineProcessContext(jobConfig);
return isDataConsistencyCheckNeeded(processContext);
}
@@ -310,7 +318,7 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
@Override
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
MigrationJobConfiguration jobConfig) {
- MigrationProcessContext processContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ MigrationProcessContext processContext =
buildPipelineProcessContext(jobConfig);
if (!isDataConsistencyCheckNeeded(processContext)) {
log.info("DataConsistencyCalculatorAlgorithm is not configured,
data consistency check is ignored.");
return Collections.emptyMap();
@@ -329,7 +337,8 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
MigrationJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm
calculator) {
String jobId = jobConfig.getJobId();
- Map<String, DataConsistencyCheckResult> result = new
DataConsistencyChecker(jobConfig).check(calculator);
+ JobRateLimitAlgorithm readRateLimitAlgorithm =
buildPipelineProcessContext(jobConfig).getReadRateLimitAlgorithm();
+ Map<String, DataConsistencyCheckResult> result = new
DataConsistencyChecker(jobConfig, readRateLimitAlgorithm).check(calculator);
log.info("Scaling job {} with check algorithm '{}' data consistency
checker result {}", jobId, calculator.getType(), result);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId,
aggregateDataConsistencyCheckResults(jobId, result));
return result;
@@ -371,7 +380,7 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
@Override
public void switchClusterConfiguration(final MigrationJobConfiguration
jobConfig) {
String jobId = jobConfig.getJobId();
- MigrationProcessContext processContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ MigrationProcessContext processContext =
buildPipelineProcessContext(jobConfig);
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
if (isDataConsistencyCheckNeeded(processContext)) {
Optional<Boolean> checkResult =
repositoryAPI.getJobCheckResult(jobId);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index b1ea5766ef5..f559d4e8feb 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -33,7 +33,6 @@ import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import java.util.Collection;
import java.util.LinkedList;
@@ -86,8 +85,8 @@ public final class MigrationJobItemContext implements
InventoryIncrementalJobIte
public MigrationJobItemContext(final MigrationJobConfiguration jobConfig,
final int jobShardingItem, final InventoryIncrementalJobItemProgress
initProgress,
final PipelineDataSourceManager
dataSourceManager) {
- // TODO refactor RuleAlteredJobWorker
- jobProcessContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ MigrationJobAPI jobAPI = MigrationJobAPIFactory.getInstance();
+ jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
this.jobConfig = jobConfig;
jobId = jobConfig.getJobId();
this.shardingItem = jobShardingItem;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
index 4d960e30305..b3c430629e1 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
@@ -43,6 +43,12 @@ public final class MigrationProcessContext extends
AbstractPipelineProcessContex
private final DataConsistencyCalculateAlgorithm
dataConsistencyCalculateAlgorithm;
+ public MigrationProcessContext(final String jobId, final
PipelineProcessConfiguration originalProcessConfig) {
+ super(jobId, originalProcessConfig);
+ completionDetectAlgorithm = null;
+ dataConsistencyCalculateAlgorithm = null;
+ }
+
@SuppressWarnings("unchecked")
public MigrationProcessContext(final String jobId, final
OnRuleAlteredActionConfiguration actionConfig) {
super(jobId, new PipelineProcessConfiguration(actionConfig.getInput(),
actionConfig.getOutput(), actionConfig.getStreamChannel()));
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index da7a60324d1..3a29eb699e6 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -29,6 +29,7 @@ import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
@@ -158,6 +159,11 @@ public final class MigrationJobAPIFixture implements
MigrationJobAPI {
return null;
}
+ @Override
+ public MigrationProcessContext buildPipelineProcessContext(final
PipelineJobConfiguration pipelineJobConfig) {
+ return null;
+ }
+
@Override
public boolean isDefault() {
return MigrationJobAPI.super.isDefault();
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index 373c74c8103..8145b7062f3 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -47,7 +47,7 @@ public final class DataConsistencyCheckerTest {
@Test
public void assertCountAndDataCheck() throws SQLException {
- Map<String, DataConsistencyCheckResult> actual = new
DataConsistencyChecker(createJobConfiguration()).check(new
DataConsistencyCalculateAlgorithmFixture());
+ Map<String, DataConsistencyCheckResult> actual = new
DataConsistencyChecker(createJobConfiguration(), null).check(new
DataConsistencyCalculateAlgorithmFixture());
assertTrue(actual.get("t_order").getCountCheckResult().isMatched());
assertThat(actual.get("t_order").getCountCheckResult().getSourceRecordsCount(),
is(actual.get("t_order").getCountCheckResult().getTargetRecordsCount()));
assertTrue(actual.get("t_order").getContentCheckResult().isMatched());
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
deleted file mode 100644
index df383c2baa7..00000000000
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
-
-import org.apache.commons.io.FileUtils;
-import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.core.util.ConfigurationFileUtil;
-import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
-import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Optional;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-public final class RuleAlteredJobWorkerTest {
-
- @BeforeClass
- public static void beforeClass() {
- PipelineContextUtil.mockModeConfigAndContextManager();
- }
-
- @Test(expected = PipelineJobCreationException.class)
- public void assertCreateRuleAlteredContextNoAlteredRule() {
- MigrationJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
- YamlMigrationJobConfigurationSwapper swapper = new
YamlMigrationJobConfigurationSwapper();
- YamlMigrationJobConfiguration yamlJobConfig =
swapper.swapToYamlConfiguration(jobConfig);
-
yamlJobConfig.setAlteredRuleYamlClassNameTablesMap(Collections.emptyMap());
-
RuleAlteredJobWorker.createRuleAlteredContext(swapper.swapToObject(yamlJobConfig));
- }
-
- @Test
- public void assertCreateRuleAlteredContextSuccess() {
-
assertNotNull(RuleAlteredJobWorker.createRuleAlteredContext(JobConfigurationBuilder.createJobConfiguration()).getPipelineProcessConfig());
- }
-
- @Test
- public void assertRuleAlteredActionEnabled() {
- ShardingRuleConfiguration ruleConfig = new ShardingRuleConfiguration();
- ruleConfig.setScalingName("default_scaling");
-
assertTrue(RuleAlteredJobWorker.isOnRuleAlteredActionEnabled(ruleConfig));
- }
-
- @Test
- public void assertRuleAlteredActionDisabled() throws
InvocationTargetException, NoSuchMethodException, IllegalAccessException {
- ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig
= new ShardingSpherePipelineDataSourceConfiguration(
-
ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"));
- ShardingSpherePipelineDataSourceConfiguration pipelineDataTargetConfig
= new ShardingSpherePipelineDataSourceConfiguration(
-
ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_target.yaml"));
- StartScalingEvent startScalingEvent = new
StartScalingEvent("logic_db",
YamlEngine.marshal(pipelineDataSourceConfig.getRootConfig().getDataSources()),
-
YamlEngine.marshal(pipelineDataSourceConfig.getRootConfig().getRules()),
YamlEngine.marshal(pipelineDataTargetConfig.getRootConfig().getDataSources()),
-
YamlEngine.marshal(pipelineDataTargetConfig.getRootConfig().getRules()), 0, 1);
- RuleAlteredJobWorker ruleAlteredJobWorker = new RuleAlteredJobWorker();
- Object result = ReflectionUtil.invokeMethod(ruleAlteredJobWorker,
"createJobConfig", new Class[]{StartScalingEvent.class}, new
Object[]{startScalingEvent});
- assertTrue(((Optional<?>) result).isPresent());
- }
-
- // TODO improve assertHasUncompletedJob, refactor
hasUncompletedJobOfSameDatabaseName for easier unit test
- // @Test
- public void assertHasUncompletedJob() throws InvocationTargetException,
NoSuchMethodException, IllegalAccessException, IOException {
- final MigrationJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
- MigrationJobItemContext jobItemContext = new
MigrationJobItemContext(jobConfig, 0, new
InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
- jobItemContext.setStatus(JobStatus.PREPARING);
- GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
-
MigrationJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext);
- URL jobConfigUrl =
getClass().getClassLoader().getResource("scaling/rule_alter/scaling_job_config.yaml");
- assertNotNull(jobConfigUrl);
-
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigPath(jobItemContext.getJobId()),
FileUtils.readFileToString(new File(jobConfigUrl.getFile()),
StandardCharsets.UTF_8));
- Object result = ReflectionUtil.invokeMethod(new
RuleAlteredJobWorker(), "hasUncompletedJobOfSameDatabaseName", new
Class[]{String.class},
- new String[]{jobConfig.getDatabaseName()});
- assertFalse((Boolean) result);
- }
-}