This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b15790a9e93 Merge AbstractPipelineJobAPIImpl and PipelineJobManager 
(#29047)
b15790a9e93 is described below

commit b15790a9e939a04dcf9654618371b6300ace7986
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 16 10:46:40 2023 +0800

    Merge AbstractPipelineJobAPIImpl and PipelineJobManager (#29047)
    
    * Move PipelineJobAPI.start() to PipelineJobManager
    
    * Move PipelineJobAPI.dropJob() to PipelineJobManager
    
    * Move PipelineJobAPI.getJobItemErrorMessage() to PipelineJobManager
    
    * Move PipelineJobAPI.updateJobItemErrorMessage() to PipelineJobManager
    
    * Move PipelineJobAPI.stop() to PipelineJobManager
    
    * Merge AbstractPipelineJobAPIImpl and PipelineJobManager
---
 .../core/job/AbstractSimplePipelineJob.java        |  16 +-
 .../pipeline/core/job/service/PipelineJobAPI.java  |  69 +++-----
 .../core/job/service/PipelineJobManager.java       | 174 +++++++++++++++++++++
 .../AbstractInventoryIncrementalJobAPIImpl.java    |   6 +-
 .../service/impl/AbstractPipelineJobAPIImpl.java   | 124 ---------------
 .../runner/InventoryIncrementalTasksRunner.java    |  12 +-
 .../handler/update/StartMigrationUpdater.java      |   5 +-
 .../handler/update/StopMigrationUpdater.java       |   5 +-
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |   3 +-
 .../data/pipeline/cdc/core/job/CDCJob.java         |   9 +-
 .../api/impl/ConsistencyCheckJobAPI.java           |  28 ++--
 .../task/ConsistencyCheckTasksRunner.java          |  11 +-
 .../migration/api/impl/MigrationJobAPI.java        |  42 ++---
 .../migration/api/impl/MigrationJobAPITest.java    |  26 +--
 14 files changed, 277 insertions(+), 253 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index fae85d03ad3..197c09e2d49 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -46,6 +47,7 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
     
     @Override
     public void execute(final ShardingContext shardingContext) {
+        PipelineJobManager jobManager = new PipelineJobManager(getJobAPI());
         String jobId = shardingContext.getJobName();
         int shardingItem = shardingContext.getShardingItem();
         log.info("Execute job {}-{}", jobId, shardingItem);
@@ -55,33 +57,33 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
         }
         try {
             PipelineJobItemContext jobItemContext = 
buildPipelineJobItemContext(shardingContext);
-            execute0(jobItemContext);
+            execute0(jobManager, jobItemContext);
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
-            processFailed(jobId, shardingItem, ex);
+            processFailed(jobManager, jobId, shardingItem, ex);
             throw ex;
         }
     }
     
-    private void execute0(final PipelineJobItemContext jobItemContext) {
+    private void execute0(final PipelineJobManager jobManager, final 
PipelineJobItemContext jobItemContext) {
         String jobId = jobItemContext.getJobId();
         int shardingItem = jobItemContext.getShardingItem();
         PipelineTasksRunner tasksRunner = 
buildPipelineTasksRunner(jobItemContext);
         if (!addTasksRunner(shardingItem, tasksRunner)) {
             return;
         }
-        getJobAPI().cleanJobItemErrorMessage(jobId, shardingItem);
+        jobManager.cleanJobItemErrorMessage(jobId, shardingItem);
         prepare(jobItemContext);
         log.info("start tasks runner, jobId={}, shardingItem={}", jobId, 
shardingItem);
         tasksRunner.start();
     }
     
-    private void processFailed(final String jobId, final int shardingItem, 
final Exception ex) {
+    private void processFailed(final PipelineJobManager jobManager, final 
String jobId, final int shardingItem, final Exception ex) {
         log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
-        getJobAPI().updateJobItemErrorMessage(jobId, shardingItem, ex);
+        jobManager.updateJobItemErrorMessage(jobId, shardingItem, ex);
         try {
-            getJobAPI().stop(jobId);
+            jobManager.stop(jobId);
         } catch (final PipelineJobNotFoundException ignored) {
         }
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 44799e07817..134bc7e7e16 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -67,42 +67,47 @@ public interface PipelineJobAPI extends TypedSPI {
     PipelineProcessContext 
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
     
     /**
-     * Start job.
+     * Get job configuration.
      *
-     * @param jobConfig job configuration
-     * @return job id
+     * @param jobId job id
+     * @return job configuration
      */
-    Optional<String> start(PipelineJobConfiguration jobConfig);
+    PipelineJobConfiguration getJobConfiguration(String jobId);
     
     /**
-     * Start disabled job.
+     * Get job configuration.
      *
-     * @param jobId job id
+     * @param jobConfigPOJO job configuration POJO
+     * @return pipeline job configuration
      */
-    void startDisabledJob(String jobId);
+    PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO 
jobConfigPOJO);
     
     /**
-     * Stop pipeline job.
-     *
-     * @param jobId job id
+     * Whether to ignore to start disabled job when job item progress is 
finished.
+     * 
+     * @return ignore to start disabled job when job item progress is finished 
or not
      */
-    void stop(String jobId);
+    default boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
+        return false;
+    }
     
     /**
-     * Get job configuration.
+     * Get to be start disabled next job type.
      *
-     * @param jobId job id
-     * @return job configuration
+     * @return to be start disabled next job type
      */
-    PipelineJobConfiguration getJobConfiguration(String jobId);
+    default Optional<String> getToBeStartDisabledNextJobType() {
+        return Optional.empty();
+    }
     
     /**
-     * Get job configuration.
+     * Get to be stopped previous job type.
      *
-     * @param jobConfigPOJO job configuration POJO
-     * @return pipeline job configuration
+     * @return to be stopped previous job type
      */
-    PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO 
jobConfigPOJO);
+    default Optional<String> getToBeStoppedPreviousJobType() {
+        return Optional.empty();
+    }
     
     /**
      * Get pipeline job info.
@@ -144,32 +149,6 @@ public interface PipelineJobAPI extends TypedSPI {
      */
     void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
     
-    /**
-     * Get job item error message.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @return map, key is sharding item, value is error message
-     */
-    String getJobItemErrorMessage(String jobId, int shardingItem);
-    
-    /**
-     * Update job item error message.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @param error error
-     */
-    void updateJobItemErrorMessage(String jobId, int shardingItem, Object 
error);
-    
-    /**
-     * Clean job item error message.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     */
-    void cleanJobItemErrorMessage(String jobId, int shardingItem);
-    
     /**
      * Get pipeline job class.
      * 
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 55a4a6b3337..47d8eecad2e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -18,12 +18,30 @@
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
+import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -31,10 +49,128 @@ import java.util.stream.Stream;
  * Pipeline job manager.
  */
 @RequiredArgsConstructor
+@Slf4j
 public final class PipelineJobManager {
     
+    private static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    
     private final PipelineJobAPI pipelineJobAPI;
     
+    /**
+     * Start job.
+     *
+     * @param jobConfig job configuration
+     * @return job id
+     */
+    public Optional<String> start(final PipelineJobConfiguration jobConfig) {
+        String jobId = jobConfig.getJobId();
+        ShardingSpherePreconditions.checkState(0 != 
jobConfig.getJobShardingCount(), () -> new 
PipelineJobCreationWithInvalidShardingCountException(jobId));
+        GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
+        String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
+        if (repositoryAPI.isExisted(jobConfigKey)) {
+            log.warn("jobId already exists in registry center, ignore, 
jobConfigKey={}", jobConfigKey);
+            return Optional.of(jobId);
+        }
+        repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), 
pipelineJobAPI.getPipelineJobClass().getName());
+        repositoryAPI.persist(jobConfigKey, 
YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
+        return Optional.of(jobId);
+    }
+    
+    /**
+     * Start disabled job.
+     *
+     * @param jobId job id
+     */
+    public void startDisabledJob(final String jobId) {
+        if 
(pipelineJobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
+            Optional<? extends PipelineJobItemProgress> jobItemProgress = 
pipelineJobAPI.getJobItemProgress(jobId, 0);
+            if (jobItemProgress.isPresent() && JobStatus.FINISHED == 
jobItemProgress.get().getStatus()) {
+                log.info("job status is FINISHED, ignore, jobId={}", jobId);
+                return;
+            }
+        }
+        startCurrentDisabledJob(jobId);
+        pipelineJobAPI.getToBeStartDisabledNextJobType().ifPresent(optional -> 
startNextDisabledJob(jobId, optional));
+        
+    }
+    
+    private void startCurrentDisabledJob(final String jobId) {
+        PipelineDistributedBarrier pipelineDistributedBarrier = 
PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
+        
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
+        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
+        ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () 
-> new PipelineJobHasAlreadyStartedException(jobId));
+        jobConfigPOJO.setDisabled(false);
+        jobConfigPOJO.getProps().setProperty("start_time_millis", 
String.valueOf(System.currentTimeMillis()));
+        jobConfigPOJO.getProps().remove("stop_time");
+        jobConfigPOJO.getProps().remove("stop_time_millis");
+        jobConfigPOJO.getProps().setProperty("run_count", 
String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count",
 "0")) + 1));
