This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d80ca4da9d Add buildPipelineProcessContext method in pipeline job API 
for common usage (#20306)
7d80ca4da9d is described below

commit 7d80ca4da9d21570ed0dc8dd48c7d81106fcab7b
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Aug 19 20:16:48 2022 +0800

    Add buildPipelineProcessContext method in pipeline job API for common usage 
(#20306)
---
 .../data/pipeline/core/api/PipelineJobAPI.java     |   9 ++
 .../check/consistency/DataConsistencyChecker.java  |   7 +-
 .../data/pipeline/core/job/FinishedCheckJob.java   |   3 +-
 .../scenario/migration/MigrationJobAPI.java        |   4 +
 .../scenario/migration/MigrationJobAPIImpl.java    |  21 ++--
 .../migration/MigrationJobItemContext.java         |   5 +-
 .../migration/MigrationProcessContext.java         |   6 ++
 .../core/fixture/MigrationJobAPIFixture.java       |   6 ++
 .../consistency/DataConsistencyCheckerTest.java    |   2 +-
 .../rulealtered/RuleAlteredJobWorkerTest.java      | 113 ---------------------
 10 files changed, 48 insertions(+), 128 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index adbe0df3bde..eea116b6662 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.api;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
@@ -47,6 +48,14 @@ public interface PipelineJobAPI extends 
PipelineJobPublicAPI, PipelineJobItemAPI
      */
     void extendYamlJobConfiguration(YamlPipelineJobConfiguration 
yamlJobConfig);
     
+    /**
+     * Build pipeline process context.
+     *
+     * @param pipelineJobConfig pipeline job configuration
+     * @return pipeline process context
+     */
+    PipelineProcessContext 
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
+    
     /**
      * Start job.
      *
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 08f2e2a0631..98987fcc9f5 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -32,7 +32,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -74,10 +73,13 @@ public final class DataConsistencyChecker {
     
     private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
     
-    public DataConsistencyChecker(final MigrationJobConfiguration jobConfig) {
+    private final JobRateLimitAlgorithm readRateLimitAlgorithm;
+    
+    public DataConsistencyChecker(final MigrationJobConfiguration jobConfig, 
final JobRateLimitAlgorithm readRateLimitAlgorithm) {
         this.jobConfig = jobConfig;
         logicTableNames = jobConfig.splitLogicTableNames();
         tableNameSchemaNameMapping = new 
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap()));
+        this.readRateLimitAlgorithm = readRateLimitAlgorithm;
     }
     
     /**
@@ -157,7 +159,6 @@ public final class DataConsistencyChecker {
         PipelineDataSourceConfiguration targetDataSourceConfig = 
jobConfig.getTarget();
         ThreadFactory threadFactory = 
ExecutorThreadFactoryBuilder.build("job-" + 
getJobIdDigest(jobConfig.getJobId()) + "-data-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, 
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
-        JobRateLimitAlgorithm readRateLimitAlgorithm = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getReadRateLimitAlgorithm();
         Map<String, DataConsistencyContentCheckResult> result = new 
HashMap<>(logicTableNames.size(), 1);
         try (
                 PipelineDataSourceWrapper sourceDataSource = 
PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 79347621bc7..c491cfa11ef 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -27,7 +27,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 
@@ -64,7 +63,7 @@ public final class FinishedCheckJob implements SimpleJob {
             try {
                 // TODO refactor: dispatch to different job types
                 MigrationJobConfiguration jobConfig = 
YamlMigrationJobConfigurationSwapper.swapToObject(jobInfo.getJobParameter());
-                MigrationProcessContext processContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+                MigrationProcessContext processContext = 
jobAPI.buildPipelineProcessContext(jobConfig);
                 if (null == processContext.getCompletionDetectAlgorithm()) {
                     log.info("completionDetector not configured, auto switch 
will not be enabled. You could query job progress and switch config manually 
with DistSQL.");
                     continue;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
index 05cc6397c99..72d3a03a5b8 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
@@ -21,6 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
 import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
@@ -48,6 +49,9 @@ public interface MigrationJobAPI extends PipelineJobAPI, 
MigrationJobPublicAPI,
      */
     TaskConfiguration buildTaskConfiguration(MigrationJobConfiguration 
jobConfig, int jobShardingItem, PipelineProcessConfiguration 
pipelineProcessConfig);
     
+    @Override
+    MigrationProcessContext 
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
+    
     /**
      * Get job progress.
      *
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 5a7ca4c97c1..cce56aa9a0a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -51,8 +51,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSource
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
@@ -143,6 +143,14 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
         return 
RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig,
 jobShardingItem, pipelineProcessConfig);
     }
     
+    @Override
+    public MigrationProcessContext buildPipelineProcessContext(final 
PipelineJobConfiguration pipelineJobConfig) {
+        // TODO add jobType
+        // TODO read process config from registry center
+        PipelineProcessConfiguration processConfig = new 
PipelineProcessConfiguration(null, null, null);
+        return new MigrationProcessContext(pipelineJobConfig.getJobId(), 
processConfig);
+    }
+    
     @Override
     public List<JobInfo> list() {
         checkModeConfig();
@@ -207,7 +215,7 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     }
     
     private void verifyManualMode(final MigrationJobConfiguration jobConfig) {
-        MigrationProcessContext processContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+        MigrationProcessContext processContext = 
buildPipelineProcessContext(jobConfig);
         if (null != processContext.getCompletionDetectAlgorithm()) {
             throw new PipelineVerifyFailedException("It's not necessary to do 
it in auto mode.");
         }
@@ -291,7 +299,7 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     
     @Override
     public boolean isDataConsistencyCheckNeeded(final 
MigrationJobConfiguration jobConfig) {
-        MigrationProcessContext processContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+        MigrationProcessContext processContext = 
buildPipelineProcessContext(jobConfig);
         return isDataConsistencyCheckNeeded(processContext);
     }
     
@@ -310,7 +318,7 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     
     @Override
     public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final 
MigrationJobConfiguration jobConfig) {
-        MigrationProcessContext processContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+        MigrationProcessContext processContext = 
buildPipelineProcessContext(jobConfig);
         if (!isDataConsistencyCheckNeeded(processContext)) {
             log.info("DataConsistencyCalculatorAlgorithm is not configured, 
data consistency check is ignored.");
             return Collections.emptyMap();
@@ -329,7 +337,8 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     
     private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final 
MigrationJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm 
calculator) {
         String jobId = jobConfig.getJobId();
-        Map<String, DataConsistencyCheckResult> result = new 
DataConsistencyChecker(jobConfig).check(calculator);
+        JobRateLimitAlgorithm readRateLimitAlgorithm = 
buildPipelineProcessContext(jobConfig).getReadRateLimitAlgorithm();
+        Map<String, DataConsistencyCheckResult> result = new 
DataConsistencyChecker(jobConfig, readRateLimitAlgorithm).check(calculator);
         log.info("Scaling job {} with check algorithm '{}' data consistency 
checker result {}", jobId, calculator.getType(), result);
         
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId, 
aggregateDataConsistencyCheckResults(jobId, result));
         return result;
@@ -371,7 +380,7 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     @Override
     public void switchClusterConfiguration(final MigrationJobConfiguration 
jobConfig) {
         String jobId = jobConfig.getJobId();
-        MigrationProcessContext processContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+        MigrationProcessContext processContext = 
buildPipelineProcessContext(jobConfig);
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
         if (isDataConsistencyCheckNeeded(processContext)) {
             Optional<Boolean> checkResult = 
repositoryAPI.getJobCheckResult(jobId);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index b1ea5766ef5..f559d4e8feb 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -33,7 +33,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -86,8 +85,8 @@ public final class MigrationJobItemContext implements 
InventoryIncrementalJobIte
     
     public MigrationJobItemContext(final MigrationJobConfiguration jobConfig, 
final int jobShardingItem, final InventoryIncrementalJobItemProgress 
initProgress,
                                    final PipelineDataSourceManager 
dataSourceManager) {
-        // TODO refactor RuleAlteredJobWorker
-        jobProcessContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+        MigrationJobAPI jobAPI = MigrationJobAPIFactory.getInstance();
+        jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
         this.jobConfig = jobConfig;
         jobId = jobConfig.getJobId();
         this.shardingItem = jobShardingItem;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
index 4d960e30305..b3c430629e1 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
@@ -43,6 +43,12 @@ public final class MigrationProcessContext extends 
AbstractPipelineProcessContex
     
     private final DataConsistencyCalculateAlgorithm 
dataConsistencyCalculateAlgorithm;
     
+    public MigrationProcessContext(final String jobId, final 
PipelineProcessConfiguration originalProcessConfig) {
+        super(jobId, originalProcessConfig);
+        completionDetectAlgorithm = null;
+        dataConsistencyCalculateAlgorithm = null;
+    }
+    
     @SuppressWarnings("unchecked")
     public MigrationProcessContext(final String jobId, final 
OnRuleAlteredActionConfiguration actionConfig) {
         super(jobId, new PipelineProcessConfiguration(actionConfig.getInput(), 
actionConfig.getOutput(), actionConfig.getStreamChannel()));
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index da7a60324d1..3a29eb699e6 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -29,6 +29,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
 import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 
@@ -158,6 +159,11 @@ public final class MigrationJobAPIFixture implements 
MigrationJobAPI {
         return null;
     }
     
+    @Override
+    public MigrationProcessContext buildPipelineProcessContext(final 
PipelineJobConfiguration pipelineJobConfig) {
+        return null;
+    }
+    
     @Override
     public boolean isDefault() {
         return MigrationJobAPI.super.isDefault();
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index 373c74c8103..8145b7062f3 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -47,7 +47,7 @@ public final class DataConsistencyCheckerTest {
     
     @Test
     public void assertCountAndDataCheck() throws SQLException {
-        Map<String, DataConsistencyCheckResult> actual = new 
DataConsistencyChecker(createJobConfiguration()).check(new 
DataConsistencyCalculateAlgorithmFixture());
+        Map<String, DataConsistencyCheckResult> actual = new 
DataConsistencyChecker(createJobConfiguration(), null).check(new 
DataConsistencyCalculateAlgorithmFixture());
         assertTrue(actual.get("t_order").getCountCheckResult().isMatched());
         
assertThat(actual.get("t_order").getCountCheckResult().getSourceRecordsCount(), 
is(actual.get("t_order").getCountCheckResult().getTargetRecordsCount()));
         assertTrue(actual.get("t_order").getContentCheckResult().isMatched());
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
deleted file mode 100644
index df383c2baa7..00000000000
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
-
-import org.apache.commons.io.FileUtils;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.core.util.ConfigurationFileUtil;
-import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
-import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Optional;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-public final class RuleAlteredJobWorkerTest {
-    
-    @BeforeClass
-    public static void beforeClass() {
-        PipelineContextUtil.mockModeConfigAndContextManager();
-    }
-    
-    @Test(expected = PipelineJobCreationException.class)
-    public void assertCreateRuleAlteredContextNoAlteredRule() {
-        MigrationJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
-        YamlMigrationJobConfigurationSwapper swapper = new 
YamlMigrationJobConfigurationSwapper();
-        YamlMigrationJobConfiguration yamlJobConfig = 
swapper.swapToYamlConfiguration(jobConfig);
-        
yamlJobConfig.setAlteredRuleYamlClassNameTablesMap(Collections.emptyMap());
-        
RuleAlteredJobWorker.createRuleAlteredContext(swapper.swapToObject(yamlJobConfig));
-    }
-    
-    @Test
-    public void assertCreateRuleAlteredContextSuccess() {
-        
assertNotNull(RuleAlteredJobWorker.createRuleAlteredContext(JobConfigurationBuilder.createJobConfiguration()).getPipelineProcessConfig());
-    }
-    
-    @Test
-    public void assertRuleAlteredActionEnabled() {
-        ShardingRuleConfiguration ruleConfig = new ShardingRuleConfiguration();
-        ruleConfig.setScalingName("default_scaling");
-        
assertTrue(RuleAlteredJobWorker.isOnRuleAlteredActionEnabled(ruleConfig));
-    }
-    
-    @Test
-    public void assertRuleAlteredActionDisabled() throws 
InvocationTargetException, NoSuchMethodException, IllegalAccessException {
-        ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig 
= new ShardingSpherePipelineDataSourceConfiguration(
-                
ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"));
-        ShardingSpherePipelineDataSourceConfiguration pipelineDataTargetConfig 
= new ShardingSpherePipelineDataSourceConfiguration(
-                
ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_target.yaml"));
-        StartScalingEvent startScalingEvent = new 
StartScalingEvent("logic_db", 
YamlEngine.marshal(pipelineDataSourceConfig.getRootConfig().getDataSources()),
-                
YamlEngine.marshal(pipelineDataSourceConfig.getRootConfig().getRules()), 
YamlEngine.marshal(pipelineDataTargetConfig.getRootConfig().getDataSources()),
-                
YamlEngine.marshal(pipelineDataTargetConfig.getRootConfig().getRules()), 0, 1);
-        RuleAlteredJobWorker ruleAlteredJobWorker = new RuleAlteredJobWorker();
-        Object result = ReflectionUtil.invokeMethod(ruleAlteredJobWorker, 
"createJobConfig", new Class[]{StartScalingEvent.class}, new 
Object[]{startScalingEvent});
-        assertTrue(((Optional<?>) result).isPresent());
-    }
-    
-    // TODO improve assertHasUncompletedJob, refactor 
hasUncompletedJobOfSameDatabaseName for easier unit test
-    // @Test
-    public void assertHasUncompletedJob() throws InvocationTargetException, 
NoSuchMethodException, IllegalAccessException, IOException {
-        final MigrationJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
-        MigrationJobItemContext jobItemContext = new 
MigrationJobItemContext(jobConfig, 0, new 
InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
-        jobItemContext.setStatus(JobStatus.PREPARING);
-        GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
-        
MigrationJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext);
-        URL jobConfigUrl = 
getClass().getClassLoader().getResource("scaling/rule_alter/scaling_job_config.yaml");
-        assertNotNull(jobConfigUrl);
-        
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigPath(jobItemContext.getJobId()),
 FileUtils.readFileToString(new File(jobConfigUrl.getFile()), 
StandardCharsets.UTF_8));
-        Object result = ReflectionUtil.invokeMethod(new 
RuleAlteredJobWorker(), "hasUncompletedJobOfSameDatabaseName", new 
Class[]{String.class},
-                new String[]{jobConfig.getDatabaseName()});
-        assertFalse((Boolean) result);
-    }
-}

Reply via email to