This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 97a54388f80 Refactor GovernanceRepositoryAPI for common usage (#19979)
97a54388f80 is described below

commit 97a54388f80c2aa408496b4f520e9df0beb02cb4
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Aug 9 13:28:46 2022 +0800

    Refactor GovernanceRepositoryAPI for common usage (#19979)
    
    * Refactor GovernanceRepositoryAPI for common usage
    
    * Fix codestyle and refactor getJobProgress
    
    * Fix ci error
---
 .../data/pipeline/api/RuleAlteredJobAPI.java       | 27 +++++++++
 .../api/fixture/RuleAlteredJobAPIFixture.java      | 15 +++++
 .../pipeline/core/api/GovernanceRepositoryAPI.java | 20 ++-----
 .../core/api/impl/GovernanceRepositoryAPIImpl.java | 56 ++----------------
 .../core/api/impl/RuleAlteredJobAPIImpl.java       | 69 +++++++++++++++++++++-
 .../persist/PipelineJobProgressPersistService.java |  7 +--
 .../scenario/rulealtered/RuleAlteredJob.java       | 41 ++++++-------
 .../rulealtered/RuleAlteredJobPreparer.java        |  4 +-
 .../rulealtered/RuleAlteredJobScheduler.java       |  4 +-
 .../api/impl/GovernanceRepositoryAPIImplTest.java  | 25 ++------
 .../core/api/impl/RuleAlteredJobAPIImplTest.java   | 18 ++++--
 .../rulealtered/RuleAlteredJobWorkerTest.java      |  3 +-
 .../prepare/InventoryTaskSplitterTest.java         |  4 +-
 13 files changed, 165 insertions(+), 128 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
index 4d27306af74..096334bd9f3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.data.pipeline.api;
 
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
@@ -181,4 +183,29 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, 
RequiredSPI {
      * @return job configuration
      */
     RuleAlteredJobConfiguration getJobConfig(String jobId);
+    
+    /**
+     * Persist job progress.
+     *
+     * @param jobContext job context
+     */
+    void persistJobProgress(PipelineJobContext jobContext);
+    
+    /**
+     * Get job progress.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @return job progress
+     */
+    JobProgress getJobProgress(String jobId, int shardingItem);
+    
+    /**
+     * Update sharding job status.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param status status
+     */
+    void updateShardingJobStatus(String jobId, int shardingItem, JobStatus 
status);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
index bfb05d6ae54..2e9817ab54a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.data.pipeline.api.fixture;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
@@ -136,4 +138,17 @@ public final class RuleAlteredJobAPIFixture implements 
RuleAlteredJobAPI {
     public boolean isDefault() {
         return RuleAlteredJobAPI.super.isDefault();
     }
+    
+    @Override
+    public void persistJobProgress(final PipelineJobContext jobContext) {
+    }
+    
+    @Override
+    public JobProgress getJobProgress(final String jobId, final int 
shardingItem) {
+        return null;
+    }
+    
+    @Override
+    public void updateShardingJobStatus(final String jobId, final int 
shardingItem, final JobStatus status) {
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index fd60c290b46..db47c3d2dc7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -17,9 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.api;
 
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
 import java.util.List;
@@ -41,9 +38,11 @@ public interface GovernanceRepositoryAPI {
     /**
      * Persist job progress.
      *
-     * @param jobContext job context
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param progressValue progress value
      */
-    void persistJobProgress(PipelineJobContext jobContext);
+    void persistJobProgress(String jobId, int shardingItem, String 
progressValue);
     
     /**
      * Get job progress.
@@ -52,7 +51,7 @@ public interface GovernanceRepositoryAPI {
      * @param shardingItem sharding item
      * @return job progress
      */
-    JobProgress getJobProgress(String jobId, int shardingItem);
+    String getJobProgress(String jobId, int shardingItem);
     
     /**
      * Persist job check result.
@@ -108,13 +107,4 @@ public interface GovernanceRepositoryAPI {
      * @return sharding items
      */
     List<Integer> getShardingItems(String jobId);
-    
-    /**
-     * Update sharding job status.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @param status status
-     */
-    void updateShardingJobStatus(String jobId, int shardingItem, JobStatus 
status);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 97f8f661d2d..f21243ce4ec 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -20,19 +20,8 @@ package 
org.apache.shardingsphere.data.pipeline.core.api.impl;
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
@@ -47,8 +36,6 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAPI {
     
-    private static final YamlJobProgressSwapper SWAPPER = new 
YamlJobProgressSwapper();
-    
     private final ClusterPersistRepository repository;
     
     @Override
@@ -57,37 +44,13 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
     }
     
     @Override
-    public void persistJobProgress(final PipelineJobContext context) {
-        RuleAlteredJobContext jobContext = (RuleAlteredJobContext) context;
-        JobProgress jobProgress = new JobProgress();
-        jobProgress.setStatus(jobContext.getStatus());
-        
jobProgress.setSourceDatabaseType(jobContext.getJobConfig().getSourceDatabaseType());
-        jobProgress.setIncremental(getIncrementalTasksProgress(jobContext));
-        jobProgress.setInventory(getInventoryTasksProgress(jobContext));
-        String value = 
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobProgress));
-        
repository.persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobContext.getJobId(),
 jobContext.getShardingItem()), value);
-    }
-    
-    private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final 
RuleAlteredJobContext jobContext) {
-        return new JobItemIncrementalTasksProgress(
-                jobContext.getIncrementalTasks()
-                        
.stream().collect(Collectors.toMap(IncrementalTask::getTaskId, 
IncrementalTask::getProgress)));
-    }
-    
-    private JobItemInventoryTasksProgress getInventoryTasksProgress(final 
RuleAlteredJobContext jobContext) {
-        return new JobItemInventoryTasksProgress(
-                jobContext.getInventoryTasks()
-                        .stream()
-                        .collect(Collectors.toMap(InventoryTask::getTaskId, 
InventoryTask::getProgress)));
+    public void persistJobProgress(final String jobId, final int shardingItem, 
final String progressValue) {
+        repository.persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobId, 
shardingItem), progressValue);
     }
     
     @Override