+        String barrierEnablePath = 
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
+        pipelineDistributedBarrier.register(barrierEnablePath, 
jobConfigPOJO.getShardingTotalCount());
+        
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
+        pipelineDistributedBarrier.await(barrierEnablePath, 5L, 
TimeUnit.SECONDS);
+    }
+    
+    private void startNextDisabledJob(final String jobId, final String 
toBeStartDisabledNextJobType) {
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional
 -> {
+            try {
+                new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
toBeStartDisabledNextJobType)).startDisabledJob(optional);
+                // CHECKSTYLE:OFF
+            } catch (final RuntimeException ex) {
+                // CHECKSTYLE:ON
+                log.warn("start related check job failed, check job id: {}, 
error: {}", optional, ex.getMessage());
+            }
+        });
+    }
+    
+    /**
+     * Stop pipeline job.
+     *
+     * @param jobId job id
+     */
+    public void stop(final String jobId) {
+        pipelineJobAPI.getToBeStoppedPreviousJobType().ifPresent(optional -> 
stopPreviousJob(jobId, optional));
+        stopCurrentJob(jobId);
+    }
+    
+    private void stopPreviousJob(final String jobId, final String 
toBeStoppedPreviousJobType) {
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional
 -> {
+            try {
+                new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
toBeStoppedPreviousJobType)).stop(optional);
+                // CHECKSTYLE:OFF
+            } catch (final RuntimeException ex) {
+                // CHECKSTYLE:ON
+                log.warn("stop related check job failed, check job id: {}, 
error: {}", optional, ex.getMessage());
+            }
+        });
+    }
+    
+    private void stopCurrentJob(final String jobId) {
+        PipelineDistributedBarrier pipelineDistributedBarrier = 
PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
+        
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
+        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
+        if (jobConfigPOJO.isDisabled()) {
+            return;
+        }
+        jobConfigPOJO.setDisabled(true);
+        jobConfigPOJO.getProps().setProperty("stop_time", 
LocalDateTime.now().format(DATE_TIME_FORMATTER));
+        jobConfigPOJO.getProps().setProperty("stop_time_millis", 
String.valueOf(System.currentTimeMillis()));
+        String barrierPath = 
PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
+        pipelineDistributedBarrier.register(barrierPath, 
jobConfigPOJO.getShardingTotalCount());
+        
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
+        pipelineDistributedBarrier.await(barrierPath, 5L, TimeUnit.SECONDS);
+    }
+    
+    /**
+     * Drop job.
+     * 
+     * @param jobId to be drooped job id
+     */
+    public void drop(final String jobId) {
+        PipelineContextKey contextKey = 
PipelineJobIdUtils.parseContextKey(jobId);
+        
PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId), 
null);
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).deleteJob(jobId);
+    }
+    
     /**
      * Get pipeline jobs info.
      *
@@ -49,4 +185,42 @@ public final class PipelineJobManager {
         return 
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each
 -> !each.getJobName().startsWith("_"))
                 .filter(each -> 
jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()));
     }
+    
+    /**
+     * Get job item error message.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @return map, key is sharding item, value is error message
+     */
+    public String getJobItemErrorMessage(final String jobId, final int 
shardingItem) {
+        return 
Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId,
 shardingItem)).orElse("");
