This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new df44c995f31 Add PipelineJobItemManager (#29078)
df44c995f31 is described below

commit df44c995f31df93711afc427496a92ee3809fd3b
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 18 20:53:15 2023 +0800

    Add PipelineJobItemManager (#29078)
    
    * Add PipelineJobItemManager
    
    * Add PipelineJobItemManager
---
 .../core/job/AbstractSimplePipelineJob.java        |  14 ++-
 .../persist/PipelineJobProgressPersistService.java |   5 +-
 .../core/job/service/PipelineJobItemManager.java   | 137 +++++++++++++++++++++
 .../core/job/service/PipelineJobManager.java       | 100 +--------------
 .../AbstractInventoryIncrementalJobAPIImpl.java    |  12 +-
 .../runner/InventoryIncrementalTasksRunner.java    |  20 +--
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |   5 +-
 .../data/pipeline/cdc/core/job/CDCJob.java         |  14 +--
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |  22 ++--
 .../consistencycheck/ConsistencyCheckJob.java      |   8 +-
 .../api/impl/ConsistencyCheckJobAPI.java           |  15 +--
 .../task/ConsistencyCheckTasksRunner.java          |  17 ++-
 .../pipeline/scenario/migration/MigrationJob.java  |   6 +-
 .../migration/prepare/MigrationJobPreparer.java    |  10 +-
 .../api/impl/ConsistencyCheckJobAPITest.java       |   6 +-
 .../migration/api/impl/MigrationJobAPITest.java    |  14 ++-
 16 files changed, 233 insertions(+), 172 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 197c09e2d49..36899a266c8 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -48,6 +49,7 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
     @Override
     public void execute(final ShardingContext shardingContext) {
         PipelineJobManager jobManager = new PipelineJobManager(getJobAPI());
+        PipelineJobItemManager<?> jobItemManager = new 
PipelineJobItemManager<>(getJobAPI().getYamlJobItemProgressSwapper());
         String jobId = shardingContext.getJobName();
         int shardingItem = shardingContext.getShardingItem();
         log.info("Execute job {}-{}", jobId, shardingItem);
@@ -57,31 +59,31 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
         }
         try {
             PipelineJobItemContext jobItemContext = 
buildPipelineJobItemContext(shardingContext);
-            execute0(jobManager, jobItemContext);
+            execute0(jobItemManager, jobItemContext);
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
-            processFailed(jobManager, jobId, shardingItem, ex);
+            processFailed(jobManager, jobItemManager, jobId, shardingItem, ex);
             throw ex;
         }
     }
     
-    private void execute0(final PipelineJobManager jobManager, final 
PipelineJobItemContext jobItemContext) {
+    private void execute0(final PipelineJobItemManager<?> jobItemManager, 
final PipelineJobItemContext jobItemContext) {
         String jobId = jobItemContext.getJobId();
         int shardingItem = jobItemContext.getShardingItem();
         PipelineTasksRunner tasksRunner = 
buildPipelineTasksRunner(jobItemContext);
         if (!addTasksRunner(shardingItem, tasksRunner)) {
             return;
         }
-        jobManager.cleanJobItemErrorMessage(jobId, shardingItem);
+        jobItemManager.cleanErrorMessage(jobId, shardingItem);
         prepare(jobItemContext);
         log.info("start tasks runner, jobId={}, shardingItem={}", jobId, 
shardingItem);
         tasksRunner.start();
     }
     
-    private void processFailed(final PipelineJobManager jobManager, final 
String jobId, final int shardingItem, final Exception ex) {
+    private void processFailed(final PipelineJobManager jobManager, final 
PipelineJobItemManager<?> jobItemManager, final String jobId, final int 
shardingItem, final Exception ex) {
         log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
-        jobManager.updateJobItemErrorMessage(jobId, shardingItem, ex);
+        jobItemManager.updateErrorMessage(jobId, shardingItem, ex);
         try {
             jobManager.stop(jobId);
         } catch (final PipelineJobNotFoundException ignored) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 87e13a2ea09..143ba94c50b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -24,7 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemCon
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
@@ -130,7 +130,8 @@ public final class PipelineJobProgressPersistService {
             }
             persistContext.getHasNewEvents().set(false);
             long startTimeMillis = System.currentTimeMillis();
-            new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobId).getType())).updateJobItemProgress(jobItemContext.get());
+            new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobId).getType())
+                    
.getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
             persistContext.getBeforePersistingProgressMillis().set(null);
             if (6 == ThreadLocalRandom.current().nextInt(100)) {
                 log.info("persist, jobId={}, shardingItem={}, cost {} ms", 
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
new file mode 100644
index 00000000000..9aa4a01cbb3
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.service;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.util.Optional;
+
+/**
+ * Pipeline job item manager.
+ * 
+ * @param <T> type of pipeline job item progress
+ */
+public final class PipelineJobItemManager<T extends PipelineJobItemProgress> {
+    
+    private final 
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T> 
swapper;
+    
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public PipelineJobItemManager(final YamlPipelineJobItemProgressSwapper 
swapper) {
+        this.swapper = swapper;
+    }
+    
+    /**
+     * Update job item status.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param status status
+     */
+    public void updateStatus(final String jobId, final int shardingItem, final 
JobStatus status) {
+        Optional<T> jobItemProgress = getProgress(jobId, shardingItem);
+        if (!jobItemProgress.isPresent()) {
+            return;
+        }
+        jobItemProgress.get().setStatus(status);
+        PipelineAPIFactory.getGovernanceRepositoryAPI(
+                
PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, 
shardingItem, 
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
+    }
+    
+    /**
+     * Get job item progress.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @return job item progress
+     */
+    public Optional<T> getProgress(final String jobId, final int shardingItem) 
{
+        return 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
 shardingItem)
+                .map(optional -> 
swapper.swapToObject(YamlEngine.unmarshal(optional, 
swapper.getYamlProgressClass(), true)));
+    }
+    
+    /**
+     * Persist job item progress.
+     *
+     * @param jobItemContext job item context
+     */
+    public void persistProgress(final PipelineJobItemContext jobItemContext) {
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+                .persistJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
+    }
+    
+    /**
+     * Update job item progress.
+     *
+     * @param jobItemContext job item context
+     */
+    public void updateProgress(final PipelineJobItemContext jobItemContext) {
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+                .updateJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
+    }
+    
+    @SuppressWarnings("unchecked")
+    private String convertProgressYamlContent(final PipelineJobItemContext 
jobItemContext) {
+        return YamlEngine.marshal(swapper.swapToYamlConfiguration((T) 
jobItemContext.toProgress()));
+    }
+    
+    /**
+     * Get job item error message.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @return job item error message, or an empty string if none is stored
+     */
+    public String getErrorMessage(final String jobId, final int shardingItem) {
+        return 
Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId,
 shardingItem)).orElse("");
+    }
+    
+    /**
+     * Update job item error message.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param error error
+     */
+    public void updateErrorMessage(final String jobId, final int shardingItem, 
final Object error) {
+        String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem);
+        String value = "";
+        if (null != error) {
+            value = error instanceof Throwable ? 
ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
+        }
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key,
 value);