-    public JobProgress getJobProgress(final String jobId, final int 
shardingItem) {
-        String data = 
repository.get(PipelineMetaDataNode.getScalingJobOffsetPath(jobId, 
shardingItem));
-        if (Strings.isNullOrEmpty(data)) {
-            return null;
-        }
-        return SWAPPER.swapToObject(YamlEngine.unmarshal(data, 
YamlJobProgress.class));
+    public String getJobProgress(final String jobId, final int shardingItem) {
+        return 
repository.get(PipelineMetaDataNode.getScalingJobOffsetPath(jobId, 
shardingItem));
     }
     
     @Override
@@ -129,15 +92,4 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
         log.info("getShardingItems, jobId={}, offsetKeys={}", jobId, result);
         return 
result.stream().map(Integer::parseInt).collect(Collectors.toList());
     }
-    
-    @Override
-    public void updateShardingJobStatus(final String jobId, final int 
shardingItem, final JobStatus status) {
-        JobProgress jobProgress = getJobProgress(jobId, shardingItem);
-        if (null == jobProgress) {
-            log.warn("updateShardingJobStatus, jobProgress is null, jobId={}, 
shardingItem={}", jobId, shardingItem);
-            return;
-        }
-        jobProgress.setStatus(status);
-        persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobId, 
shardingItem), 
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobProgress)));
-    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 933e2bf6003..05afdeb0a4b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -19,14 +19,20 @@ package 
org.apache.shardingsphere.data.pipeline.core.api.impl;
 
 import com.google.common.base.Preconditions;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
+import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
@@ -36,9 +42,14 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreatio
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgress;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
+import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -53,6 +64,7 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.confi
 import java.time.LocalDateTime;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -69,6 +81,8 @@ import java.util.stream.Stream;
 @Slf4j
 public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl 