+    }
+    
+    /**
+     * Update job item error message.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param error error
+     */
+    public void updateJobItemErrorMessage(final String jobId, final int 
shardingItem, final Object error) {
+        String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem);
+        String value = "";
+        if (null != error) {
+            value = error instanceof Throwable ? 
ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
+        }
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key,
 value);
+    }
+    
+    /**
+     * Clean job item error message.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     */
+    public void cleanJobItemErrorMessage(final String jobId, final int 
shardingItem) {
+        String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem);
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key,
 "");
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 391e3b8454e..bfc7e8e9fd4 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -45,6 +45,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.Table
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -68,7 +69,7 @@ import java.util.stream.IntStream;
  * Abstract inventory incremental job API implementation.
  */
 @Slf4j
-public abstract class AbstractInventoryIncrementalJobAPIImpl extends 
AbstractPipelineJobAPIImpl implements InventoryIncrementalJobAPI {
+public abstract class AbstractInventoryIncrementalJobAPIImpl implements 
InventoryIncrementalJobAPI {
     
     private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
     
@@ -106,11 +107,12 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         long startTimeMillis = 
Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
         Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = 
getJobProgress(jobConfig);
         List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
+        PipelineJobManager pipelineJobManager = new PipelineJobManager(this);
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : 
jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
             TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) 
getJobInfo(jobId);
             InventoryIncrementalJobItemProgress jobItemProgress = 
entry.getValue();
-            String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
+            String errorMessage = 
pipelineJobManager.getJobItemErrorMessage(jobId, shardingItem);
             if (null == jobItemProgress) {
                 result.add(new InventoryIncrementalJobItemInfo(shardingItem, 
jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
                 continue;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
deleted file mode 100644
index cdff9743e88..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job.service.impl;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
-import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
-import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Abstract pipeline job API impl.
- */
-@Slf4j
-public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
-    
-    protected static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-    
-    @Override
-    public Optional<String> start(final PipelineJobConfiguration jobConfig) {
-        String jobId = jobConfig.getJobId();
-        ShardingSpherePreconditions.checkState(0 != 
jobConfig.getJobShardingCount(), () -> new 
PipelineJobCreationWithInvalidShardingCountException(jobId));
-        GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
-        String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
-        if (repositoryAPI.isExisted(jobConfigKey)) {
-            log.warn("jobId already exists in registry center, ignore, 
jobConfigKey={}", jobConfigKey);
-            return Optional.of(jobId);
-        }
-        repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), 
getPipelineJobClass().getName());
-        repositoryAPI.persist(jobConfigKey, 
YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
-        return Optional.of(jobId);
-    }
-    
-    @Override
-    public void startDisabledJob(final String jobId) {
-        PipelineDistributedBarrier pipelineDistributedBarrier = 
PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
-        
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
-        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
-        ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () 
-> new PipelineJobHasAlreadyStartedException(jobId));
-        jobConfigPOJO.setDisabled(false);
-        jobConfigPOJO.getProps().setProperty("start_time_millis", 
String.valueOf(System.currentTimeMillis()));
-        jobConfigPOJO.getProps().remove("stop_time");
-        jobConfigPOJO.getProps().remove("stop_time_millis");
-        jobConfigPOJO.getProps().setProperty("run_count", 
String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count",
 "0")) + 1));
-        String barrierEnablePath = 
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
-        pipelineDistributedBarrier.register(barrierEnablePath, 
jobConfigPOJO.getShardingTotalCount());
-        
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
-        pipelineDistributedBarrier.await(barrierEnablePath, 5, 
TimeUnit.SECONDS);
-    }
-    
-    @Override
-    public void stop(final String jobId) {
-        PipelineDistributedBarrier pipelineDistributedBarrier = 
PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
-        
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
-        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
-        if (jobConfigPOJO.isDisabled()) {
-            return;
-        }
-        jobConfigPOJO.setDisabled(true);
-        jobConfigPOJO.getProps().setProperty("stop_time", 
LocalDateTime.now().format(DATE_TIME_FORMATTER));
-        jobConfigPOJO.getProps().setProperty("stop_time_millis", 
String.valueOf(System.currentTimeMillis()));
-        String barrierPath = 
PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
-        pipelineDistributedBarrier.register(barrierPath, 
jobConfigPOJO.getShardingTotalCount());
-        
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
-        pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
-    }
-    
-    protected void dropJob(final String jobId) {
-        PipelineContextKey contextKey = 
PipelineJobIdUtils.parseContextKey(jobId);
-        
PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId), 
null);
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).deleteJob(jobId);
-    }
-    
-    @Override
-    public String getJobItemErrorMessage(final String jobId, final int 
shardingItem) {
-        return 
Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId,
 shardingItem)).orElse("");
