This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b9288841882 Add YamlPipelineJobItemProgressSwapper (#29073)
b9288841882 is described below

commit b9288841882245c911fa31bf6902517b0c99a328
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 18 15:01:07 2023 +0800

    Add YamlPipelineJobItemProgressSwapper (#29073)
---
 ...YamlConsistencyCheckJobItemProgressSwapper.java |  4 +-
 ...InventoryIncrementalJobItemProgressSwapper.java |  4 +-
 .../persist/PipelineJobProgressPersistService.java |  3 +-
 .../job/service/InventoryIncrementalJobAPI.java    |  6 +++
 .../pipeline/core/job/service/PipelineJobAPI.java  | 30 ++++++---------
 .../core/job/service/PipelineJobManager.java       | 44 +++++++++++++++++-----
 .../AbstractInventoryIncrementalJobAPIImpl.java    | 27 +------------
 .../yaml/YamlPipelineJobItemProgressSwapper.java   | 31 +++++++++++++++
 .../runner/InventoryIncrementalTasksRunner.java    |  2 +-
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  6 +--
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |  5 ++-
 .../api/impl/ConsistencyCheckJobAPI.java           | 30 ++++-----------
 .../task/ConsistencyCheckTasksRunner.java          | 10 ++---
 .../migration/api/impl/MigrationJobAPI.java        |  2 +-
 .../migration/prepare/MigrationJobPreparer.java    |  5 ++-
 .../api/impl/ConsistencyCheckJobAPITest.java       | 16 ++++----
 .../migration/api/impl/MigrationJobAPITest.java    |  4 +-
 17 files changed, 129 insertions(+), 100 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
index c399505edf2..066df10fc28 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
@@ -19,12 +19,12 @@ package 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml;
 
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
-import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
 
 /**
  * YAML data check job item progress swapper.
  */
-public final class YamlConsistencyCheckJobItemProgressSwapper implements 
YamlConfigurationSwapper<YamlConsistencyCheckJobItemProgress, 
ConsistencyCheckJobItemProgress> {
+public final class YamlConsistencyCheckJobItemProgressSwapper implements 
YamlPipelineJobItemProgressSwapper<YamlConsistencyCheckJobItemProgress, 
ConsistencyCheckJobItemProgress> {
     
     @Override
     public YamlConsistencyCheckJobItemProgress swapToYamlConfiguration(final 
ConsistencyCheckJobItemProgress data) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
index 696a4e5f34e..24c2f6a5a74 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
@@ -19,14 +19,14 @@ package 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml;
 
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
 /**
  * YAML inventory incremental job item progress swapper.
  */
-public final class YamlInventoryIncrementalJobItemProgressSwapper implements 
YamlConfigurationSwapper<YamlInventoryIncrementalJobItemProgress, 
InventoryIncrementalJobItemProgress> {
+public final class YamlInventoryIncrementalJobItemProgressSwapper implements 
YamlPipelineJobItemProgressSwapper<YamlInventoryIncrementalJobItemProgress, 
InventoryIncrementalJobItemProgress> {
     
     private final YamlJobItemInventoryTasksProgressSwapper 
inventoryTasksProgressSwapper = new YamlJobItemInventoryTasksProgressSwapper();
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 478cd13bb6c..87e13a2ea09 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemCon
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
@@ -129,7 +130,7 @@ public final class PipelineJobProgressPersistService {
             }
             persistContext.getHasNewEvents().set(false);
             long startTimeMillis = System.currentTimeMillis();
-            TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobId).getType()).updateJobItemProgress(jobItemContext.get());
+            new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobId).getType())).updateJobItemProgress(jobItemContext.get());
             persistContext.getBeforePersistingProgressMillis().set(null);
             if (6 == ThreadLocalRandom.current().nextInt(100)) {
                 log.info("persist, jobId={}, shardingItem={}, cost {} ms", 
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
index 487555edcb5..908c3103c76 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrement
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
@@ -43,6 +44,11 @@ import java.util.Optional;
  */
 public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
     
+    @Override
+    default YamlInventoryIncrementalJobItemProgressSwapper 
getYamlJobItemProgressSwapper() {
+        return new YamlInventoryIncrementalJobItemProgressSwapper();
+    }
+    
     /**
      * Get pipeline job info.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 6a4d6a5fcc4..b5ab26690a0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -17,11 +17,11 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
-import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
 
@@ -34,12 +34,20 @@ import java.util.Optional;
 public interface PipelineJobAPI extends TypedSPI {
     
     /**
-     * Get YAML job configuration swapper.
+     * Get YAML pipeline job configuration swapper.
      * 
-     * @return YAML job configuration swapper
+     * @return YAML pipeline job configuration swapper
      */
     YamlPipelineJobConfigurationSwapper<?, ?> getYamlJobConfigurationSwapper();
     
+    /**
+     * Get YAML pipeline job item progress swapper.
+     * 
+     * @return YAML pipeline job item progress swapper
+     */
+    @SuppressWarnings("rawtypes")
+    YamlPipelineJobItemProgressSwapper getYamlJobItemProgressSwapper();
+    
     /**
      * Whether to ignore to start disabled job when job item progress is 
finished.
      * 
@@ -67,20 +75,6 @@ public interface PipelineJobAPI extends TypedSPI {
         return Optional.empty();
     }
     
-    /**
-     * Persist job item progress.
-     *
-     * @param jobItemContext job item context
-     */
-    void persistJobItemProgress(PipelineJobItemContext jobItemContext);
-    
-    /**
-     * Update job item progress.
-     *
-     * @param jobItemContext job item context
-     */
-    void updateJobItemProgress(PipelineJobItemContext jobItemContext);
-    
     /**
      * Get job item progress.
      *
@@ -104,7 +98,7 @@ public interface PipelineJobAPI extends TypedSPI {
      * 
      * @return pipeline job class
      */
-    Class<? extends PipelineJob> getPipelineJobClass();
+    Class<? extends PipelineJob> getJobClass();
     
     @Override
     String getType();
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index be7c8121cb3..99d846dd6d7 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
@@ -55,7 +56,7 @@ public final class PipelineJobManager {
     
     private static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     
-    private final PipelineJobAPI pipelineJobAPI;
+    private final PipelineJobAPI jobAPI;
     
     /**
      * Get job configuration.
@@ -64,7 +65,7 @@ public final class PipelineJobManager {
      * @return pipeline job configuration
      */
     public PipelineJobConfiguration getJobConfiguration(final 
JobConfigurationPOJO jobConfigPOJO) {
-        return 
pipelineJobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
+        return 
jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
     }
     
     /**
@@ -82,7 +83,7 @@ public final class PipelineJobManager {
             log.warn("jobId already exists in registry center, ignore, 
jobConfigKey={}", jobConfigKey);
             return Optional.of(jobId);
         }
-        repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), 
pipelineJobAPI.getPipelineJobClass().getName());
+        repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), 
jobAPI.getJobClass().getName());
         repositoryAPI.persist(jobConfigKey, 
YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
         return Optional.of(jobId);
     }
@@ -93,15 +94,15 @@ public final class PipelineJobManager {
      * @param jobId job id
      */
     public void startDisabledJob(final String jobId) {
-        if 
(pipelineJobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
-            Optional<? extends PipelineJobItemProgress> jobItemProgress = 
pipelineJobAPI.getJobItemProgress(jobId, 0);
+        if (jobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
+            Optional<? extends PipelineJobItemProgress> jobItemProgress = 
jobAPI.getJobItemProgress(jobId, 0);
             if (jobItemProgress.isPresent() && JobStatus.FINISHED == 
jobItemProgress.get().getStatus()) {
                 log.info("job status is FINISHED, ignore, jobId={}", jobId);
                 return;
             }
         }
         startCurrentDisabledJob(jobId);
-        pipelineJobAPI.getToBeStartDisabledNextJobType().ifPresent(optional -> 
startNextDisabledJob(jobId, optional));
+        jobAPI.getToBeStartDisabledNextJobType().ifPresent(optional -> 
startNextDisabledJob(jobId, optional));
         
     }
     
@@ -139,7 +140,7 @@ public final class PipelineJobManager {
      * @param jobId job id
      */
     public void stop(final String jobId) {
-        pipelineJobAPI.getToBeStoppedPreviousJobType().ifPresent(optional -> 
stopPreviousJob(jobId, optional));
+        jobAPI.getToBeStoppedPreviousJobType().ifPresent(optional -> 
stopPreviousJob(jobId, optional));
         stopCurrentJob(jobId);
     }
     
@@ -189,8 +190,8 @@ public final class PipelineJobManager {
      * @return jobs info
      */
     public List<PipelineJobInfo> getPipelineJobInfos(final PipelineContextKey 
contextKey) {
-        if (pipelineJobAPI instanceof InventoryIncrementalJobAPI) {
-            return getJobBriefInfos(contextKey, 
pipelineJobAPI.getType()).map(each -> ((InventoryIncrementalJobAPI) 
pipelineJobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
+        if (jobAPI instanceof InventoryIncrementalJobAPI) {
+            return getJobBriefInfos(contextKey, jobAPI.getType()).map(each -> 
((InventoryIncrementalJobAPI) 
jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
         }
         return Collections.emptyList();
     }
@@ -200,6 +201,31 @@ public final class PipelineJobManager {
                 .filter(each -> 
jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()));
     }
     
+    /**
+     * Persist job item progress.
+     *
+     * @param jobItemContext job item context
+     */
+    public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+                .persistJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
+    }
+    
+    /**
+     * Update job item progress.
+     *
+     * @param jobItemContext job item context
+     */
+    public void updateJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+                .updateJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
+    }
+    
+    @SuppressWarnings("unchecked")
+    private String convertJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
+        return 
YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemContext.toProgress()));
+    }
+    
     /**
      * Get job item error message.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 372971129cc..1706b2c4c9f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -17,19 +17,15 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.service.impl;
 
-import lombok.AccessLevel;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
@@ -66,9 +62,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl 
implements Inventor
     
     private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
     
-    @Getter(AccessLevel.PROTECTED)
-    private final YamlInventoryIncrementalJobItemProgressSwapper 
jobItemProgressSwapper = new YamlInventoryIncrementalJobItemProgressSwapper();
-    
     private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new 
YamlJobOffsetInfoSwapper();
     
     @Override
@@ -121,22 +114,6 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl implements Inventor
         return result;
     }
     
-    @Override
-    public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
-                .persistJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
-    }
-    
-    @Override
-    public void updateJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
-                .updateJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
-    }
-    
-    private String convertJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
-        return 
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration((InventoryIncrementalJobItemProgress)
 jobItemContext.toProgress()));
-    }
-    
     @Override
     public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo 
jobOffsetInfo) {
         String value = 
YamlEngine.marshal(jobOffsetInfoSwapper.swapToYamlConfiguration(jobOffsetInfo));
@@ -156,7 +133,7 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl implements Inventor
     @Override
     public Optional<InventoryIncrementalJobItemProgress> 
getJobItemProgress(final String jobId, final int shardingItem) {
         Optional<String> progress = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
 shardingItem);
-        return progress.map(optional -> 
jobItemProgressSwapper.swapToObject(YamlEngine.unmarshal(optional, 
YamlInventoryIncrementalJobItemProgress.class)));
+        return progress.map(optional -> 
getYamlJobItemProgressSwapper().swapToObject(YamlEngine.unmarshal(optional, 
YamlInventoryIncrementalJobItemProgress.class)));
     }
     
     @Override
@@ -167,7 +144,7 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl implements Inventor
         }
         jobItemProgress.get().setStatus(status);
         
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
 shardingItem,
-                
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress.get())));
+                
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
new file mode 100644
index 00000000000..90f522b0910
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.yaml;
+
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * YAML pipeline job configuration swapper.
+ *
+ * @param <Y> type of YAML configuration
+ * @param <T> type of swapped pipeline job item progress
+ */
+public interface YamlPipelineJobItemProgressSwapper<Y extends 
YamlConfiguration, T extends PipelineJobItemProgress> extends 
YamlConfigurationSwapper<Y, T> {
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index 1923ad14911..ad9e3ccb81b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -83,7 +83,7 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
         if (jobItemContext.isStopping()) {
             return;
         }
-        TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).persistJobItemProgress(jobItemContext);
+        new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())).persistJobItemProgress(jobItemContext);
         if 
(PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
             log.info("All inventory tasks finished.");
             executeIncrementalTask();
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index dc0bda53380..070328d868e 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -123,7 +123,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
         if (repositoryAPI.isExisted(jobConfigKey)) {
             log.warn("CDC job already exists in registry center, ignore, 
jobConfigKey={}", jobConfigKey);
         } else {
-            
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
 getPipelineJobClass().getName());
+            
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
 getJobClass().getName());
             JobConfigurationPOJO jobConfigPOJO = 
jobConfig.convertToJobConfigurationPOJO();
             jobConfigPOJO.setDisabled(true);
             repositoryAPI.persist(jobConfigKey, 
YamlEngine.marshal(jobConfigPOJO));
@@ -176,7 +176,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
                 IncrementalDumperContext dumperContext = 
buildDumperContext(jobConfig, i, new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
                 InventoryIncrementalJobItemProgress jobItemProgress = 
getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, 
dumperContext);
                 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(
-                        jobId, i, 
YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+                        jobId, i, 
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
             }
         } catch (final SQLException ex) {
             throw new PrepareJobWithGetBinlogPositionException(jobId, ex);
@@ -329,7 +329,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
     }
     
     @Override
-    public Class<CDCJob> getPipelineJobClass() {
+    public Class<CDCJob> getJobClass() {
         return CDCJob.class;
     }
     
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 681307a84c1..e79887f33cd 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -41,6 +41,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterType;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
@@ -68,6 +69,8 @@ public final class CDCJobPreparer {
     
     private final CDCJobAPI jobAPI = new CDCJobAPI();
     
+    private final PipelineJobManager jobManager = new 
PipelineJobManager(jobAPI);
+    
     /**
      * Do prepare work.
      *
@@ -88,7 +91,7 @@ public final class CDCJobPreparer {
                             final AtomicBoolean incrementalImporterUsed, final 
List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
         Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
jobAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
         if (!jobItemProgress.isPresent()) {
-            jobAPI.persistJobItemProgress(jobItemContext);
+            jobManager.persistJobItemProgress(jobItemContext);
         }
         if (jobItemContext.isStopping()) {
             PipelineJobCenter.stop(jobItemContext.getJobId());
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index da0f06e6ed9..62a1520c06b 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -20,7 +20,6 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.im
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
@@ -73,8 +72,6 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
     
     private static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
     
-    private final YamlConsistencyCheckJobItemProgressSwapper swapper = new 
YamlConsistencyCheckJobItemProgressSwapper();
-    
     /**
      * Create consistency check configuration and start job.
      *
@@ -120,26 +117,10 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         return true;
     }
     
-    @Override
-    public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
-                .persistJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
-    }
-    
-    private String convertJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
-        return 
YamlEngine.marshal(swapper.swapToYamlConfiguration((ConsistencyCheckJobItemProgress)
 jobItemContext.toProgress()));
-    }
-    
-    @Override
-    public void updateJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
-                .updateJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
-    }
-    
     @Override
     public Optional<ConsistencyCheckJobItemProgress> getJobItemProgress(final 
String jobId, final int shardingItem) {
         Optional<String> progress = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
 shardingItem);
-        return progress.map(s -> swapper.swapToObject(YamlEngine.unmarshal(s, 
YamlConsistencyCheckJobItemProgress.class, true)));
+        return progress.map(s -> 
getYamlJobItemProgressSwapper().swapToObject(YamlEngine.unmarshal(s, 
YamlConsistencyCheckJobItemProgress.class, true)));
     }
     
     @Override
@@ -151,7 +132,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         }
         jobItemProgress.get().setStatus(status);
         
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
 shardingItem,
-                
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
+                
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
     }
     
     /**
@@ -326,7 +307,12 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
     }
     
     @Override
-    public Class<ConsistencyCheckJob> getPipelineJobClass() {
+    public YamlConsistencyCheckJobItemProgressSwapper 
getYamlJobItemProgressSwapper() {
+        return new YamlConsistencyCheckJobItemProgressSwapper();
+    }
+    
+    @Override
+    public Class<ConsistencyCheckJob> getJobClass() {
         return ConsistencyCheckJob.class;
     }
     
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 43f826dc52c..37947db77b8 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -50,9 +50,9 @@ import java.util.concurrent.atomic.AtomicReference;
 @Slf4j
 public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
     
-    private final ConsistencyCheckJobAPI checkJobAPI = new 
ConsistencyCheckJobAPI();
+    private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI();
     
-    private final PipelineJobManager jobManager = new 
PipelineJobManager(checkJobAPI);
+    private final PipelineJobManager jobManager = new 
PipelineJobManager(jobAPI);
     
     @Getter
     private final ConsistencyCheckJobItemContext jobItemContext;
@@ -80,7 +80,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
         if (jobItemContext.isStopping()) {
             return;
         }
-        TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).persistJobItemProgress(jobItemContext);
+        new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())).persistJobItemProgress(jobItemContext);
         CompletableFuture<?> future = 
jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor);
         ExecuteEngine.trigger(Collections.singletonList(future), new 
CheckExecuteCallback());
     }
@@ -95,7 +95,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
         
         @Override
         protected void runBlocking() {
-            checkJobAPI.persistJobItemProgress(jobItemContext);
+            jobManager.persistJobItemProgress(jobItemContext);
             JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
             InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) 
TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
             PipelineJobConfiguration parentJobConfig = new 
PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId));
@@ -133,7 +133,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
             }
             log.info("onSuccess, check job id: {}, parent job id: {}", 
checkJobId, parentJobId);
             jobItemContext.setStatus(JobStatus.FINISHED);
-            checkJobAPI.persistJobItemProgress(jobItemContext);
+            jobManager.persistJobItemProgress(jobItemContext);
             jobManager.stop(checkJobId);
         }
         
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 1f2378d05d3..3d12a4363bc 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -445,7 +445,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     }
     
     @Override
-    public Class<MigrationJob> getPipelineJobClass() {
+    public Class<MigrationJob> getJobClass() {
         return MigrationJob.class;
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index cf73d15646b..37772491969 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -45,6 +45,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetSchemasParameter;
@@ -81,6 +82,8 @@ public final class MigrationJobPreparer {
     
     private final MigrationJobAPI jobAPI = new MigrationJobAPI();
     
+    private final PipelineJobManager jobManager = new 
PipelineJobManager(jobAPI);
+    
     /**
      * Do prepare work.
      *
@@ -123,7 +126,7 @@ public final class MigrationJobPreparer {
         String jobId = jobConfig.getJobId();
         LockContext lockContext = 
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager().getInstanceContext().getLockContext();
         if (!jobAPI.getJobItemProgress(jobId, 
jobItemContext.getShardingItem()).isPresent()) {
-            jobAPI.persistJobItemProgress(jobItemContext);
+            jobManager.persistJobItemProgress(jobItemContext);
         }
         LockDefinition lockDefinition = new 
GlobalLockDefinition(String.format(GlobalLockNames.PREPARE.getLockName(), 
jobConfig.getJobId()));
         long startTimeMillis = System.currentTimeMillis();
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 6ab0ef41af0..deeae3d64e5 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -50,7 +50,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class ConsistencyCheckJobAPITest {
     
-    private final ConsistencyCheckJobAPI checkJobAPI = new 
ConsistencyCheckJobAPI();
+    private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI();
+    
+    private final PipelineJobManager jobManager = new 
PipelineJobManager(jobAPI);
     
     private final YamlMigrationJobConfigurationSwapper jobConfigSwapper = new 
YamlMigrationJobConfigurationSwapper();
     
@@ -63,9 +65,9 @@ class ConsistencyCheckJobAPITest {
     void assertCreateJobConfig() {
         MigrationJobConfiguration parentJobConfig = 
jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
         String parentJobId = parentJobConfig.getJobId();
-        String checkJobId = checkJobAPI.createJobAndStart(new 
CreateConsistencyCheckJobParameter(parentJobId, null, null,
+        String checkJobId = jobAPI.createJobAndStart(new 
CreateConsistencyCheckJobParameter(parentJobId, null, null,
                 parentJobConfig.getSourceDatabaseType(), 
parentJobConfig.getTargetDatabaseType()));
-        ConsistencyCheckJobConfiguration checkJobConfig = 
(ConsistencyCheckJobConfiguration) new PipelineJobManager(checkJobAPI)
+        ConsistencyCheckJobConfiguration checkJobConfig = 
(ConsistencyCheckJobConfiguration) new PipelineJobManager(jobAPI)
                 
.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
         int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
         String expectCheckJobId = new 
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), 
parentJobId, expectedSequence).marshal();
@@ -82,11 +84,11 @@ class ConsistencyCheckJobAPITest {
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
         int expectedSequence = 1;
         for (int i = 0; i < 3; i++) {
-            String checkJobId = checkJobAPI.createJobAndStart(new 
CreateConsistencyCheckJobParameter(parentJobId, null, null,
+            String checkJobId = jobAPI.createJobAndStart(new 
CreateConsistencyCheckJobParameter(parentJobId, null, null,
                     parentJobConfig.getSourceDatabaseType(), 
parentJobConfig.getTargetDatabaseType()));
             ConsistencyCheckJobItemContext checkJobItemContext = new 
ConsistencyCheckJobItemContext(
                     new ConsistencyCheckJobConfiguration(checkJobId, 
parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")), 
0, JobStatus.FINISHED, null);
-            checkJobAPI.persistJobItemProgress(checkJobItemContext);
+            jobManager.persistJobItemProgress(checkJobItemContext);
             Map<String, TableDataConsistencyCheckResult> 
dataConsistencyCheckResult = Collections.singletonMap("t_order", new 
TableDataConsistencyCheckResult(true));
             repositoryAPI.persistCheckJobResult(parentJobId, checkJobId, 
dataConsistencyCheckResult);
             Optional<String> latestCheckJobId = 
repositoryAPI.getLatestCheckJobId(parentJobId);
@@ -95,12 +97,12 @@ class ConsistencyCheckJobAPITest {
         }
         expectedSequence = 2;
         for (int i = 0; i < 2; i++) {
-            checkJobAPI.dropByParentJobId(parentJobId);
+            jobAPI.dropByParentJobId(parentJobId);
             Optional<String> latestCheckJobId = 
repositoryAPI.getLatestCheckJobId(parentJobId);
             assertTrue(latestCheckJobId.isPresent());
             
assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()), 
is(expectedSequence--));
         }
-        checkJobAPI.dropByParentJobId(parentJobId);
+        jobAPI.dropByParentJobId(parentJobId);
         Optional<String> latestCheckJobId = 
repositoryAPI.getLatestCheckJobId(parentJobId);
         assertFalse(latestCheckJobId.isPresent());
     }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 472a14550de..fd0e33cdfa0 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -212,7 +212,7 @@ class MigrationJobAPITest {
         Optional<String> jobId = jobManager.start(jobConfig);
         assertTrue(jobId.isPresent());
         MigrationJobItemContext jobItemContext = 
PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
-        jobAPI.persistJobItemProgress(jobItemContext);
+        jobManager.persistJobItemProgress(jobItemContext);
         jobAPI.updateJobItemStatus(jobId.get(), 
jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
         Map<Integer, InventoryIncrementalJobItemProgress> progress = 
jobAPI.getJobProgress(jobConfig);
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : 
progress.entrySet()) {
@@ -245,7 +245,7 @@ class MigrationJobAPITest {
     void assertRenewJobStatus() {
         final MigrationJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
         MigrationJobItemContext jobItemContext = 
PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
-        jobAPI.persistJobItemProgress(jobItemContext);
+        jobManager.persistJobItemProgress(jobItemContext);
         jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0, 
JobStatus.FINISHED);
         Optional<InventoryIncrementalJobItemProgress> actual = 
jobAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
         assertTrue(actual.isPresent());


Reply via email to