implements RuleAlteredJobAPI {
     
+    private static final YamlJobProgressSwapper SWAPPER = new 
YamlJobProgressSwapper();
+    
     @Override
     public List<JobInfo> list() {
         checkModeConfig();
@@ -137,7 +151,7 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
         String jobId = jobConfig.getJobId();
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         return IntStream.range(0, 
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, 
each) -> {
-            JobProgress jobProgress = 
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobId, each);
+            JobProgress jobProgress = getJobProgress(jobId, each);
             if (null != jobProgress) {
                 jobProgress.setActive(!jobConfigPOJO.isDisabled());
             }
@@ -323,7 +337,7 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
         // TODO rewrite job status update after job progress structure refactor
         for (int each : repositoryAPI.getShardingItems(jobId)) {
             PipelineJobCenter.getJobContext(jobId, each).ifPresent(jobContext 
-> jobContext.setStatus(JobStatus.FINISHED));
-            repositoryAPI.updateShardingJobStatus(jobId, each, 
JobStatus.FINISHED);
+            updateShardingJobStatus(jobId, each, JobStatus.FINISHED);
         }
         PipelineJobCenter.stop(jobId);
         stop(jobId);
@@ -345,4 +359,55 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
     private RuleAlteredJobConfiguration getJobConfig(final 
JobConfigurationPOJO jobConfigPOJO) {
         return 
RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
     }
+    
+    @Override
+    public void persistJobProgress(final PipelineJobContext jobContext) {
+        if (!(jobContext instanceof RuleAlteredJobContext)) {
+            return;
+        }
+        RuleAlteredJobContext context = (RuleAlteredJobContext) jobContext;
+        JobProgress jobProgress = new JobProgress();
+        jobProgress.setStatus(jobContext.getStatus());
+        
jobProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType());
+        jobProgress.setIncremental(getIncrementalTasksProgress(context));
+        jobProgress.setInventory(getInventoryTasksProgress(context));
+        String value = 
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobProgress));
+        
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobProgress(jobContext.getJobId(),
 jobContext.getShardingItem(), value);
+    }
+    
+    private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final 
RuleAlteredJobContext jobContext) {
+        Map<String, IncrementalTaskProgress> incrementalTaskProgressMap = new 
HashMap<>();
+        for (IncrementalTask each : jobContext.getIncrementalTasks()) {
+            incrementalTaskProgressMap.put(each.getTaskId(), 
each.getProgress());
+        }
+        return new JobItemIncrementalTasksProgress(incrementalTaskProgressMap);
+    }
+    
+    private JobItemInventoryTasksProgress getInventoryTasksProgress(final 
RuleAlteredJobContext jobContext) {
+        Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new 
HashMap<>();
+        for (InventoryTask each : jobContext.getInventoryTasks()) {
+            inventoryTaskProgressMap.put(each.getTaskId(), each.getProgress());
+        }
+        return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
+    }
+    
+    @Override
+    public JobProgress getJobProgress(final String jobId, final int 
shardingItem) {
+        String data = 
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobId, 
shardingItem);
+        if (StringUtils.isBlank(data)) {
+            return null;
+        }
+        return SWAPPER.swapToObject(YamlEngine.unmarshal(data, 
YamlJobProgress.class));
+    }
+    
+    @Override
+    public void updateShardingJobStatus(final String jobId, final int 
shardingItem, final JobStatus status) {
+        JobProgress jobProgress = getJobProgress(jobId, shardingItem);
+        if (null == jobProgress) {
+            log.warn("updateShardingJobStatus, jobProgress is null, jobId={}, 
shardingItem={}", jobId, shardingItem);
+            return;
+        }
+        jobProgress.setStatus(status);
+        
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobProgress(jobId, 
shardingItem, YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobProgress)));
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index cb7c66aa166..71d5db43e29 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -18,9 +18,8 @@
 package org.apache.shardingsphere.data.pipeline.core.job.progress.persist;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
-import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 
@@ -42,8 +41,6 @@ public final class PipelineJobProgressPersistService {
     
     private static final Map<String, Map<Integer, 
PipelineJobProgressPersistContext>> JOB_PROGRESS_PERSIST_MAP = new 
ConcurrentHashMap<>();
     
-    private static final GovernanceRepositoryAPI REPOSITORY_API = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
-    
     private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = 
Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("pipeline-progress-persist-%d"));
     
     private static final long DELAY_SECONDS = 1;
@@ -105,7 +102,7 @@ public final class PipelineJobProgressPersistService {
         }
         persistContext.getHasNewEvents().set(false);
         long startTimeMillis = System.currentTimeMillis();