-    }
-    
-    @Override
-    public void updateJobItemErrorMessage(final String jobId, final int 
shardingItem, final Object error) {
-        String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem);
-        String value = "";
-        if (null != error) {
-            value = error instanceof Throwable ? 
ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
-        }
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key,
 value);
-    }
-    
-    @Override
-    public void cleanJobItemErrorMessage(final String jobId, final int 
shardingItem) {
-        String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem);
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key,
 "");
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index ff3e9308f58..1923ad14911 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPo
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
@@ -54,11 +55,14 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
     
     private final PipelineJobAPI jobAPI;
     
+    private final PipelineJobManager jobManager;
+    
     public InventoryIncrementalTasksRunner(final 
InventoryIncrementalJobItemContext jobItemContext) {
         this.jobItemContext = jobItemContext;
         inventoryTasks = jobItemContext.getInventoryTasks();
         incrementalTasks = jobItemContext.getIncrementalTasks();
         jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
+        jobManager = new PipelineJobManager(jobAPI);
     }
     
     @Override
@@ -142,9 +146,9 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
     protected void inventoryFailureCallback(final Throwable throwable) {
         log.error("onFailure, inventory task execute failed.", throwable);
         String jobId = jobItemContext.getJobId();
-        jobAPI.updateJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
+        jobManager.updateJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
         try {
-            jobAPI.stop(jobId);
+            jobManager.stop(jobId);
         } catch (final PipelineJobNotFoundException ignored) {
         }
     }