+    }
+    
+    /**
+     * Clean job item error message.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     */
+    public void cleanErrorMessage(final String jobId, final int shardingItem) {
+        String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem);
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key,
 "");
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 4ffeaafbee9..eccbdd80227 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -19,10 +19,8 @@ package 
org.apache.shardingsphere.data.pipeline.core.job.service;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.exception.ExceptionUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
@@ -32,8 +30,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBa
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -95,7 +91,7 @@ public final class PipelineJobManager {
      */
     public void startDisabledJob(final String jobId) {
         if (jobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
-            Optional<? extends PipelineJobItemProgress> jobItemProgress = 
getJobItemProgress(jobId, 0);
+            Optional<? extends PipelineJobItemProgress> jobItemProgress = new 
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()).getProgress(jobId,
 0);
             if (jobItemProgress.isPresent() && JobStatus.FINISHED == 
jobItemProgress.get().getStatus()) {
                 log.info("job status is FINISHED, ignore, jobId={}", jobId);
                 return;
@@ -197,98 +193,4 @@ public final class PipelineJobManager {
         }
         return Collections.emptyList();
     }
-    
-    /**
-     * Get job item progress.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @param <T> type of pipeline job item progress
-     * @return job item progress, may be null
-     */
-    public <T extends PipelineJobItemProgress> Optional<T> 
getJobItemProgress(final String jobId, final int shardingItem) {
-        
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T> 
swapper = jobAPI.getYamlJobItemProgressSwapper();
-        Optional<String> progress = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
 shardingItem);
-        return progress.map(optional -> 
swapper.swapToObject(YamlEngine.unmarshal(optional, 
swapper.getYamlProgressClass(), true)));
-    }
-    
-    /**
-     * Persist job item progress.
-     *
-     * @param jobItemContext job item context
-     */
-    public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
-                .persistJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
-    }
-    
-    /**
-     * Update job item status.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @param status status
-     */
-    public void updateJobItemStatus(final String jobId, final int 
shardingItem, final JobStatus status) {
-        Optional<PipelineJobItemProgress> jobItemProgress = 
getJobItemProgress(jobId, shardingItem);
-        if (!jobItemProgress.isPresent()) {
-            log.warn("updateJobItemStatus, jobProgress is null, jobId={}, 
shardingItem={}", jobId, shardingItem);
-            return;
-        }
-        jobItemProgress.get().setStatus(status);
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
 shardingItem,
-                
YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
-    }
-    
-    /**
-     * Update job item progress.
-     *
-     * @param jobItemContext job item context
-     */
-    public void updateJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
-                .updateJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
-    }
-    
-    private String convertJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
-        return 
YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemContext.toProgress()));
-    }
-    
-    /**
-     * Get job item error message.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @return map, key is sharding item, value is error message
-     */
-    public String getJobItemErrorMessage(final String jobId, final int 
shardingItem) {
-        return 
Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId,
 shardingItem)).orElse("");