-        REPOSITORY_API.persistJobProgress(jobContext.get());
+        
RuleAlteredJobAPIFactory.getInstance().persistJobProgress(jobContext.get());
         persistContext.getBeforePersistingProgressMillis().set(null);
         if (6 == ThreadLocalRandom.current().nextInt(100)) {
             log.info("persist, jobId={}, shardingItem={}, cost time: {} ms", 
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index ebd74c85f1a..236a3f17881 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -19,14 +19,13 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
@@ -41,8 +40,6 @@ import 
org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 @RequiredArgsConstructor
 public final class RuleAlteredJob extends AbstractPipelineJob implements 
SimpleJob, PipelineJob {
     
-    private final GovernanceRepositoryAPI governanceRepositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
-    
     private final PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
     
     // Shared by all sharding items
@@ -57,7 +54,7 @@ public final class RuleAlteredJob extends AbstractPipelineJob 
implements SimpleJ
         }
         setJobId(shardingContext.getJobName());
         RuleAlteredJobConfiguration jobConfig = 
RuleAlteredJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
-        JobProgress initProgress = 
governanceRepositoryAPI.getJobProgress(shardingContext.getJobName(), 
shardingContext.getShardingItem());
+        JobProgress initProgress = 
RuleAlteredJobAPIFactory.getInstance().getJobProgress(shardingContext.getJobName(),
 shardingContext.getShardingItem());
         RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem(), 
initProgress, dataSourceManager);
         int shardingItem = jobContext.getShardingItem();
         if (getTasksRunnerMap().containsKey(shardingItem)) {
@@ -75,6 +72,23 @@ public final class RuleAlteredJob extends 
AbstractPipelineJob implements SimpleJ
         
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), 
shardingItem);
     }
     
+    private void prepare(final RuleAlteredJobContext jobContext) {
+        try {
+            jobPreparer.prepare(jobContext);
+        } catch (final PipelineIgnoredException ex) {
+            log.info("pipeline ignore exception: {}", ex.getMessage());
+            PipelineJobCenter.stop(getJobId());
+            // CHECKSTYLE:OFF
+        } catch (final RuntimeException ex) {
+            // CHECKSTYLE:ON
+            log.error("job prepare failed, {}-{}", getJobId(), 
jobContext.getShardingItem(), ex);
+            PipelineJobCenter.stop(getJobId());
+            jobContext.setStatus(JobStatus.PREPARING_FAILURE);
+            
RuleAlteredJobAPIFactory.getInstance().persistJobProgress(jobContext);
+            throw ex;
+        }
+    }
+    
     /**
      * Stop job.
      */
@@ -95,21 +109,4 @@ public final class RuleAlteredJob extends 
AbstractPipelineJob implements SimpleJ
         getTasksRunnerMap().clear();
         
PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
     }