@@ -177,9 +181,9 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
         public void onFailure(final Throwable throwable) {
             log.error("onFailure, incremental task execute failed.", 
throwable);
             String jobId = jobItemContext.getJobId();
-            jobAPI.updateJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
+            jobManager.updateJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
             try {
-                jobAPI.stop(jobId);
+                jobManager.stop(jobId);
             } catch (final PipelineJobNotFoundException ignored) {
             }
         }
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java
index fe1629adaaf..7c504a3f4fc 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import 
org.apache.shardingsphere.migration.distsql.statement.StartMigrationStatement;
@@ -26,11 +27,11 @@ import 
org.apache.shardingsphere.migration.distsql.statement.StartMigrationState
  */
 public final class StartMigrationUpdater implements 
RALUpdater<StartMigrationStatement> {
     
-    private final MigrationJobAPI jobAPI = new MigrationJobAPI();
+    private final PipelineJobManager jobManager = new PipelineJobManager(new 
MigrationJobAPI());
     
     @Override
     public void executeUpdate(final String databaseName, final 
StartMigrationStatement sqlStatement) {
-        jobAPI.startDisabledJob(sqlStatement.getJobId());
+        jobManager.startDisabledJob(sqlStatement.getJobId());
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java
index fb3a8b22591..77c4c050611 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import 
org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement;
@@ -28,9 +29,11 @@ public final class StopMigrationUpdater implements 
RALUpdater<StopMigrationState
     
     private final MigrationJobAPI jobAPI = new MigrationJobAPI();
     
+    private final PipelineJobManager jobManager = new 
PipelineJobManager(jobAPI);
+    
     @Override
     public void executeUpdate(final String databaseName, final 
StopMigrationStatement sqlStatement) {
-        jobAPI.stop(sqlStatement.getJobId());
+        jobManager.stop(sqlStatement.getJobId());
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index e4134a90c0f..1993cde6e7b 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -68,6 +68,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -308,7 +309,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
         ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () 
-> new PipelineInternalException("Can't drop streaming job which is active"));
-        dropJob(jobId);
+        new PipelineJobManager(this).drop(jobId);
         cleanup(jobConfig);
     }
     
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 50e572c5a32..c2637c0de36 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -41,6 +41,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncr
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -64,6 +65,8 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     
     private final CDCJobAPI jobAPI = new CDCJobAPI();
     
+    private final PipelineJobManager jobManager = new 
PipelineJobManager(jobAPI);
+    
     private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
     
     private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
@@ -90,7 +93,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
                 continue;
             }
             jobItemContexts.add(jobItemContext);