-    }
-    
-    /**
-     * Update job item error message.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @param error error
-     */
-    public void updateJobItemErrorMessage(final String jobId, final int 
shardingItem, final Object error) {
-        String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem);
-        String value = "";
-        if (null != error) {
-            value = error instanceof Throwable ? 
ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
-        }
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key,
 value);
-    }
-    
-    /**
-     * Clean job item error message.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     */
-    public void cleanJobItemErrorMessage(final String jobId, final int 
shardingItem) {
-        String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem);
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key,
 "");
-    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index f0c85629db7..c3d0b1d689a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -35,6 +35,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.Table
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -76,11 +77,11 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl implements Inventor
     
     @Override
     public Map<Integer, InventoryIncrementalJobItemProgress> 
getJobProgress(final PipelineJobConfiguration jobConfig) {
-        PipelineJobManager jobManager = new PipelineJobManager(this);
+        PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
         String jobId = jobConfig.getJobId();
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         return IntStream.range(0, 
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, 
each) -> {
-            Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
jobManager.getJobItemProgress(jobId, each);
+            Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
jobItemManager.getProgress(jobId, each);
             jobItemProgress.ifPresent(optional -> 
optional.setActive(!jobConfigPOJO.isDisabled()));
             map.put(each, jobItemProgress.orElse(null));
         }, LinkedHashMap::putAll);
@@ -88,9 +89,10 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl 
implements Inventor
     
     @Override
     public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String 
