This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 19c6551e3c5 Add YamlPipelineJobItemProgressConfiguration (#29074)
19c6551e3c5 is described below

commit 19c6551e3c540dcf49b554859fe9627007303f72
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 18 17:11:35 2023 +0800

    Add YamlPipelineJobItemProgressConfiguration (#29074)
---
 .../yaml/YamlConsistencyCheckJobItemProgress.java    |  4 ++--
 .../YamlConsistencyCheckJobItemProgressSwapper.java  |  5 +++++
 .../YamlInventoryIncrementalJobItemProgress.java     |  4 ++--
 ...mlInventoryIncrementalJobItemProgressSwapper.java |  5 +++++
 .../core/job/service/InventoryIncrementalJobAPI.java |  5 +----
 .../pipeline/core/job/service/PipelineJobAPI.java    | 14 +++-----------
 .../core/job/service/PipelineJobManager.java         | 19 +++++++++++++++++--
 .../impl/AbstractInventoryIncrementalJobAPIImpl.java | 13 ++++---------
 ...=> YamlPipelineJobItemProgressConfiguration.java} |  9 ++-------
 .../job/yaml/YamlPipelineJobItemProgressSwapper.java | 12 +++++++++---
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java        |  3 ++-
 .../data/pipeline/cdc/core/job/CDCJob.java           |  2 +-
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java    |  2 +-
 .../consistencycheck/ConsistencyCheckJob.java        |  4 +++-
 .../api/impl/ConsistencyCheckJobAPI.java             | 20 +++++++++-----------
 .../pipeline/scenario/migration/MigrationJob.java    |  5 ++++-
 .../migration/prepare/MigrationJobPreparer.java      |  2 +-
 .../migration/api/impl/MigrationJobAPITest.java      |  2 +-
 18 files changed, 72 insertions(+), 58 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
index 2ed595408b3..72ea55e1865 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml;
 
 import lombok.Getter;
 import lombok.Setter;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -29,7 +29,7 @@ import java.util.Map;
  */
 @Getter
 @Setter
-public final class YamlConsistencyCheckJobItemProgress implements 
YamlConfiguration {
+public final class YamlConsistencyCheckJobItemProgress implements 
YamlPipelineJobItemProgressConfiguration {
     
     private String status;
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
index 066df10fc28..eb582d1d896 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
@@ -50,4 +50,9 @@ public final class YamlConsistencyCheckJobItemProgressSwapper 
implements YamlPip
         result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
         return result;
     }
+    
+    @Override
+    public Class<YamlConsistencyCheckJobItemProgress> getYamlProgressClass() {
+        return YamlConsistencyCheckJobItemProgress.class;
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
index e73596ce667..1e44dcf8984 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
@@ -19,14 +19,14 @@ package 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml;
 
 import lombok.Getter;
 import lombok.Setter;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
 
 /**
  * YAML inventory incremental job item progress.
  */
 @Getter
 @Setter
-public final class YamlInventoryIncrementalJobItemProgress implements 
YamlConfiguration {
+public final class YamlInventoryIncrementalJobItemProgress implements 
YamlPipelineJobItemProgressConfiguration {
     
     private String status;
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
index 24c2f6a5a74..1b29d4f52de 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
@@ -57,4 +57,9 @@ public final class 
YamlInventoryIncrementalJobItemProgressSwapper implements Yam
         
result.setInventoryRecordsCount(yamlProgress.getInventoryRecordsCount());
         return result;
     }
+    
+    @Override
+    public Class<YamlInventoryIncrementalJobItemProgress> 
getYamlProgressClass() {
+        return YamlInventoryIncrementalJobItemProgress.class;
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
index 908c3103c76..fa4594f7eab 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
@@ -37,13 +37,13 @@ import java.sql.SQLException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 /**
  * Inventory incremental job API.
  */
 public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
     
+    @SuppressWarnings("unchecked")
     @Override
     default YamlInventoryIncrementalJobItemProgressSwapper 
getYamlJobItemProgressSwapper() {
         return new YamlInventoryIncrementalJobItemProgressSwapper();
@@ -123,9 +123,6 @@ public interface InventoryIncrementalJobAPI extends 
PipelineJobAPI {
      */
     Map<Integer, InventoryIncrementalJobItemProgress> 
getJobProgress(PipelineJobConfiguration pipelineJobConfig);
     
-    @Override
-    Optional<InventoryIncrementalJobItemProgress> getJobItemProgress(String 
jobId, int shardingItem);
-    
     /**
      * Get job infos.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index b5ab26690a0..168b18d88ad 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -21,6 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
@@ -43,10 +44,10 @@ public interface PipelineJobAPI extends TypedSPI {
     /**
      * Get YAML pipeline job item progress swapper.
      * 
+     * @param <T> type of pipeline job item progress
      * @return YAML pipeline job item progress swapper
      */
-    @SuppressWarnings("rawtypes")
-    YamlPipelineJobItemProgressSwapper getYamlJobItemProgressSwapper();
+    <T extends PipelineJobItemProgress> 
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T> 
getYamlJobItemProgressSwapper();
     
     /**
      * Whether to ignore to start disabled job when job item progress is 
finished.
@@ -75,15 +76,6 @@ public interface PipelineJobAPI extends TypedSPI {
         return Optional.empty();
     }
     
-    /**
-     * Get job item progress.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @return job item progress, may be null
-     */
-    Optional<? extends PipelineJobItemProgress> getJobItemProgress(String 
jobId, int shardingItem);
-    
     /**
      * Update job item status.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 99d846dd6d7..e7fd3ad85e6 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -32,6 +32,8 @@ import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBa
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -95,7 +97,7 @@ public final class PipelineJobManager {
      */
     public void startDisabledJob(final String jobId) {
         if (jobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
-            Optional<? extends PipelineJobItemProgress> jobItemProgress = 
jobAPI.getJobItemProgress(jobId, 0);
+            Optional<? extends PipelineJobItemProgress> jobItemProgress = 
getJobItemProgress(jobId, 0);
             if (jobItemProgress.isPresent() && JobStatus.FINISHED == 
jobItemProgress.get().getStatus()) {
                 log.info("job status is FINISHED, ignore, jobId={}", jobId);
                 return;
@@ -201,6 +203,20 @@ public final class PipelineJobManager {
                 .filter(each -> 
jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()));
     }
     
+    /**
+     * Get job item progress.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param <T> type of pipeline job item progress
+     * @return job item progress, may be null
+     */
+    public <T extends PipelineJobItemProgress> Optional<T> 
getJobItemProgress(final String jobId, final int shardingItem) {
+        
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T> 
swapper = jobAPI.getYamlJobItemProgressSwapper();
+        Optional<String> progress = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
 shardingItem);
+        return progress.map(optional -> 
swapper.swapToObject(YamlEngine.unmarshal(optional, 
swapper.getYamlProgressClass(), true)));
+    }
+    
     /**
      * Persist job item progress.
      *
@@ -221,7 +237,6 @@ public final class PipelineJobManager {
                 .updateJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
     }
     
-    @SuppressWarnings("unchecked")
     private String convertJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
         return 
YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemContext.toProgress()));
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 1706b2c4c9f..6b0cf4171e6 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -25,7 +25,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
@@ -77,10 +76,11 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl implements Inventor
     
     @Override
     public Map<Integer, InventoryIncrementalJobItemProgress> 
getJobProgress(final PipelineJobConfiguration jobConfig) {
+        PipelineJobManager jobManager = new PipelineJobManager(this);
         String jobId = jobConfig.getJobId();
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         return IntStream.range(0, 
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, 
each) -> {
-            Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
getJobItemProgress(jobId, each);
+            Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
jobManager.getJobItemProgress(jobId, each);
             jobItemProgress.ifPresent(optional -> 
optional.setActive(!jobConfigPOJO.isDisabled()));
             map.put(each, jobItemProgress.orElse(null));
         }, LinkedHashMap::putAll);
@@ -130,15 +130,10 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl implements Inventor
         return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo());
     }
     
-    @Override
-    public Optional<InventoryIncrementalJobItemProgress> 
getJobItemProgress(final String jobId, final int shardingItem) {
-        Optional<String> progress = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
 shardingItem);
-        return progress.map(optional -> 
getYamlJobItemProgressSwapper().swapToObject(YamlEngine.unmarshal(optional, 
YamlInventoryIncrementalJobItemProgress.class)));
-    }
-    
     @Override
     public void updateJobItemStatus(final String jobId, final int 
shardingItem, final JobStatus status) {
-        Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
getJobItemProgress(jobId, shardingItem);
+        PipelineJobManager jobManager = new PipelineJobManager(this);
+        Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
jobManager.getJobItemProgress(jobId, shardingItem);
         if (!jobItemProgress.isPresent()) {
             return;
         }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressConfiguration.java
similarity index 66%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressConfiguration.java
index 90f522b0910..1a1ef05cbdb 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressConfiguration.java
@@ -17,15 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.yaml;
 
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
-import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
 /**
- * YAML pipeline job configuration swapper.
- *
- * @param <Y> type of YAML configuration
- * @param <T> type of swapped pipeline job item progress
+ * YAML pipeline job item progress configuration.
  */
-public interface YamlPipelineJobItemProgressSwapper<Y extends 
YamlConfiguration, T extends PipelineJobItemProgress> extends 
YamlConfigurationSwapper<Y, T> {
+public interface YamlPipelineJobItemProgressConfiguration extends 
YamlConfiguration {
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
index 90f522b0910..4f803634df9 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
@@ -18,14 +18,20 @@
 package org.apache.shardingsphere.data.pipeline.core.job.yaml;
 
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
 /**
  * YAML pipeline job configuration swapper.
  *
- * @param <Y> type of YAML configuration
+ * @param <Y> type of YAML pipeline job item progress configuration
  * @param <T> type of swapped pipeline job item progress
  */
-public interface YamlPipelineJobItemProgressSwapper<Y extends 
YamlConfiguration, T extends PipelineJobItemProgress> extends 
YamlConfigurationSwapper<Y, T> {
+public interface YamlPipelineJobItemProgressSwapper<Y extends 
YamlPipelineJobItemProgressConfiguration, T extends PipelineJobItemProgress> 
extends YamlConfigurationSwapper<Y, T> {
+    
+    /**
+     * Get YAML pipeline job item progress configuration class.
+     * 
+     * @return YAML pipeline job item progress configuration class
+     */
+    Class<Y> getYamlProgressClass();
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 070328d868e..e4aec40e5cb 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -168,9 +168,10 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
     
     private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
         String jobId = jobConfig.getJobId();
+        PipelineJobManager jobManager = new PipelineJobManager(this);
         try (PipelineDataSourceManager pipelineDataSourceManager = new 
DefaultPipelineDataSourceManager()) {
             for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
-                if (getJobItemProgress(jobId, i).isPresent()) {
+                if (jobManager.getJobItemProgress(jobId, i).isPresent()) {
                     continue;
                 }
                 IncrementalDumperContext dumperContext = 
buildDumperContext(jobConfig, i, new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index c2637c0de36..657b7b48ccd 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -106,7 +106,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     }
     
     private CDCJobItemContext buildPipelineJobItemContext(final 
CDCJobConfiguration jobConfig, final int shardingItem) {
-        Optional<InventoryIncrementalJobItemProgress> initProgress = 
jobAPI.getJobItemProgress(jobConfig.getJobId(), shardingItem);
+        Optional<InventoryIncrementalJobItemProgress> initProgress = 
jobManager.getJobItemProgress(jobConfig.getJobId(), shardingItem);
         CDCProcessContext jobProcessContext = 
jobAPI.buildPipelineProcessContext(jobConfig);
         CDCTaskConfiguration taskConfig = 
jobAPI.buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
         return new CDCJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, 
sink);
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index e79887f33cd..31f50757bbe 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -89,7 +89,7 @@ public final class CDCJobPreparer {
     
     private void initTasks0(final CDCJobItemContext jobItemContext, final 
AtomicBoolean inventoryImporterUsed, final List<CDCChannelProgressPair> 
inventoryChannelProgressPairs,
                             final AtomicBoolean incrementalImporterUsed, final 
List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
-        Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
jobAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
+        Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
jobManager.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
         if (!jobItemProgress.isPresent()) {
             jobManager.persistJobItemProgress(jobItemContext);
         }
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 7d289dd0a89..913fb82ed29 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemCon
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -46,7 +47,8 @@ public final class ConsistencyCheckJob extends 
AbstractSimplePipelineJob {
     public ConsistencyCheckJobItemContext buildPipelineJobItemContext(final 
ShardingContext shardingContext) {
         ConsistencyCheckJobConfiguration jobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         ConsistencyCheckJobAPI jobAPI = (ConsistencyCheckJobAPI) getJobAPI();
-        Optional<ConsistencyCheckJobItemProgress> jobItemProgress = 
jobAPI.getJobItemProgress(jobConfig.getJobId(), 
shardingContext.getShardingItem());
+        PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
+        Optional<ConsistencyCheckJobItemProgress> jobItemProgress = 
jobManager.getJobItemProgress(jobConfig.getJobId(), 
shardingContext.getShardingItem());
         return new ConsistencyCheckJobItemContext(jobConfig, 
shardingContext.getShardingItem(), JobStatus.RUNNING, 
jobItemProgress.orElse(null));
     }
     
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 62a1520c06b..4ba4d05079d 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -22,7 +22,6 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
@@ -84,7 +83,8 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
         Optional<String> latestCheckJobId = 
repositoryAPI.getLatestCheckJobId(parentJobId);
         if (latestCheckJobId.isPresent()) {
-            Optional<ConsistencyCheckJobItemProgress> progress = 
getJobItemProgress(latestCheckJobId.get(), 0);
+            PipelineJobManager jobManager = new PipelineJobManager(this);
+            Optional<ConsistencyCheckJobItemProgress> progress = 
jobManager.getJobItemProgress(latestCheckJobId.get(), 0);
             if (!progress.isPresent() || JobStatus.FINISHED != 
progress.get().getStatus()) {
                 log.info("check job already exists and status is not FINISHED, 
progress={}", progress);
                 throw new 
UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get());
@@ -117,15 +117,10 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         return true;
     }
     
-    @Override
-    public Optional<ConsistencyCheckJobItemProgress> getJobItemProgress(final 
String jobId, final int shardingItem) {
-        Optional<String> progress = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
 shardingItem);
-        return progress.map(s -> 
getYamlJobItemProgressSwapper().swapToObject(YamlEngine.unmarshal(s, 
YamlConsistencyCheckJobItemProgress.class, true)));
-    }
-    
     @Override
     public void updateJobItemStatus(final String jobId, final int 
shardingItem, final JobStatus status) {
-        Optional<ConsistencyCheckJobItemProgress> jobItemProgress = 
getJobItemProgress(jobId, shardingItem);
+        PipelineJobManager jobManager = new PipelineJobManager(this);
+        Optional<ConsistencyCheckJobItemProgress> jobItemProgress = 
jobManager.getJobItemProgress(jobId, shardingItem);
         if (!jobItemProgress.isPresent()) {
             log.warn("updateJobItemStatus, jobProgress is null, jobId={}, 
shardingItem={}", jobId, shardingItem);
             return;
@@ -193,7 +188,8 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         Optional<String> latestCheckJobId = 
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
         ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), 
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
         String checkJobId = latestCheckJobId.get();
-        Optional<ConsistencyCheckJobItemProgress> progress = 
getJobItemProgress(checkJobId, 0);
+        PipelineJobManager jobManager = new PipelineJobManager(this);
+        Optional<ConsistencyCheckJobItemProgress> progress = 
jobManager.getJobItemProgress(checkJobId, 0);
         if (!progress.isPresent()) {
             return Collections.emptyList();
         }
@@ -233,7 +229,8 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         Optional<String> latestCheckJobId = 
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
         ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), 
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
         String checkJobId = latestCheckJobId.get();
-        Optional<ConsistencyCheckJobItemProgress> progress = 
getJobItemProgress(checkJobId, 0);
+        PipelineJobManager jobManager = new PipelineJobManager(this);
+        Optional<ConsistencyCheckJobItemProgress> progress = 
jobManager.getJobItemProgress(checkJobId, 0);
         ConsistencyCheckJobItemInfo result = new ConsistencyCheckJobItemInfo();
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId);
         result.setActive(!jobConfigPOJO.isDisabled());
@@ -306,6 +303,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         return new YamlConsistencyCheckJobConfigurationSwapper();
     }
     
+    @SuppressWarnings("unchecked")
     @Override
     public YamlConsistencyCheckJobItemProgressSwapper 
getYamlJobItemProgressSwapper() {
         return new YamlConsistencyCheckJobItemProgressSwapper();
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index adef5960815..136842f0541 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipeline
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.InventoryIncrementalTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
@@ -46,6 +47,8 @@ public final class MigrationJob extends 
AbstractSimplePipelineJob {
     
     private final MigrationJobAPI jobAPI = new MigrationJobAPI();
     
+    private final PipelineJobManager jobManager = new 
PipelineJobManager(jobAPI);
+    
     private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
     
     // Shared by all sharding items
@@ -59,7 +62,7 @@ public final class MigrationJob extends 
AbstractSimplePipelineJob {
     protected InventoryIncrementalJobItemContext 
buildPipelineJobItemContext(final ShardingContext shardingContext) {
         int shardingItem = shardingContext.getShardingItem();
         MigrationJobConfiguration jobConfig = new 
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        Optional<InventoryIncrementalJobItemProgress> initProgress = 
jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
+        Optional<InventoryIncrementalJobItemProgress> initProgress = 
jobManager.getJobItemProgress(shardingContext.getJobName(), shardingItem);
         MigrationProcessContext jobProcessContext = 
jobAPI.buildPipelineProcessContext(jobConfig);
         MigrationTaskConfiguration taskConfig = 
jobAPI.buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
         return new MigrationJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 37772491969..d473001b873 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -125,7 +125,7 @@ public final class MigrationJobPreparer {
         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
         String jobId = jobConfig.getJobId();
         LockContext lockContext = 
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager().getInstanceContext().getLockContext();
-        if (!jobAPI.getJobItemProgress(jobId, 
jobItemContext.getShardingItem()).isPresent()) {
+        if (!jobManager.getJobItemProgress(jobId, 
jobItemContext.getShardingItem()).isPresent()) {
             jobManager.persistJobItemProgress(jobItemContext);
         }
         LockDefinition lockDefinition = new 
GlobalLockDefinition(String.format(GlobalLockNames.PREPARE.getLockName(), 
jobConfig.getJobId()));
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index fd0e33cdfa0..a1a1c905dbd 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -247,7 +247,7 @@ class MigrationJobAPITest {
         MigrationJobItemContext jobItemContext = 
PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
         jobManager.persistJobItemProgress(jobItemContext);
         jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0, 
JobStatus.FINISHED);
-        Optional<InventoryIncrementalJobItemProgress> actual = 
jobAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
+        Optional<InventoryIncrementalJobItemProgress> actual = 
jobManager.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
         assertTrue(actual.isPresent());
         assertThat(actual.get().getStatus(), is(JobStatus.FINISHED));
     }


Reply via email to