This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 46305879521 Extract start job method to PipelineJobAPI for common 
usage (#20126)
46305879521 is described below

commit 46305879521d0f11289c98f6afcc161722c218bd
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Aug 13 09:33:49 2022 +0800

    Extract start job method to PipelineJobAPI for common usage (#20126)
    
    * Rename getJobConfig and extendJobConfiguration
    
    * Extract start job method to PipelineJobAPI
    
    * Move YamlPipelineJobConfiguration
    
    * Improve job item status persistence
---
 .../data/pipeline/api/PipelineJobPublicAPI.java    |  2 +-
 .../{ => yaml}/YamlPipelineJobConfiguration.java   |  2 +-
 .../yaml/YamlRuleAlteredJobConfiguration.java      |  2 +-
 .../data/pipeline/core/api/PipelineJobAPI.java     | 21 ++++--
 .../data/pipeline/core/api/RuleAlteredJobAPI.java  | 13 +---
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  | 40 ++++++++++-
 .../core/api/impl/RuleAlteredJobAPIImpl.java       | 77 +++++++---------------
 .../core/task/InventoryIncrementalTasksRunner.java | 17 +++--
 .../scenario/rulealtered/RuleAlteredJobWorker.java |  9 +--
 .../core/fixture/RuleAlteredJobAPIFixture.java     | 11 ++--
 .../core/api/impl/RuleAlteredJobAPIImplTest.java   |  6 +-
 .../core/util/JobConfigurationBuilder.java         |  5 +-
 12 files changed, 114 insertions(+), 91 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
index efd9ce54bd8..dd8c9175c2f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 public interface PipelineJobPublicAPI extends TypedSPI {
     
     /**
-     * Start pipeline job by id.
+     * Start disabled job.
      *
      * @param jobId job id
      */
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/YamlPipelineJobConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java
similarity index 94%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/YamlPipelineJobConfiguration.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java
index 1a39b7c854c..c0ab0ca640b 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/YamlPipelineJobConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.config.job;
+package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
 
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
index 39840c6a08d..efc26ef8dfc 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 
 import java.util.List;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index cdcaed2104e..adbe0df3bde 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -19,11 +19,13 @@ package org.apache.shardingsphere.data.pipeline.core.api;
 
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
+import java.util.Optional;
+
 /**
  * Pipeline job API.
  */
@@ -39,11 +41,19 @@ public interface PipelineJobAPI extends 
PipelineJobPublicAPI, PipelineJobItemAPI
     String marshalJobId(PipelineJobId pipelineJobId);
     
     /**
-     * Extend job configuration.
+     * Extend YAML job configuration.
+     *
+     * @param yamlJobConfig YAML job configuration
+     */
+    void extendYamlJobConfiguration(YamlPipelineJobConfiguration 
yamlJobConfig);
+    
+    /**
+     * Start job.
      *
-     * @param yamlJobConfig yaml job configuration
+     * @param jobConfig job configuration
+     * @return job id
      */
-    void extendJobConfiguration(YamlPipelineJobConfiguration yamlJobConfig);
+    Optional<String> start(PipelineJobConfiguration jobConfig);
     
     /**
      * Get job configuration.
@@ -51,6 +61,5 @@ public interface PipelineJobAPI extends PipelineJobPublicAPI, 
PipelineJobItemAPI
      * @param jobId job id
      * @return job configuration
      */
-    // TODO rename
-    PipelineJobConfiguration getJobConfig(String jobId);
+    PipelineJobConfiguration getJobConfiguration(String jobId);
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
index 7a8def8601d..468a0c950c7 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
@@ -25,7 +25,6 @@ import 
org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
 
 import java.util.Map;
-import java.util.Optional;
 
 /**
  * Rule altered job API.
@@ -33,13 +32,8 @@ import java.util.Optional;
 @SingletonSPI
 public interface RuleAlteredJobAPI extends PipelineJobAPI, 
MigrationJobPublicAPI, RequiredSPI {
     
-    /**
-     * Start scaling job by config.
-     *
-     * @param jobConfig job config
-     * @return job id
-     */
-    Optional<String> start(RuleAlteredJobConfiguration jobConfig);
+    @Override
+    RuleAlteredJobConfiguration getJobConfiguration(String jobId);
     
     /**
      * Get job progress.
@@ -97,7 +91,4 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, 
MigrationJobPublicAPI
      * @param jobConfig job configuration
      */
     void switchClusterConfiguration(RuleAlteredJobConfiguration jobConfig);
-    
-    @Override
-    RuleAlteredJobConfiguration getJobConfig(String jobId);
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index aca2c9f609a..3da0bc5b75f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -18,16 +18,24 @@
 package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobNotFoundException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.Optional;
 
 /**
  * Abstract pipeline job API impl.
@@ -44,6 +52,36 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
     
     protected abstract String marshalJobIdLeftPart(PipelineJobId 
pipelineJobId);
     
+    @Override
+    public Optional<String> start(final PipelineJobConfiguration jobConfig) {
+        String jobId = jobConfig.getJobId();
+        if (0 == jobConfig.getJobShardingCount()) {
+            log.warn("Invalid job config since job sharding count is 0, 
jobId={}", jobId);
+            throw new PipelineJobCreationException("job sharding count is 0, 
jobId: " + jobId);
+        }
+        log.info("Start job by {}", jobConfig);
+        GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
+        String jobConfigKey = 
PipelineMetaDataNode.getScalingJobConfigPath(jobId);
+        if (repositoryAPI.isExisted(jobConfigKey)) {
+            log.warn("jobId already exists in registry center, ignore, 
jobConfigKey={}", jobConfigKey);
+            return Optional.of(jobId);
+        }
+        repositoryAPI.persist(PipelineMetaDataNode.getScalingJobPath(jobId), 
RuleAlteredJob.class.getName());
+        repositoryAPI.persist(jobConfigKey, 
convertJobConfigurationToText(jobConfig));
+        return Optional.of(jobId);
+    }
+    
+    private String convertJobConfigurationToText(final 
PipelineJobConfiguration jobConfig) {
+        JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
+        jobConfigPOJO.setJobName(jobConfig.getJobId());
+        jobConfigPOJO.setShardingTotalCount(jobConfig.getJobShardingCount());
+        
jobConfigPOJO.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration(jobConfig)));
+        jobConfigPOJO.getProps().setProperty("create_time", 
LocalDateTime.now().format(DATE_TIME_FORMATTER));
+        return YamlEngine.marshal(jobConfigPOJO);
+    }
+    
+    protected abstract YamlPipelineJobConfiguration 
swapToYamlJobConfiguration(PipelineJobConfiguration jobConfig);
+    
     @Override
     public void startDisabledJob(final String jobId) {
         log.info("Start disabled pipeline job {}", jobId);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 1f3576de972..60e170aaed8 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -22,7 +22,8 @@ import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.binary.Hex;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
@@ -43,13 +44,10 @@ import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
@@ -58,12 +56,10 @@ import 
org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.lock.LockDefinition;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
 
 import java.nio.charset.StandardCharsets;
-import java.time.LocalDateTime;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -92,7 +88,7 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
     }
     
     @Override
-    public void extendJobConfiguration(final YamlPipelineJobConfiguration 
yamlJobConfig) {
+    public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration 
yamlJobConfig) {
         YamlRuleAlteredJobConfiguration config = 
(YamlRuleAlteredJobConfiguration) yamlJobConfig;
         if (null == config.getJobShardingDataNodes()) {
             
RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(config);
@@ -120,6 +116,20 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
         return marshalJobId(jobId);
     }
     
+    @Override
+    protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final 
PipelineJobConfiguration jobConfig) {
+        return new 
RuleAlteredJobConfigurationSwapper().swapToYamlConfiguration((RuleAlteredJobConfiguration)
 jobConfig);
+    }
+    
+    @Override
+    public RuleAlteredJobConfiguration getJobConfiguration(final String jobId) 
{
+        return getJobConfiguration(getElasticJobConfigPOJO(jobId));
+    }
+    
+    private RuleAlteredJobConfiguration getJobConfiguration(final 
JobConfigurationPOJO jobConfigPOJO) {
+        return 
RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
+    }
+    
     @Override
     public List<JobInfo> list() {
         checkModeConfig();
@@ -139,7 +149,7 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
     private JobInfo getJobInfo(final String jobName) {
         JobInfo result = new JobInfo(jobName);
         JobConfigurationPOJO jobConfigPOJO = 
getElasticJobConfigPOJO(result.getJobId());
-        RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+        RuleAlteredJobConfiguration jobConfig = 
getJobConfiguration(jobConfigPOJO);
         result.setActive(!jobConfigPOJO.isDisabled());
         result.setShardingTotalCount(jobConfig.getJobShardingCount());
         result.setTables(jobConfig.getLogicTables());
@@ -149,38 +159,10 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
         return result;
     }
     
-    @Override
-    public Optional<String> start(final RuleAlteredJobConfiguration jobConfig) 
{
-        if (0 == jobConfig.getJobShardingCount()) {
-            log.warn("Invalid scaling job config!");
-            throw new PipelineJobCreationException("handleConfig 
shardingTotalCount is 0");
-        }
-        log.info("Start scaling job by {}", jobConfig);
-        GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
-        String jobId = jobConfig.getJobId();
-        String jobConfigKey = 
PipelineMetaDataNode.getScalingJobConfigPath(jobId);
-        if (repositoryAPI.isExisted(jobConfigKey)) {
-            log.warn("jobId already exists in registry center, ignore, 
jobConfigKey={}", jobConfigKey);
-            return Optional.of(jobId);
-        }
-        repositoryAPI.persist(PipelineMetaDataNode.getScalingJobPath(jobId), 
RuleAlteredJob.class.getName());
-        repositoryAPI.persist(jobConfigKey, createJobConfigText(jobConfig));
-        return Optional.of(jobId);
-    }
-    
-    private String createJobConfigText(final RuleAlteredJobConfiguration 
jobConfig) {
-        JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
-        jobConfigPOJO.setJobName(jobConfig.getJobId());
-        jobConfigPOJO.setShardingTotalCount(jobConfig.getJobShardingCount());
-        jobConfigPOJO.setJobParameter(YamlEngine.marshal(new 
RuleAlteredJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
-        jobConfigPOJO.getProps().setProperty("create_time", 
LocalDateTime.now().format(DATE_TIME_FORMATTER));
-        return YamlEngine.marshal(jobConfigPOJO);
-    }
-    
     @Override
     public Map<Integer, InventoryIncrementalJobItemProgress> 
getJobProgress(final String jobId) {
         checkModeConfig();
-        return getJobProgress(getJobConfig(jobId));
+        return getJobProgress(getJobConfiguration(jobId));
     }
     
     @Override
@@ -229,7 +211,7 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
         checkModeConfig();
         log.info("stopClusterWriteDB for job {}", jobId);
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
-        RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+        RuleAlteredJobConfiguration jobConfig = 
getJobConfiguration(jobConfigPOJO);
         verifyManualMode(jobConfig);
         verifyJobNotStopped(jobConfigPOJO);
         verifyJobNotCompleted(jobConfig);
@@ -257,7 +239,7 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
         checkModeConfig();
         log.info("restoreClusterWriteDB for job {}", jobId);
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
-        RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+        RuleAlteredJobConfiguration jobConfig = 
getJobConfiguration(jobConfigPOJO);
         verifyManualMode(jobConfig);
         restoreClusterWriteDB(jobConfig);
     }
@@ -290,7 +272,7 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
     @Override
     public boolean isDataConsistencyCheckNeeded(final String jobId) {
         log.info("isDataConsistencyCheckNeeded for job {}", jobId);
-        RuleAlteredJobConfiguration jobConfig = getJobConfig(jobId);
+        RuleAlteredJobConfiguration jobConfig = getJobConfiguration(jobId);
         return isDataConsistencyCheckNeeded(jobConfig);
     }
     
@@ -308,7 +290,7 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
     public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final 
String jobId) {
         checkModeConfig();
         log.info("Data consistency check for job {}", jobId);
-        RuleAlteredJobConfiguration jobConfig = 
getJobConfig(getElasticJobConfigPOJO(jobId));
+        RuleAlteredJobConfiguration jobConfig = 
getJobConfiguration(getElasticJobConfigPOJO(jobId));
         verifyDataConsistencyCheck(jobConfig);
         return dataConsistencyCheck(jobConfig);
     }
@@ -327,7 +309,7 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
     public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final 
String jobId, final String algorithmType, final Properties algorithmProps) {
         checkModeConfig();
         log.info("Data consistency check for job {}, algorithmType: {}", 
jobId, algorithmType);
-        RuleAlteredJobConfiguration jobConfig = 
getJobConfig(getElasticJobConfigPOJO(jobId));
+        RuleAlteredJobConfiguration jobConfig = 
getJobConfiguration(getElasticJobConfigPOJO(jobId));
         verifyDataConsistencyCheck(jobConfig);
         return dataConsistencyCheck(jobConfig, 
DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, 
algorithmProps));
     }
@@ -366,7 +348,7 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
         checkModeConfig();
         log.info("Switch cluster configuration for job {}", jobId);
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
-        RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+        RuleAlteredJobConfiguration jobConfig = 
getJobConfiguration(jobConfigPOJO);
         verifyManualMode(jobConfig);
         verifyJobNotStopped(jobConfigPOJO);
         verifyJobNotCompleted(jobConfig);
@@ -402,15 +384,6 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
         verifyJobStopped(jobConfigPOJO);
     }
     
-    @Override
-    public RuleAlteredJobConfiguration getJobConfig(final String jobId) {
-        return getJobConfig(getElasticJobConfigPOJO(jobId));
-    }
-    
-    private RuleAlteredJobConfiguration getJobConfig(final 
JobConfigurationPOJO jobConfigPOJO) {
-        return 
RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
-    }
-    
     @Override
     public String getType() {
         return JobType.MIGRATION.getTypeName();
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 5ea0c7045b4..e9b9c1566ac 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -26,6 +26,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
@@ -39,6 +41,8 @@ import java.util.Collection;
 @Slf4j
 public final class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
     
+    private final PipelineJobItemAPI jobItemAPI = new 
InventoryIncrementalJobItemAPIImpl();
+    
     @Getter
     private final PipelineJobItemContext jobItemContext;
     
@@ -89,7 +93,7 @@ public final class InventoryIncrementalTasksRunner implements 
PipelineTasksRunne
             return true;
         }
         log.info("-------------- Start inventory task --------------");
-        jobItemContext.setStatus(JobStatus.EXECUTE_INVENTORY_TASK);
+        updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK);
         ExecuteCallback inventoryTaskCallback = createInventoryTaskCallback();
         for (InventoryTask each : inventoryTasks) {
             if (each.getTaskProgress().getPosition() instanceof 
FinishedPosition) {
@@ -100,6 +104,11 @@ public final class InventoryIncrementalTasksRunner 
implements PipelineTasksRunne
         return false;
     }
     
+    private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
+        jobItemContext.setStatus(jobStatus);
+        jobItemAPI.updateJobItemStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
+    }
+    
     private ExecuteCallback createInventoryTaskCallback() {
         return new ExecuteCallback() {
             
@@ -114,7 +123,7 @@ public final class InventoryIncrementalTasksRunner 
implements PipelineTasksRunne
             @Override
             public void onFailure(final Throwable throwable) {
                 log.error("Inventory task execute failed.", throwable);
-                
jobItemContext.setStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
+                
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
                 stop();
             }
         };
@@ -126,7 +135,7 @@ public final class InventoryIncrementalTasksRunner 
implements PipelineTasksRunne
             return;
         }
         log.info("-------------- Start incremental task --------------");
-        jobItemContext.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
+        updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
         ExecuteCallback incrementalTaskCallback = 
createIncrementalTaskCallback();
         for (IncrementalTask each : incrementalTasks) {
             if (each.getTaskProgress().getPosition() instanceof 
FinishedPosition) {
@@ -146,7 +155,7 @@ public final class InventoryIncrementalTasksRunner 
implements PipelineTasksRunne
             @Override
             public void onFailure(final Throwable throwable) {
                 log.error("Incremental task execute failed.", throwable);
-                
jobItemContext.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
+                
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
                 stop();
             }
         };
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 069797aa427..0e35371c673 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.eventbus.Subscribe;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
-import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
@@ -31,8 +30,10 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
-import 
org.apache.shardingsphere.data.pipeline.core.api.impl.RuleAlteredJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import 
org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
@@ -43,10 +44,10 @@ import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCon
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
 import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
 
@@ -205,7 +206,7 @@ public final class RuleAlteredJobWorker {
         result.setNewVersion(event.getNewVersion());
         
result.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig));
         
result.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig));
-        new RuleAlteredJobAPIImpl().extendJobConfiguration(result);
+        
PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION).extendYamlJobConfiguration(result);
         return Optional.of(new 
RuleAlteredJobConfigurationSwapper().swapToObject(result));
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
index df5abb4fefa..18562308a50 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
@@ -17,9 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
 
 import java.util.Collection;
 import java.util.List;
@@ -42,7 +43,7 @@ public final class RuleAlteredJobAPIFixture implements 
RuleAlteredJobAPI {
     }
     
     @Override
-    public void extendJobConfiguration(final YamlPipelineJobConfiguration 
yamlJobConfig) {
+    public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration 
yamlJobConfig) {
     }
     
     @Override
@@ -63,7 +64,7 @@ public final class RuleAlteredJobAPIFixture implements 
RuleAlteredJobAPI {
     }
     
     @Override
-    public Optional<String> start(final RuleAlteredJobConfiguration jobConfig) 
{
+    public Optional<String> start(final PipelineJobConfiguration jobConfig) {
         return Optional.empty();
     }
     
@@ -141,7 +142,7 @@ public final class RuleAlteredJobAPIFixture implements 
RuleAlteredJobAPI {
     }
     
     @Override
-    public RuleAlteredJobConfiguration getJobConfig(final String jobId) {
+    public RuleAlteredJobConfiguration getJobConfiguration(final String jobId) 
{
         return null;
     }
     
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index ccd3a70d520..9a465a59670 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -131,7 +131,7 @@ public final class RuleAlteredJobAPIImplTest {
     public void assertDataConsistencyCheck() {
         Optional<String> jobId = 
ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        RuleAlteredJobConfiguration jobConfig = 
ruleAlteredJobAPI.getJobConfig(jobId.get());
+        RuleAlteredJobConfiguration jobConfig = 
ruleAlteredJobAPI.getJobConfiguration(jobId.get());
         if (null == jobConfig.getSource()) {
             log.error("source is null, jobConfig={}", 
YamlEngine.marshal(jobConfig));
         }
@@ -146,7 +146,7 @@ public final class RuleAlteredJobAPIImplTest {
     public void assertDataConsistencyCheckWithAlgorithm() {
         Optional<String> jobId = 
ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        RuleAlteredJobConfiguration jobConfig = 
ruleAlteredJobAPI.getJobConfig(jobId.get());
+        RuleAlteredJobConfiguration jobConfig = 
ruleAlteredJobAPI.getJobConfiguration(jobId.get());
         initTableData(jobConfig);
         ruleAlteredJobAPI.stopClusterWriteDB(jobConfig);
         Map<String, DataConsistencyCheckResult> checkResultMap = 
ruleAlteredJobAPI.dataConsistencyCheck(jobId.get(), "FIXTURE", null);
@@ -228,7 +228,7 @@ public final class RuleAlteredJobAPIImplTest {
     public void assertResetTargetTable() {
         Optional<String> jobId = 
ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        RuleAlteredJobConfiguration jobConfig = 
ruleAlteredJobAPI.getJobConfig(jobId.get());
+        RuleAlteredJobConfiguration jobConfig = 
ruleAlteredJobAPI.getJobConfiguration(jobId.get());
         initTableData(jobConfig);
         ruleAlteredJobAPI.stop(jobId.get());
         ruleAlteredJobAPI.reset(jobId.get());
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index 7fa35b89273..d20d2ca2f4e 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -26,7 +26,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.api.impl.RuleAlteredJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
 
 import java.util.Collections;
@@ -54,7 +55,7 @@ public final class JobConfigurationBuilder {
         result.setSource(createYamlPipelineDataSourceConfiguration(
                 new 
ShardingSpherePipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"))));
         result.setTarget(createYamlPipelineDataSourceConfiguration(new 
StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_standard_jdbc_target.yaml"))));
-        new RuleAlteredJobAPIImpl().extendJobConfiguration(result);
+        
PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION).extendYamlJobConfiguration(result);
         return new RuleAlteredJobConfigurationSwapper().swapToObject(result);
     }
     

Reply via email to