This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bc8afaa008 Add AbstractPipelineJob for common usage (#19969)
4bc8afaa008 is described below

commit 4bc8afaa008e55f153f1655435f05b622b1e2689
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Aug 8 18:40:03 2022 +0800

    Add AbstractPipelineJob for common usage (#19969)
---
 .../scenario/rulealtered/AbstractPipelineJob.java  | 57 +++++++++++++++
 .../scenario/rulealtered/RuleAlteredJob.java       | 83 +++++++++++-----------
 .../rulealtered/RuleAlteredJobContext.java         |  5 +-
 .../rulealtered/RuleAlteredJobScheduler.java       | 32 +--------
 .../api/impl/GovernanceRepositoryAPIImplTest.java  |  3 +-
 .../core/api/impl/RuleAlteredJobAPIImplTest.java   |  5 +-
 .../consistency/DataConsistencyCheckerTest.java    |  3 +-
 .../pipeline/core/task/IncrementalTaskTest.java    |  3 +-
 .../data/pipeline/core/task/InventoryTaskTest.java |  3 +-
 .../rulealtered/RuleAlteredJobWorkerTest.java      |  5 +-
 .../prepare/InventoryTaskSplitterTest.java         |  3 +-
 11 files changed, 113 insertions(+), 89 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/AbstractPipelineJob.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/AbstractPipelineJob.java
new file mode 100644
index 00000000000..f07b2da52f9
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/AbstractPipelineJob.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Abstract pipeline job.
+ */
+@Slf4j
+@Getter
+public abstract class AbstractPipelineJob implements PipelineJob {
+    
+    @Setter
+    private volatile String jobId;
+    
+    @Setter
+    private volatile boolean stopping;
+    
+    @Setter
+    private OneOffJobBootstrap oneOffJobBootstrap;
+    
+    private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new 
ConcurrentHashMap<>();
+    
+    protected void runInBackground(final Runnable runnable) {
+        new Thread(runnable).start();
+    }
+    
+    @Override
+    public Optional<PipelineTasksRunner> getTasksRunner(final int 
shardingItem) {
+        return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index e0ee494d5bf..ebd74c85f1a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -17,98 +17,99 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
-import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * Rule altered job.
  */
 @Slf4j
 @RequiredArgsConstructor
-public final class RuleAlteredJob implements SimpleJob, PipelineJob {
+public final class RuleAlteredJob extends AbstractPipelineJob implements 
SimpleJob, PipelineJob {
     
     private final GovernanceRepositoryAPI governanceRepositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
     
-    private volatile String jobId;
-    
     private final PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
     
     // Shared by all sharding items
     private final RuleAlteredJobPreparer jobPreparer = new 
RuleAlteredJobPreparer();
     
-    @Getter
-    private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new 
ConcurrentHashMap<>();
-    
-    private volatile boolean stopping;
-    
-    @Setter
-    private OneOffJobBootstrap oneOffJobBootstrap;
-    
     @Override
     public void execute(final ShardingContext shardingContext) {
         log.info("Execute job {}-{}", shardingContext.getJobName(), 
shardingContext.getShardingItem());
-        if (stopping) {
+        if (isStopping()) {
             log.info("stopping true, ignore");
             return;
         }
-        jobId = shardingContext.getJobName();
+        setJobId(shardingContext.getJobName());
         RuleAlteredJobConfiguration jobConfig = 
RuleAlteredJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
         JobProgress initProgress = 
governanceRepositoryAPI.getJobProgress(shardingContext.getJobName(), 
shardingContext.getShardingItem());
-        RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem(), 
initProgress, dataSourceManager, jobPreparer);
+        RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem(), 
initProgress, dataSourceManager);
         int shardingItem = jobContext.getShardingItem();
-        if (tasksRunnerMap.containsKey(shardingItem)) {
-            // If the following log is output, it is possible that the 
elasticjob task was not closed correctly
+        if (getTasksRunnerMap().containsKey(shardingItem)) {
+            // If the following log is output, it is possible that the 
elasticjob task was not shutdown correctly
             log.warn("schedulerMap contains shardingItem {}, ignore", 
shardingItem);
             return;
         }
-        log.info("start RuleAlteredJobScheduler, jobId={}, shardingItem={}", 
jobId, shardingItem);
+        log.info("start RuleAlteredJobScheduler, jobId={}, shardingItem={}", 
getJobId(), shardingItem);
         RuleAlteredJobScheduler jobScheduler = new 
RuleAlteredJobScheduler(jobContext);
-        jobScheduler.start();
-        tasksRunnerMap.put(shardingItem, jobScheduler);
-        PipelineJobProgressPersistService.addJobProgressPersistContext(jobId, 
shardingItem);
-    }
-    
-    @Override
-    public Optional<PipelineTasksRunner> getTasksRunner(final int 
shardingItem) {
-        return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
+        runInBackground(() -> {
+            prepare(jobContext);
+            jobScheduler.start();
+        });
+        getTasksRunnerMap().put(shardingItem, jobScheduler);
+        
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), 
shardingItem);
     }
     
     /**
      * Stop job.
      */
     public void stop() {
-        stopping = true;
+        setStopping(true);
         dataSourceManager.close();
-        if (null != oneOffJobBootstrap) {
-            oneOffJobBootstrap.shutdown();
+        if (null != getOneOffJobBootstrap()) {
+            getOneOffJobBootstrap().shutdown();
         }
-        if (null == jobId) {
+        if (null == getJobId()) {
             log.info("stop, jobId is null, ignore");
             return;
         }
-        log.info("stop job scheduler, jobId={}", jobId);
-        for (PipelineTasksRunner each : tasksRunnerMap.values()) {
+        log.info("stop job scheduler, jobId={}", getJobId());
+        for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
             each.stop();
         }
-        tasksRunnerMap.clear();
-        
PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
+        getTasksRunnerMap().clear();
+        
PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
+    }
+    
+    private void prepare(final RuleAlteredJobContext jobContext) {
+        try {
+            jobPreparer.prepare(jobContext);
+        } catch (final PipelineIgnoredException ex) {
+            log.info("pipeline ignore exception: {}", ex.getMessage());
+            PipelineJobCenter.stop(getJobId());
+            // CHECKSTYLE:OFF
+        } catch (final RuntimeException ex) {
+            // CHECKSTYLE:ON
+            log.error("job prepare failed, {}-{}", getJobId(), 
jobContext.getShardingItem(), ex);
+            PipelineJobCenter.stop(getJobId());
+            jobContext.setStatus(JobStatus.PREPARING_FAILURE);
+            
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobProgress(jobContext);
+            throw ex;
+        }
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 8ffcba5ec20..ef511163d33 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -83,17 +83,14 @@ public final class RuleAlteredJobContext implements 
PipelineJobContext {
         }
     };
     
-    private final RuleAlteredJobPreparer jobPreparer;
-    
     public RuleAlteredJobContext(final RuleAlteredJobConfiguration jobConfig, 
final int jobShardingItem, final JobProgress initProgress,
-                                 final PipelineDataSourceManager 
dataSourceManager, final RuleAlteredJobPreparer jobPreparer) {
+                                 final PipelineDataSourceManager 
dataSourceManager) {
         jobProcessContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
         this.jobConfig = jobConfig;
         jobId = jobConfig.getJobId();
         this.shardingItem = jobShardingItem;
         this.initProgress = initProgress;
         this.dataSourceManager = dataSourceManager;
-        this.jobPreparer = jobPreparer;
         taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig, 
jobShardingItem, jobProcessContext.getPipelineProcessConfig());
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index ac0618c3643..45f99dc3e21 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -23,11 +23,8 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -38,17 +35,10 @@ import 
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 @Slf4j
 @RequiredArgsConstructor
 @Getter
-public final class RuleAlteredJobScheduler implements PipelineTasksRunner, 
Runnable {
+public final class RuleAlteredJobScheduler implements PipelineTasksRunner {
     
     private final RuleAlteredJobContext jobContext;
     
-    /**
-     * Start execute job.
-     */
-    public void start() {
-        new Thread(this).start();
-    }
-    
     /**
      * Stop all task.
      */
@@ -69,28 +59,12 @@ public final class RuleAlteredJobScheduler implements 
PipelineTasksRunner, Runna
     }
     
     @Override
-    public void run() {
-        String jobId = jobContext.getJobId();
-        GovernanceRepositoryAPI governanceRepositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
-        try {
-            jobContext.getJobPreparer().prepare(jobContext);
-        } catch (final PipelineIgnoredException ex) {
-            log.info("pipeline ignore exception: {}", ex.getMessage());
-            PipelineJobCenter.stop(jobId);
-            // CHECKSTYLE:OFF
-        } catch (final RuntimeException ex) {
-            // CHECKSTYLE:ON
-            log.error("job prepare failed, {}-{}", jobId, 
jobContext.getShardingItem(), ex);
-            PipelineJobCenter.stop(jobId);
-            jobContext.setStatus(JobStatus.PREPARING_FAILURE);
-            governanceRepositoryAPI.persistJobProgress(jobContext);
-            throw ex;
-        }
+    public void start() {
         if (jobContext.isStopping()) {
             log.info("job stopping, ignore inventory task");
             return;
         }
-        governanceRepositoryAPI.persistJobProgress(jobContext);
+        
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobProgress(jobContext);
         if (executeInventoryTask()) {
             if (jobContext.isStopping()) {
                 log.info("stopping, ignore incremental task");
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 8457c4e1ff5..c6864fe9f61 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -37,7 +37,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.util.ConfigurationFileUtil;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -142,7 +141,7 @@ public final class GovernanceRepositoryAPIImplTest {
     }
     
     private RuleAlteredJobContext mockJobContext() {
-        RuleAlteredJobContext result = new 
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new 
JobProgress(), new PipelineDataSourceManager(), new RuleAlteredJobPreparer());
+        RuleAlteredJobContext result = new 
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new 
JobProgress(), new PipelineDataSourceManager());
         TaskConfiguration taskConfig = result.getTaskConfig();
         result.getInventoryTasks().add(mockInventoryTask(taskConfig));
         result.getIncrementalTasks().add(mockIncrementalTask(taskConfig));
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index a525658e714..6c72641a38d 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -38,7 +38,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFail
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -200,7 +199,7 @@ public final class RuleAlteredJobAPIImplTest {
         Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
         final GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
-        RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new 
PipelineDataSourceManager(), new RuleAlteredJobPreparer());
+        RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new 
PipelineDataSourceManager());
         repositoryAPI.persistJobProgress(jobContext);
         repositoryAPI.persistJobCheckResult(jobId.get(), true);
         repositoryAPI.updateShardingJobStatus(jobId.get(), 0, 
JobStatus.FINISHED);
@@ -213,7 +212,7 @@ public final class RuleAlteredJobAPIImplTest {
         Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
-        RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new 
PipelineDataSourceManager(), new RuleAlteredJobPreparer());
+        RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new 
PipelineDataSourceManager());
         repositoryAPI.persistJobProgress(jobContext);
         repositoryAPI.persistJobCheckResult(jobId.get(), true);
         repositoryAPI.updateShardingJobStatus(jobId.get(), 
jobContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index e9f18715051..e9c6e1a0b06 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -26,7 +26,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.fixture.DataConsistencyCalcu
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -56,7 +55,7 @@ public final class DataConsistencyCheckerTest {
     
     private RuleAlteredJobConfiguration createJobConfiguration() throws 
SQLException {
         RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0,
-                new JobProgress(), new PipelineDataSourceManager(), new 
RuleAlteredJobPreparer());
+                new JobProgress(), new PipelineDataSourceManager());
         
initTableData(jobContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
         
initTableData(jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
         return jobContext.getJobConfig();
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index e81a8913b09..509caa6e02b 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -27,7 +27,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTabl
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -50,7 +49,7 @@ public final class IncrementalTaskTest {
     @Before
     public void setUp() {
         TaskConfiguration taskConfig = new 
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new 
JobProgress(),
-                new PipelineDataSourceManager(), new 
RuleAlteredJobPreparer()).getTaskConfig();
+                new PipelineDataSourceManager()).getTaskConfig();
         taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
         PipelineTableMetaDataLoader metaDataLoader = new 
PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), 
taskConfig.getImporterConfig(),
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index afde39ffba6..f31cba885c0 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -30,7 +30,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTabl
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -62,7 +61,7 @@ public final class InventoryTaskTest {
     
     @Before
     public void setUp() {
-        taskConfig = new 
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new 
JobProgress(), new PipelineDataSourceManager(), new 
RuleAlteredJobPreparer()).getTaskConfig();
+        taskConfig = new 
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new 
JobProgress(), new PipelineDataSourceManager()).getTaskConfig();
     }
     
     @Test(expected = IngestException.class)
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index d51586c71d9..32efa4f06fc 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -43,6 +43,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Optional;
 
@@ -96,13 +97,13 @@ public final class RuleAlteredJobWorkerTest {
     // @Test
     public void assertHasUncompletedJob() throws InvocationTargetException, 
NoSuchMethodException, IllegalAccessException, IOException {
         final RuleAlteredJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
-        RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new 
PipelineDataSourceManager(), new RuleAlteredJobPreparer());
+        RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new 
PipelineDataSourceManager());
         jobContext.setStatus(JobStatus.PREPARING);
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
         repositoryAPI.persistJobProgress(jobContext);
         URL jobConfigUrl = 
getClass().getClassLoader().getResource("scaling/rule_alter/scaling_job_config.yaml");
         assertNotNull(jobConfigUrl);
-        
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigPath(jobContext.getJobId()),
 FileUtils.readFileToString(new File(jobConfigUrl.getFile())));
+        
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigPath(jobContext.getJobId()),
 FileUtils.readFileToString(new File(jobConfigUrl.getFile()), 
StandardCharsets.UTF_8));
         Object result = ReflectionUtil.invokeMethod(new 
RuleAlteredJobWorker(), "hasUncompletedJobOfSameDatabaseName", new 
Class[]{String.class},
                 new String[]{jobConfig.getDatabaseName()});
         assertFalse((Boolean) result);
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index ee0ea66f257..99fbc9b6d30 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -29,7 +29,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -68,7 +67,7 @@ public final class InventoryTaskSplitterTest {
     private void initJobContext() {
         RuleAlteredJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
         JobProgress initProgress = 
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobConfig.getJobId(),
 0);
-        jobContext = new RuleAlteredJobContext(jobConfig, 0, initProgress, new 
PipelineDataSourceManager(), new RuleAlteredJobPreparer());
+        jobContext = new RuleAlteredJobContext(jobConfig, 0, initProgress, new 
PipelineDataSourceManager());
         dataSourceManager = jobContext.getDataSourceManager();
         taskConfig = jobContext.getTaskConfig();
     }

Reply via email to