-            jobAPI.cleanJobItemErrorMessage(jobId, shardingItem);
+            jobManager.cleanJobItemErrorMessage(jobId, shardingItem);
             log.info("start tasks runner, jobId={}, shardingItem={}", jobId, 
shardingItem);
         }
         if (jobItemContexts.isEmpty()) {
@@ -124,7 +127,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     
     private void processFailed(final String jobId, final int shardingItem, 
final Exception ex) {
         log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
-        jobAPI.updateJobItemErrorMessage(jobId, shardingItem, ex);
+        jobManager.updateJobItemErrorMessage(jobId, shardingItem, ex);
         PipelineJobCenter.stop(jobId);
         jobAPI.updateJobConfigurationDisabled(jobId, true);
     }
@@ -201,7 +204,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
         public void onFailure(final Throwable throwable) {
             log.error("onFailure, {} task execute failed.", identifier, 
throwable);
             String jobId = jobItemContext.getJobId();
-            jobAPI.updateJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
+            jobManager.updateJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
             if (jobItemContext.getSink() instanceof CDCSocketSink) {
                 CDCSocketSink cdcSink = (CDCSocketSink) 
jobItemContext.getSink();
                 cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", 
"", throwable.getMessage()));
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 6f825c9f17d..ddbb7e7a89e 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -42,7 +42,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractPipelineJobAPIImpl;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
@@ -77,7 +77,7 @@ import java.util.stream.Collectors;
  * Consistency check job API.
  */
 @Slf4j
-public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
+public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
     
     private static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
     
@@ -106,14 +106,14 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
         String result = latestCheckJobId.map(s -> new 
ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new 
ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
         repositoryAPI.persistLatestCheckJobId(parentJobId, result);
         repositoryAPI.deleteCheckJobResult(parentJobId, result);
-        dropJob(result);
+        new PipelineJobManager(this).drop(result);
         YamlConsistencyCheckJobConfiguration yamlConfig = new 
YamlConsistencyCheckJobConfiguration();
         yamlConfig.setJobId(result);
         yamlConfig.setParentJobId(parentJobId);
         yamlConfig.setAlgorithmTypeName(param.getAlgorithmTypeName());
         yamlConfig.setAlgorithmProps(param.getAlgorithmProps());
         
yamlConfig.setSourceDatabaseType(param.getSourceDatabaseType().getType());
-        start(new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(yamlConfig));
+        new PipelineJobManager(this).start(new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(yamlConfig));
         return result;
     }
     
@@ -180,23 +180,13 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
                 
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
     }
     
-    @Override
-    public void startDisabledJob(final String jobId) {
-        Optional<ConsistencyCheckJobItemProgress> jobItemProgress = 
getJobItemProgress(jobId, 0);
-        if (jobItemProgress.isPresent() && JobStatus.FINISHED == 
jobItemProgress.get().getStatus()) {
-            log.info("job status is FINISHED, ignore, jobId={}", jobId);
-            return;
-        }
-        super.startDisabledJob(jobId);
-    }
-    
     /**
      * Start by parent job id.
      *
      * @param parentJobId parent job id
      */
     public void startByParentJobId(final String parentJobId) {
-        startDisabledJob(getLatestCheckJobId(parentJobId));
+        new 
PipelineJobManager(this).startDisabledJob(getLatestCheckJobId(parentJobId));
     }
     
     private String getLatestCheckJobId(final String parentJobId) {
@@ -211,7 +201,7 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
      * @param parentJobId parent job id
      */
     public void stopByParentJobId(final String parentJobId) {
-        stop(getLatestCheckJobId(parentJobId));
+        new PipelineJobManager(this).stop(getLatestCheckJobId(parentJobId));
     }
     
     /**
@@ -221,7 +211,7 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
      */
     public void dropByParentJobId(final String parentJobId) {
         String latestCheckJobId = getLatestCheckJobId(parentJobId);
-        stop(latestCheckJobId);
+        new PipelineJobManager(this).stop(latestCheckJobId);
         PipelineContextKey contextKey = 
PipelineJobIdUtils.parseContextKey(parentJobId);
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey);
         Collection<String> checkJobIds = 
repositoryAPI.listCheckJobIds(parentJobId);
@@ -234,7 +224,7 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
             repositoryAPI.deleteLatestCheckJobId(parentJobId);
         }
         repositoryAPI.deleteCheckJobResult(parentJobId, latestCheckJobId);