jobId) {
-        PipelineJobManager pipelineJobManager = new PipelineJobManager(this);
+        PipelineJobManager jobManager = new PipelineJobManager(this);
+        PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
-        PipelineJobConfiguration jobConfig = 
pipelineJobManager.getJobConfiguration(jobConfigPOJO);
+        PipelineJobConfiguration jobConfig = 
jobManager.getJobConfiguration(jobConfigPOJO);
         long startTimeMillis = 
Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
         Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = 
getJobProgress(jobConfig);
         List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
@@ -98,7 +100,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl 
implements Inventor
             int shardingItem = entry.getKey();
             TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) 
getJobInfo(jobId);
             InventoryIncrementalJobItemProgress jobItemProgress = 
entry.getValue();
-            String errorMessage = 
pipelineJobManager.getJobItemErrorMessage(jobId, shardingItem);
+            String errorMessage = jobItemManager.getErrorMessage(jobId, 
shardingItem);
             if (null == jobItemProgress) {
                 result.add(new InventoryIncrementalJobItemInfo(shardingItem, 
jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
                 continue;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index 4af5b5f3b27..3935bec8106 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -25,15 +25,17 @@ import 
org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -57,12 +59,15 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
     
     private final PipelineJobManager jobManager;
     
+    private final PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager;
+    
     public InventoryIncrementalTasksRunner(final 
InventoryIncrementalJobItemContext jobItemContext) {
         this.jobItemContext = jobItemContext;
         inventoryTasks = jobItemContext.getInventoryTasks();
         incrementalTasks = jobItemContext.getIncrementalTasks();
         jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
         jobManager = new PipelineJobManager(jobAPI);
+        jobItemManager = new 
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
     }
     
     @Override
@@ -83,7 +88,8 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
         if (jobItemContext.isStopping()) {
             return;
         }
-        new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())).persistJobItemProgress(jobItemContext);
+        new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
+                
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
         if 
(PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
             log.info("All inventory tasks finished.");
             executeIncrementalTask();
@@ -106,7 +112,7 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
     
     private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
         jobItemContext.setStatus(jobStatus);
-        jobManager.updateJobItemStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
+        jobItemManager.updateStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
     }
     
     private synchronized void executeIncrementalTask() {
@@ -146,7 +152,7 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
     protected void inventoryFailureCallback(final Throwable throwable) {
         log.error("onFailure, inventory task execute failed.", throwable);
         String jobId = jobItemContext.getJobId();
-        jobManager.updateJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
+        jobItemManager.updateErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
         try {
             jobManager.stop(jobId);
         } catch (final PipelineJobNotFoundException ignored) {
@@ -181,7 +187,7 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
         public void onFailure(final Throwable throwable) {
             log.error("onFailure, incremental task execute failed.", 
throwable);
             String jobId = jobItemContext.getJobId();
-            jobManager.updateJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
+            jobItemManager.updateErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
             try {
                 jobManager.stop(jobId);
             } catch (final PipelineJobNotFoundException ignored) {
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index e4aec40e5cb..9f90858e590 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -68,6 +68,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
@@ -168,10 +169,10 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
     
     private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
         String jobId = jobConfig.getJobId();
-        PipelineJobManager jobManager = new PipelineJobManager(this);
+        PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
         try (PipelineDataSourceManager pipelineDataSourceManager = new 
DefaultPipelineDataSourceManager()) {
             for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
-                if (jobManager.getJobItemProgress(jobId, i).isPresent()) {
+                if (jobItemManager.getProgress(jobId, i).isPresent()) {
                     continue;
                 }
                 IncrementalDumperContext dumperContext = 
buildDumperContext(jobConfig, i, new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 14cdb71513a..0f6b66fa7c1 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -41,7 +41,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncr
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -65,7 +65,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     
     private final CDCJobAPI jobAPI = new CDCJobAPI();
     
-    private final PipelineJobManager jobManager = new 
PipelineJobManager(jobAPI);
+    private final PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
     
     private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
     
@@ -93,7 +93,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
                 continue;
             }
             jobItemContexts.add(jobItemContext);
-            jobManager.cleanJobItemErrorMessage(jobId, shardingItem);
+            jobItemManager.cleanErrorMessage(jobId, shardingItem);
             log.info("start tasks runner, jobId={}, shardingItem={}", jobId, 
shardingItem);
         }
         if (jobItemContexts.isEmpty()) {
@@ -106,7 +106,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     }
     
     private CDCJobItemContext buildPipelineJobItemContext(final 
CDCJobConfiguration jobConfig, final int shardingItem) {
-        Optional<InventoryIncrementalJobItemProgress> initProgress = 
jobManager.getJobItemProgress(jobConfig.getJobId(), shardingItem);
+        Optional<InventoryIncrementalJobItemProgress> initProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
         CDCProcessContext jobProcessContext = 
jobAPI.buildPipelineProcessContext(jobConfig);
         CDCTaskConfiguration taskConfig = 
jobAPI.buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
         return new CDCJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, 
sink);
@@ -127,7 +127,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     
     private void processFailed(final String jobId, final int shardingItem, 
final Exception ex) {
         log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
-        jobManager.updateJobItemErrorMessage(jobId, shardingItem, ex);
+        jobItemManager.updateErrorMessage(jobId, shardingItem, ex);
         PipelineJobCenter.stop(jobId);
         jobAPI.updateJobConfigurationDisabled(jobId, true);
     }
@@ -151,7 +151,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     
     private void updateLocalAndRemoteJobItemStatus(final 
PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
         jobItemContext.setStatus(jobStatus);
-        jobManager.updateJobItemStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
+        jobItemManager.updateStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
     }
     
     private void executeIncrementalTasks(final List<CDCJobItemContext> 
jobItemContexts) {
@@ -204,7 +204,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
         public void onFailure(final Throwable throwable) {
             log.error("onFailure, {} task execute failed.", identifier, 
throwable);
             String jobId = jobItemContext.getJobId();
-            jobManager.updateJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
+            jobItemManager.updateErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
             if (jobItemContext.getSink() instanceof CDCSocketSink) {
                 CDCSocketSink cdcSink = (CDCSocketSink) 
jobItemContext.getSink();
                 cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", 
"", throwable.getMessage()));
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 31f50757bbe..b39a0647458 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -18,11 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
 
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
@@ -35,18 +30,23 @@ import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfigurati
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
+import 
org.apache.shardingsphere.data.pipeline.common.spi.ingest.dumper.IncrementalDumperCreator;
 import 
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterType;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.spi.ingest.dumper.IncrementalDumperCreator;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
@@ -69,7 +69,7 @@ public final class CDCJobPreparer {
     
     private final CDCJobAPI jobAPI = new CDCJobAPI();
     
-    private final PipelineJobManager jobManager = new 
PipelineJobManager(jobAPI);
+    private final PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
     
     /**
      * Do prepare work.
@@ -89,9 +89,9 @@ public final class CDCJobPreparer {
     
     private void initTasks0(final CDCJobItemContext jobItemContext, final 
AtomicBoolean inventoryImporterUsed, final List<CDCChannelProgressPair> 
inventoryChannelProgressPairs,
                             final AtomicBoolean incrementalImporterUsed, final 
List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
-        Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
jobManager.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
+        Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
jobItemManager.getProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
         if (!jobItemProgress.isPresent()) {
-            jobManager.persistJobItemProgress(jobItemContext);
+            jobItemManager.persistProgress(jobItemContext);
         }
         if (jobItemContext.isStopping()) {
             PipelineJobCenter.stop(jobItemContext.getJobId());
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 913fb82ed29..8932efa6613 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -22,9 +22,8 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemCon
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
@@ -46,9 +45,8 @@ public final class ConsistencyCheckJob extends 
AbstractSimplePipelineJob {
     @Override
     public ConsistencyCheckJobItemContext buildPipelineJobItemContext(final 
ShardingContext shardingContext) {
         ConsistencyCheckJobConfiguration jobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        ConsistencyCheckJobAPI jobAPI = (ConsistencyCheckJobAPI) getJobAPI();
-        PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
-        Optional<ConsistencyCheckJobItemProgress> jobItemProgress = 
jobManager.getJobItemProgress(jobConfig.getJobId(), 
shardingContext.getShardingItem());
+        PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(getJobAPI().getYamlJobItemProgressSwapper());
+        Optional<ConsistencyCheckJobItemProgress> jobItemProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), 
shardingContext.getShardingItem());
         return new ConsistencyCheckJobItemContext(jobConfig, 
shardingContext.getShardingItem(), JobStatus.RUNNING, 
jobItemProgress.orElse(null));
     }
     
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index a60fcb44d5e..ff0e63ce7af 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -34,6 +34,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
@@ -82,8 +83,8 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
         Optional<String> latestCheckJobId = 
repositoryAPI.getLatestCheckJobId(parentJobId);
         if (latestCheckJobId.isPresent()) {
-            PipelineJobManager jobManager = new PipelineJobManager(this);
-            Optional<ConsistencyCheckJobItemProgress> progress = 
jobManager.getJobItemProgress(latestCheckJobId.get(), 0);
+            PipelineJobItemManager<ConsistencyCheckJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
+            Optional<ConsistencyCheckJobItemProgress> progress = 
jobItemManager.getProgress(latestCheckJobId.get(), 0);
             if (!progress.isPresent() || JobStatus.FINISHED != 
progress.get().getStatus()) {
                 log.info("check job already exists and status is not FINISHED, 
progress={}", progress);
                 throw new 
UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get());
@@ -174,8 +175,8 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         Optional<String> latestCheckJobId = 
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
         ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), 
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
         String checkJobId = latestCheckJobId.get();
-        PipelineJobManager jobManager = new PipelineJobManager(this);
-        Optional<ConsistencyCheckJobItemProgress> progress = 
jobManager.getJobItemProgress(checkJobId, 0);
+        PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
+        Optional<ConsistencyCheckJobItemProgress> progress = 
jobItemManager.getProgress(checkJobId, 0);
         if (!progress.isPresent()) {
             return Collections.emptyList();
         }
@@ -215,8 +216,8 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         Optional<String> latestCheckJobId = 
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
         ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), 
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
         String checkJobId = latestCheckJobId.get();
-        PipelineJobManager jobManager = new PipelineJobManager(this);
-        Optional<ConsistencyCheckJobItemProgress> progress = 
jobManager.getJobItemProgress(checkJobId, 0);
+        PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
+        Optional<ConsistencyCheckJobItemProgress> progress = 
jobItemManager.getProgress(checkJobId, 0);
         ConsistencyCheckJobItemInfo result = new ConsistencyCheckJobItemInfo();
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId);
         result.setActive(!jobConfigPOJO.isDisabled());
@@ -232,7 +233,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         fillInJobItemInfoWithTimes(result, jobItemProgress, jobConfigPOJO);
         
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
         fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
-        result.setErrorMessage(new 
PipelineJobManager(this).getJobItemErrorMessage(checkJobId, 0));
+        result.setErrorMessage(new 
PipelineJobItemManager<>(getYamlJobItemProgressSwapper()).getErrorMessage(checkJobId,
 0));
         Map<String, TableDataConsistencyCheckResult> checkJobResult = 
governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
         fillInJobItemInfoWithCheckResult(result, checkJobResult, parentJobId);
         
result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each 
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 37947db77b8..8259022cf89 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -19,12 +19,13 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
-import 
org.apache.shardingsphere.data.pipeline.common.execute.PipelineLifecycleRunnable;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
+import 
org.apache.shardingsphere.data.pipeline.common.execute.PipelineLifecycleRunnable;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
@@ -32,6 +33,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
@@ -54,6 +56,8 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
     
     private final PipelineJobManager jobManager = new 
PipelineJobManager(jobAPI);
     
+    private final PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+    
     @Getter
     private final ConsistencyCheckJobItemContext jobItemContext;
     
@@ -80,7 +84,8 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
         if (jobItemContext.isStopping()) {
             return;
         }
-        new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())).persistJobItemProgress(jobItemContext);
+        new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
+                
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
         CompletableFuture<?> future = 
jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor);
         ExecuteEngine.trigger(Collections.singletonList(future), new 
CheckExecuteCallback());
     }
@@ -95,7 +100,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
         
         @Override
         protected void runBlocking() {
-            jobManager.persistJobItemProgress(jobItemContext);
+            jobItemManager.persistProgress(jobItemContext);
             JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
             InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) 
TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
             PipelineJobConfiguration parentJobConfig = new 
PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId));
@@ -133,7 +138,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
             }
             log.info("onSuccess, check job id: {}, parent job id: {}", 
checkJobId, parentJobId);
             jobItemContext.setStatus(JobStatus.FINISHED);
-            jobManager.persistJobItemProgress(jobItemContext);
+            jobItemManager.persistProgress(jobItemContext);
             jobManager.stop(checkJobId);
         }
         
@@ -146,7 +151,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
                 return;
             }
             log.info("onFailure, check job id: {}, parent job id: {}", 
checkJobId, parentJobId, throwable);
-            jobManager.updateJobItemErrorMessage(checkJobId, 0, throwable);
+            jobItemManager.updateErrorMessage(checkJobId, 0, throwable);
             jobManager.stop(checkJobId);
         }
     }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 136842f0541..439cc36ad81 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -24,7 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipeline
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.InventoryIncrementalTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
@@ -47,7 +47,7 @@ public final class MigrationJob extends 
AbstractSimplePipelineJob {
     
     private final MigrationJobAPI jobAPI = new MigrationJobAPI();
     
-    private final PipelineJobManager jobManager = new 
PipelineJobManager(jobAPI);
+    private final PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
     
     private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
     
@@ -62,7 +62,7 @@ public final class MigrationJob extends 
AbstractSimplePipelineJob {
     protected InventoryIncrementalJobItemContext 
buildPipelineJobItemContext(final ShardingContext shardingContext) {
         int shardingItem = shardingContext.getShardingItem();
         MigrationJobConfiguration jobConfig = new 
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        Optional<InventoryIncrementalJobItemProgress> initProgress = 
jobManager.getJobItemProgress(shardingContext.getJobName(), shardingItem);
+        Optional<InventoryIncrementalJobItemProgress> initProgress = 
jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
         MigrationProcessContext jobProcessContext = 
jobAPI.buildPipelineProcessContext(jobConfig);
         MigrationTaskConfiguration taskConfig = 
jobAPI.buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
         return new MigrationJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 50ce83843f1..2857e63f58e 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -45,7 +45,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetSchemasParameter;
@@ -82,7 +82,7 @@ public final class MigrationJobPreparer {
     
     private final MigrationJobAPI jobAPI = new MigrationJobAPI();
     
-    private final PipelineJobManager jobManager = new 
PipelineJobManager(jobAPI);
+    private final PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
     
     /**
      * Do prepare work.
@@ -125,8 +125,8 @@ public final class MigrationJobPreparer {
         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
         String jobId = jobConfig.getJobId();
         LockContext lockContext = 
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager().getInstanceContext().getLockContext();
-        if (!jobManager.getJobItemProgress(jobId, 
jobItemContext.getShardingItem()).isPresent()) {
-            jobManager.persistJobItemProgress(jobItemContext);
+        if (!jobItemManager.getProgress(jobId, 
jobItemContext.getShardingItem()).isPresent()) {
+            jobItemManager.persistProgress(jobItemContext);
         }
         LockDefinition lockDefinition = new 
GlobalLockDefinition(String.format(GlobalLockNames.PREPARE.getLockName(), 
jobConfig.getJobId()));
         long startTimeMillis = System.currentTimeMillis();
@@ -136,7 +136,7 @@ public final class MigrationJobPreparer {
                 JobOffsetInfo offsetInfo = jobAPI.getJobOffsetInfo(jobId);
                 if (!offsetInfo.isTargetSchemaTableCreated()) {
                     jobItemContext.setStatus(JobStatus.PREPARING);
-                    jobManager.updateJobItemStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
+                    jobItemManager.updateStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
                     prepareAndCheckTarget(jobItemContext);
                     jobAPI.persistJobOffsetInfo(jobId, new JobOffsetInfo(true));
                 }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index deeae3d64e5..04c35a541bb 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -18,10 +18,12 @@
 package org.apache.shardingsphere.test.it.data.pipeline.scenario.consistencycheck.api.impl;
 
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
@@ -52,7 +54,7 @@ class ConsistencyCheckJobAPITest {
     
     private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI();
     
-    private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
+    private final PipelineJobItemManager<InventoryIncrementalJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
     
     private final YamlMigrationJobConfigurationSwapper jobConfigSwapper = new YamlMigrationJobConfigurationSwapper();
     
@@ -88,7 +90,7 @@ class ConsistencyCheckJobAPITest {
                     parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
             ConsistencyCheckJobItemContext checkJobItemContext = new ConsistencyCheckJobItemContext(
                     new ConsistencyCheckJobConfiguration(checkJobId, parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")), 0, JobStatus.FINISHED, null);
-            jobManager.persistJobItemProgress(checkJobItemContext);
+            jobItemManager.persistProgress(checkJobItemContext);
             Map<String, TableDataConsistencyCheckResult> dataConsistencyCheckResult = Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true));
             repositoryAPI.persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
             Optional<String> latestCheckJobId = repositoryAPI.getLatestCheckJobId(parentJobId);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 6201a6cb507..187448cd273 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
 import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
@@ -91,6 +92,8 @@ class MigrationJobAPITest {
     
     private static PipelineJobManager jobManager;
     
+    private static PipelineJobItemManager<InventoryIncrementalJobItemProgress> jobItemManager;
+    
     private static DatabaseType databaseType;
     
     @BeforeAll
@@ -98,6 +101,7 @@ class MigrationJobAPITest {
         PipelineContextUtils.mockModeConfigAndContextManager();
         jobAPI = new MigrationJobAPI();
         jobManager = new PipelineJobManager(jobAPI);
+        jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
         String jdbcUrl = "jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
         databaseType = DatabaseTypeFactory.get(jdbcUrl);
         Map<String, Object> props = new HashMap<>();
@@ -212,8 +216,8 @@ class MigrationJobAPITest {
         Optional<String> jobId = jobManager.start(jobConfig);
         assertTrue(jobId.isPresent());
         MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
-        jobManager.persistJobItemProgress(jobItemContext);
-        jobManager.updateJobItemStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
+        jobItemManager.persistProgress(jobItemContext);
+        jobItemManager.updateStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
         Map<Integer, InventoryIncrementalJobItemProgress> progress = jobAPI.getJobProgress(jobConfig);
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : progress.entrySet()) {
             assertThat(entry.getValue().getStatus(), is(JobStatus.EXECUTE_INVENTORY_TASK));
@@ -245,9 +249,9 @@ class MigrationJobAPITest {
     void assertRenewJobStatus() {
         final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
         MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
-        jobManager.persistJobItemProgress(jobItemContext);
-        jobManager.updateJobItemStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED);
-        Optional<InventoryIncrementalJobItemProgress> actual = jobManager.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+        jobItemManager.persistProgress(jobItemContext);
+        jobItemManager.updateStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED);
+        Optional<InventoryIncrementalJobItemProgress> actual = jobItemManager.getProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
         assertTrue(actual.isPresent());
         assertThat(actual.get().getStatus(), is(JobStatus.FINISHED));
     }

Reply via email to