-    
-    private void prepare(final RuleAlteredJobContext jobContext) {
-        try {
-            jobPreparer.prepare(jobContext);
-        } catch (final PipelineIgnoredException ex) {
-            log.info("pipeline ignore exception: {}", ex.getMessage());
-            PipelineJobCenter.stop(getJobId());
-            // CHECKSTYLE:OFF
-        } catch (final RuntimeException ex) {
-            // CHECKSTYLE:ON
-            log.error("job prepare failed, {}-{}", getJobId(), 
jobContext.getShardingItem(), ex);
-            PipelineJobCenter.stop(getJobId());
-            jobContext.setStatus(JobStatus.PREPARING_FAILURE);
-            
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobProgress(jobContext);
-            throw ex;
-        }
-    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 6253682d5a5..c3231d5ea35 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -71,8 +71,6 @@ import java.util.concurrent.TimeUnit;
 @Slf4j
 public final class RuleAlteredJobPreparer {
     
-    private final InventoryTaskSplitter inventoryTaskSplitter = new 
InventoryTaskSplitter();
-    
     /**
      * Do prepare work for scaling job.
      *
@@ -184,7 +182,7 @@ public final class RuleAlteredJobPreparer {
     }
     
     private void initInventoryTasks(final RuleAlteredJobContext jobContext) {
-        List<InventoryTask> allInventoryTasks = 
inventoryTaskSplitter.splitInventoryData(jobContext);
+        List<InventoryTask> allInventoryTasks = new 
InventoryTaskSplitter().splitInventoryData(jobContext);
         jobContext.getInventoryTasks().addAll(allInventoryTasks);
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index 45f99dc3e21..7ae9c65e99b 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -20,10 +20,10 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -64,7 +64,7 @@ public final class RuleAlteredJobScheduler implements 
PipelineTasksRunner {
             log.info("job stopping, ignore inventory task");
             return;
         }
-        
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobProgress(jobContext);
+        RuleAlteredJobAPIFactory.getInstance().persistJobProgress(jobContext);
         if (executeInventoryTask()) {
             if (jobContext.isStopping()) {
                 log.info("stopping, ignore incremental task");
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index c6864fe9f61..f33ad304c97 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -22,22 +22,18 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import org.apache.shardingsphere.data.pipeline.core.util.ConfigurationFileUtil;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 import org.junit.BeforeClass;
@@ -61,8 +57,6 @@ import static org.mockito.Mockito.mock;
 
 public final class GovernanceRepositoryAPIImplTest {
     
-    private static final YamlJobProgressSwapper SWAPPER = new 
YamlJobProgressSwapper();
-    
     private static GovernanceRepositoryAPI governanceRepositoryAPI;
     
     @BeforeClass
@@ -74,9 +68,9 @@ public final class GovernanceRepositoryAPIImplTest {
     @Test
     public void assertPersistJobProgress() {
         RuleAlteredJobContext jobContext = mockJobContext();
-        governanceRepositoryAPI.persistJobProgress(jobContext);
-        JobProgress actual = 
governanceRepositoryAPI.getJobProgress(jobContext.getJobId(), 
jobContext.getShardingItem());
-        
assertThat(YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(actual)), 
is(ConfigurationFileUtil.readFileAndIgnoreComments("governance-repository.yaml")));
+        governanceRepositoryAPI.persistJobProgress(jobContext.getJobId(), 
jobContext.getShardingItem(), "testValue");
+        String actual = 
governanceRepositoryAPI.getJobProgress(jobContext.getJobId(), 
jobContext.getShardingItem());
+        assertThat(actual, is("testValue"));
     }
     
     @Test
@@ -91,7 +85,7 @@ public final class GovernanceRepositoryAPIImplTest {
     public void assertDeleteJob() {
         
governanceRepositoryAPI.persist(DataPipelineConstants.DATA_PIPELINE_ROOT + 
"/1", "");
         governanceRepositoryAPI.deleteJob("1");
-        JobProgress actual = governanceRepositoryAPI.getJobProgress("1", 0);
+        String actual = governanceRepositoryAPI.getJobProgress("1", 0);
         assertNull(actual);
     }
     
@@ -125,21 +119,12 @@ public final class GovernanceRepositoryAPIImplTest {
     @Test
     public void assertGetShardingItems() {
         RuleAlteredJobContext jobContext = mockJobContext();
-        governanceRepositoryAPI.persistJobProgress(jobContext);
+        governanceRepositoryAPI.persistJobProgress(jobContext.getJobId(), 
jobContext.getShardingItem(), "testValue");
         List<Integer> shardingItems = 
governanceRepositoryAPI.getShardingItems(jobContext.getJobId());
         assertThat(shardingItems.size(), is(1));
         assertThat(shardingItems.get(0), is(jobContext.getShardingItem()));
     }
     
-    @Test
-    public void assertRenewJobStatus() {
-        RuleAlteredJobContext jobContext = mockJobContext();
-        governanceRepositoryAPI.persistJobProgress(jobContext);
-        governanceRepositoryAPI.updateShardingJobStatus(jobContext.getJobId(), 
jobContext.getShardingItem(), JobStatus.FINISHED);
-        JobProgress jobProgress = 
governanceRepositoryAPI.getJobProgress(jobContext.getJobId(), 
jobContext.getShardingItem());
-        assertThat(jobProgress.getStatus(), is(JobStatus.FINISHED));
-    }
-    
     private RuleAlteredJobContext mockJobContext() {
         RuleAlteredJobContext result = new 
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new 
JobProgress(), new PipelineDataSourceManager());
         TaskConfiguration taskConfig = result.getTaskConfig();
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index 6c72641a38d..58973dc9c7d 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -200,9 +200,9 @@ public final class RuleAlteredJobAPIImplTest {
         assertTrue(jobId.isPresent());
         final GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
         RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new 
PipelineDataSourceManager());
-        repositoryAPI.persistJobProgress(jobContext);
+        ruleAlteredJobAPI.persistJobProgress(jobContext);
         repositoryAPI.persistJobCheckResult(jobId.get(), true);
-        repositoryAPI.updateShardingJobStatus(jobId.get(), 0, 
JobStatus.FINISHED);
+        ruleAlteredJobAPI.updateShardingJobStatus(jobId.get(), 0, 
JobStatus.FINISHED);
         ruleAlteredJobAPI.switchClusterConfiguration(jobId.get());
     }
     
@@ -213,9 +213,9 @@ public final class RuleAlteredJobAPIImplTest {
         assertTrue(jobId.isPresent());
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
         RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new 
PipelineDataSourceManager());
-        repositoryAPI.persistJobProgress(jobContext);
+        ruleAlteredJobAPI.persistJobProgress(jobContext);
         repositoryAPI.persistJobCheckResult(jobId.get(), true);
-        repositoryAPI.updateShardingJobStatus(jobId.get(), 
jobContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
+        ruleAlteredJobAPI.updateShardingJobStatus(jobId.get(), 
jobContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
         ruleAlteredJobAPI.switchClusterConfiguration(jobId.get());
         Map<Integer, JobProgress> progress = 
ruleAlteredJobAPI.getProgress(jobId.get());
         for (Entry<Integer, JobProgress> entry : progress.entrySet()) {
@@ -252,4 +252,14 @@ public final class RuleAlteredJobAPIImplTest {
             statement.execute("INSERT INTO t_order (order_id, user_id) VALUES 
(1, 'xxx'), (999, 'yyy')");
         }
     }
+    
+    @Test
+    public void assertRenewJobStatus() {
+        final RuleAlteredJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
+        RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new 
PipelineDataSourceManager());
+        ruleAlteredJobAPI.persistJobProgress(jobContext);
+        ruleAlteredJobAPI.updateShardingJobStatus(jobConfig.getJobId(), 0, 
JobStatus.FINISHED);
+        JobProgress jobProgress = 
ruleAlteredJobAPI.getJobProgress(jobContext.getJobId(), 
jobContext.getShardingItem());
+        assertThat(jobProgress.getStatus(), is(JobStatus.FINISHED));
+    }
 }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index 32efa4f06fc..f2eeed50c7a 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
@@ -100,7 +101,7 @@ public final class RuleAlteredJobWorkerTest {
         RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new 
PipelineDataSourceManager());
         jobContext.setStatus(JobStatus.PREPARING);
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
-        repositoryAPI.persistJobProgress(jobContext);
+        RuleAlteredJobAPIFactory.getInstance().persistJobProgress(jobContext);
         URL jobConfigUrl = 
getClass().getClassLoader().getResource("scaling/rule_alter/scaling_job_config.yaml");
         assertNotNull(jobConfigUrl);
         
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigPath(jobContext.getJobId()),
 FileUtils.readFileToString(new File(jobConfigUrl.getFile()), 
StandardCharsets.UTF_8));
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index 99fbc9b6d30..18f0c16ea59 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -17,12 +17,12 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare;
 
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -66,7 +66,7 @@ public final class InventoryTaskSplitterTest {
     
     private void initJobContext() {
         RuleAlteredJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
-        JobProgress initProgress = 
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobConfig.getJobId(),
 0);
+        JobProgress initProgress = 
RuleAlteredJobAPIFactory.getInstance().getJobProgress(jobConfig.getJobId(), 0);
         jobContext = new RuleAlteredJobContext(jobConfig, 0, initProgress, new 
PipelineDataSourceManager());
         dataSourceManager = jobContext.getDataSourceManager();
         taskConfig = jobContext.getTaskConfig();

Reply via email to