-        dropJob(latestCheckJobId);
+        new PipelineJobManager(this).drop(latestCheckJobId);
     }
     
     /**
@@ -304,7 +294,7 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
         fillInJobItemInfoWithTimes(result, jobItemProgress, jobConfigPOJO);
         
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
         fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
-        result.setErrorMessage(getJobItemErrorMessage(checkJobId, 0));
+        result.setErrorMessage(new 
PipelineJobManager(this).getJobItemErrorMessage(checkJobId, 0));
         Map<String, TableDataConsistencyCheckResult> checkJobResult = 
governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
         fillInJobItemInfoWithCheckResult(result, checkJobResult, parentJobId);
         
result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each 
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index b6ed5aa0c5f..3ae18d58129 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -32,6 +32,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -51,6 +52,8 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
     
     private final ConsistencyCheckJobAPI checkJobAPI = new 
ConsistencyCheckJobAPI();
     
+    private final PipelineJobManager jobManager = new 
PipelineJobManager(checkJobAPI);
+    
     @Getter
     private final ConsistencyCheckJobItemContext jobItemContext;
     
@@ -131,7 +134,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
             log.info("onSuccess, check job id: {}, parent job id: {}", 
checkJobId, parentJobId);
             jobItemContext.setStatus(JobStatus.FINISHED);
             checkJobAPI.persistJobItemProgress(jobItemContext);
-            checkJobAPI.stop(checkJobId);
+            jobManager.stop(checkJobId);
         }
         
         @Override
@@ -139,12 +142,12 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
             PipelineDataConsistencyChecker checker = consistencyChecker.get();
             if (null != checker && checker.isCanceling()) {
                 log.info("onFailure, canceling, check job id: {}, parent job 
id: {}", checkJobId, parentJobId);
-                checkJobAPI.stop(checkJobId);
+                jobManager.stop(checkJobId);
                 return;
             }
             log.info("onFailure, check job id: {}, parent job id: {}", 
checkJobId, parentJobId, throwable);
-            checkJobAPI.updateJobItemErrorMessage(checkJobId, 0, throwable);
-            checkJobAPI.stop(checkJobId);
+            jobManager.updateJobItemErrorMessage(checkJobId, 0, throwable);
+            jobManager.stop(checkJobId);
         }
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 0cfed7317b4..1fe00789a82 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -54,7 +54,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
@@ -80,7 +80,6 @@ import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePo
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.json.JsonUtils;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
@@ -102,6 +101,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -122,7 +122,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
      */
     public String createJobAndStart(final PipelineContextKey contextKey, final 
MigrateTableStatement param) {
         MigrationJobConfiguration jobConfig = new 
YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey,
 param));
-        start(jobConfig);
+        new PipelineJobManager(this).start(jobConfig);
         return jobConfig.getJobId();
     }
     
@@ -293,40 +293,21 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     }
     
     @Override
-    public void startDisabledJob(final String jobId) {
-        super.startDisabledJob(jobId);
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional
 -> {
-            try {
-                TypedSPILoader.getService(PipelineJobAPI.class, 
"CONSISTENCY_CHECK").startDisabledJob(optional);
-                // CHECKSTYLE:OFF
-            } catch (final RuntimeException ex) {
-                // CHECKSTYLE:ON
-                log.warn("start related check job failed, check job id: {}, 
error: {}", optional, ex.getMessage());
-            }
-        });
+    public Optional<String> getToBeStartDisabledNextJobType() {
+        return Optional.of("CONSISTENCY_CHECK");
     }
     
     @Override
-    public void stop(final String jobId) {
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional
 -> {
-            try {
-                TypedSPILoader.getService(PipelineJobAPI.class, 
"CONSISTENCY_CHECK").stop(optional);
-                // CHECKSTYLE:OFF
-            } catch (final RuntimeException ex) {
-                // CHECKSTYLE:ON
-                log.warn("stop related check job failed, check job id: {}, 
error: {}", optional, ex.getMessage());
-            }
-        });
-        super.stop(jobId);
+    public Optional<String> getToBeStoppedPreviousJobType() {
+        return Optional.of("CONSISTENCY_CHECK");
     }
     
     @Override
     public void rollback(final String jobId) throws SQLException {
         final long startTimeMillis = System.currentTimeMillis();
-        stop(jobId);
         dropCheckJobs(jobId);
         cleanTempTableOnRollback(jobId);
-        dropJob(jobId);
+        new PipelineJobManager(this).drop(jobId);
         log.info("Rollback job {} cost {} ms", jobId, 
System.currentTimeMillis() - startTimeMillis);
     }
     
@@ -337,7 +318,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
         }
         for (String each : checkJobIds) {
             try {
-                dropJob(each);
+                new PipelineJobManager(this).drop(each);
                 // CHECKSTYLE:OFF
             } catch (final RuntimeException ex) {
                 // CHECKSTYLE:ON
@@ -368,11 +349,12 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     public void commit(final String jobId) {
         log.info("Commit job {}", jobId);
         final long startTimeMillis = System.currentTimeMillis();
-        stop(jobId);
+        PipelineJobManager jobManager = new PipelineJobManager(this);
+        jobManager.stop(jobId);
         dropCheckJobs(jobId);
         MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
         refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
-        dropJob(jobId);
+        jobManager.drop(jobId);
         log.info("Commit cost {} ms", System.currentTimeMillis() - 
startTimeMillis);
     }
     
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index c51bddd7599..e206c202875 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -33,6 +33,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -88,12 +89,15 @@ class MigrationJobAPITest {
     
     private static MigrationJobAPI jobAPI;
     
+    private static PipelineJobManager jobManager;
+    
     private static DatabaseType databaseType;
     
     @BeforeAll
     static void beforeClass() {
         PipelineContextUtils.mockModeConfigAndContextManager();
         jobAPI = new MigrationJobAPI();
+        jobManager = new PipelineJobManager(jobAPI);
         String jdbcUrl = 
"jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
         databaseType = DatabaseTypeFactory.get(jdbcUrl);
         Map<String, Object> props = new HashMap<>();
@@ -110,7 +114,7 @@ class MigrationJobAPITest {
     
     @Test
     void assertStartAndList() {
-        Optional<String> jobId = 
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+        Optional<String> jobId = 
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
         JobConfigurationPOJO jobConfigPOJO = 
getJobConfigurationPOJO(jobId.get());
         assertFalse(jobConfigPOJO.isDisabled());
@@ -123,20 +127,20 @@ class MigrationJobAPITest {
     
     @Test
     void assertStartOrStopById() {
-        Optional<String> jobId = 
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+        Optional<String> jobId = 
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
         assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled());
         PipelineDistributedBarrier mockBarrier = 
mock(PipelineDistributedBarrier.class);
         
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
-        jobAPI.stop(jobId.get());
+        jobManager.stop(jobId.get());
         assertTrue(getJobConfigurationPOJO(jobId.get()).isDisabled());
-        jobAPI.startDisabledJob(jobId.get());
+        jobManager.startDisabledJob(jobId.get());
         assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled());
     }
     
     @Test
     void assertRollback() throws SQLException {
-        Optional<String> jobId = 
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+        Optional<String> jobId = 
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
         MigrationJobConfiguration jobConfig = 
jobAPI.getJobConfiguration(jobId.get());
         initTableData(jobConfig);
@@ -148,7 +152,7 @@ class MigrationJobAPITest {
     
     @Test
     void assertCommit() {
-        Optional<String> jobId = 
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+        Optional<String> jobId = 
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
         MigrationJobConfiguration jobConfig = 
jobAPI.getJobConfiguration(jobId.get());
         initTableData(jobConfig);
@@ -161,7 +165,7 @@ class MigrationJobAPITest {
     @Test
     void assertGetProgress() {
         MigrationJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
-        Optional<String> jobId = jobAPI.start(jobConfig);
+        Optional<String> jobId = jobManager.start(jobConfig);
         assertTrue(jobId.isPresent());
         Map<Integer, InventoryIncrementalJobItemProgress> jobProgressMap = 
jobAPI.getJobProgress(jobConfig);
         assertThat(jobProgressMap.size(), is(1));
@@ -171,7 +175,7 @@ class MigrationJobAPITest {
     void assertDataConsistencyCheck() {
         MigrationJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
         initTableData(jobConfig);
-        Optional<String> jobId = jobAPI.start(jobConfig);
+        Optional<String> jobId = jobManager.start(jobConfig);
         assertTrue(jobId.isPresent());
         Map<String, TableDataConsistencyCheckResult> checkResultMap = 
jobAPI.buildPipelineDataConsistencyChecker(
                 jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new 
ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", 
null);
@@ -205,7 +209,7 @@ class MigrationJobAPITest {
     @Test
     void assertSwitchClusterConfigurationSucceed() {
         final MigrationJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
-        Optional<String> jobId = jobAPI.start(jobConfig);
+        Optional<String> jobId = jobManager.start(jobConfig);
         assertTrue(jobId.isPresent());
         MigrationJobItemContext jobItemContext = 
PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
         jobAPI.persistJobItemProgress(jobItemContext);
@@ -308,7 +312,7 @@ class MigrationJobAPITest {
     
     @Test
     void assertGetJobItemInfosAtBegin() {
-        Optional<String> jobId = 
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+        Optional<String> jobId = 
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
         YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new 
YamlInventoryIncrementalJobItemProgress();
         yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
@@ -323,7 +327,7 @@ class MigrationJobAPITest {
     
     @Test
     void assertGetJobItemInfosAtIncrementTask() {
-        Optional<String> jobId = 
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+        Optional<String> jobId = 
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
         YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new 
YamlInventoryIncrementalJobItemProgress();
         yamlJobItemProgress.setSourceDatabaseType("MySQL");

Reply via email to