This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 394353400b1 Add AbstractInseparablePipelineJob (#29336)
394353400b1 is described below

commit 394353400b12f93ecd816414e733570233478069
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 15:01:24 2023 +0800

    Add AbstractInseparablePipelineJob (#29336)
    
    * Rename AbstractSeparablePipelineJob
    
    * Add AbstractInseparablePipelineJob
    
    * Add AbstractInseparablePipelineJob
    
    * Add AbstractInseparablePipelineJob
---
 .../core/job/AbstractInseparablePipelineJob.java   | 152 +++++++++++++++++++++
 .../pipeline/core/job/AbstractPipelineJob.java     |  31 ++---
 ...eJob.java => AbstractSeparablePipelineJob.java} |  22 ++-
 .../data/pipeline/core/job/PipelineJob.java        |   4 +-
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   | 120 ++++------------
 .../consistencycheck/ConsistencyCheckJob.java      |   4 +-
 .../pipeline/scenario/migration/MigrationJob.java  |   4 +-
 7 files changed, 211 insertions(+), 126 deletions(-)
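In short, this refactor splits pipeline jobs into two templates. AbstractSimplePipelineJob is renamed to AbstractSeparablePipelineJob, the template for jobs whose sharding items run independently, one per execute(ShardingContext) call (consistency check and migration). The new AbstractInseparablePipelineJob serves jobs that must build the contexts for all sharding items up front and then drive their inventory and incremental tasks together within a single execute call; CDCJob moves onto this template, which is why its orchestration code is deleted below and reappears, generalized, in the new base class.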

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
new file mode 100644
index 00000000000..845d3605f0a
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
+import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstract inseparable pipeline job.
+ */
+@Slf4j
+public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob {
+    
+    private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager;
+    
+    protected AbstractInseparablePipelineJob(final String jobId) {
+        super(jobId);
+        jobItemManager = new PipelineJobItemManager<>(getJobType().getYamlJobItemProgressSwapper());
+    }
+    
+    @Override
+    public final void execute(final ShardingContext shardingContext) {
+        String jobId = shardingContext.getJobName();
+        log.info("Execute job {}", jobId);
+        PipelineJobConfiguration jobConfig = getJobConfiguration(shardingContext);
+        Collection<PipelineJobItemContext> jobItemContexts = new LinkedList<>();
+        for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
+            if (isStopping()) {
+                log.info("stopping true, ignore");
+                return;
+            }
+            PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(jobConfig, shardingItem);
+            if (!addTasksRunner(shardingItem, buildPipelineTasksRunner(jobItemContext))) {
+                continue;
+            }
+            jobItemContexts.add(jobItemContext);
+            PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
+            log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
+        }
+        if (jobItemContexts.isEmpty()) {
+            log.warn("job item contexts empty, ignore");
+            return;
+        }
+        prepare(jobItemContexts);
+        executeInventoryTasks(jobItemContexts);
+        executeIncrementalTasks(jobItemContexts);
+    }
+    
+    protected abstract PipelineJobConfiguration getJobConfiguration(ShardingContext shardingContext);
+    
+    protected abstract PipelineJobItemContext buildPipelineJobItemContext(PipelineJobConfiguration jobConfig, int shardingItem);
+    
+    protected abstract PipelineTasksRunner buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
+    
+    private void prepare(final Collection<PipelineJobItemContext> jobItemContexts) {
+        try {
+            doPrepare0(jobItemContexts);
+            // CHECKSTYLE:OFF
+        } catch (final RuntimeException ex) {
+            // CHECKSTYLE:ON
+            for (PipelineJobItemContext each : jobItemContexts) {
+                processFailed(each.getJobId(), each.getShardingItem(), ex);
+            }
+            throw ex;
+        }
+    }
+    
+    protected abstract void doPrepare0(Collection<PipelineJobItemContext> jobItemContexts);
+    
+    private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
+        log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
+        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
+        PipelineJobRegistry.stop(jobId);
+        processFailed(jobId);
+    }
+    
+    protected abstract void processFailed(String jobId);
+    
+    private void executeInventoryTasks(final Collection<PipelineJobItemContext> jobItemContexts) {
+        Collection<CompletableFuture<?>> futures = new LinkedList<>();
+        for (PipelineJobItemContext each : jobItemContexts) {
+            updateLocalAndRemoteJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK);
+            for (PipelineTask task : ((TransmissionJobItemContext) each).getInventoryTasks()) {
+                if (task.getTaskProgress().getPosition() instanceof FinishedPosition) {
+                    continue;
+                }
+                futures.addAll(task.start());
+            }
+        }
+        if (futures.isEmpty()) {
+            return;
+        }
+        executeInventoryTasks(futures, jobItemContexts);
+    }
+    
+    protected abstract void executeInventoryTasks(Collection<CompletableFuture<?>> futures, Collection<PipelineJobItemContext> jobItemContexts);
+    
+    private void updateLocalAndRemoteJobItemStatus(final PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
+        jobItemContext.setStatus(jobStatus);
+        jobItemManager.updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
+    }
+    
+    private void executeIncrementalTasks(final Collection<PipelineJobItemContext> jobItemContexts) {
+        log.info("execute incremental tasks, jobId={}", getJobId());
+        Collection<CompletableFuture<?>> futures = new LinkedList<>();
+        for (PipelineJobItemContext each : jobItemContexts) {
+            if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
+                log.info("job status already EXECUTE_INCREMENTAL_TASK, ignore");
+                return;
+            }
+            updateLocalAndRemoteJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK);
+            for (PipelineTask task : ((TransmissionJobItemContext) each).getIncrementalTasks()) {
+                if (task.getTaskProgress().getPosition() instanceof FinishedPosition) {
+                    continue;
+                }
+                futures.addAll(task.start());
+            }
+        }
+        executeIncrementalTasks(futures, jobItemContexts);
+    }
+    
+    protected abstract void executeIncrementalTasks(Collection<CompletableFuture<?>> futures, Collection<PipelineJobItemContext> jobItemContexts);
+}
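
To make the new template's contract concrete, a minimal hypothetical subclass might look as follows. All Example* names are invented stand-ins for scenario-specific types (they are not part of this commit), and members required by AbstractPipelineJob itself, such as getJobType(), are omitted:

    import java.util.Collection;
    import java.util.concurrent.CompletableFuture;
    
    import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
    import org.apache.shardingsphere.data.pipeline.core.job.AbstractInseparablePipelineJob;
    import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
    import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
    import org.apache.shardingsphere.elasticjob.api.ShardingContext;
    
    // Hypothetical subclass for illustration only.
    public final class ExamplePipelineJob extends AbstractInseparablePipelineJob {
        
        private final ExampleJobPreparer jobPreparer = new ExampleJobPreparer();
        
        public ExamplePipelineJob(final String jobId) {
            super(jobId);
        }
        
        @Override
        protected PipelineJobConfiguration getJobConfiguration(final ShardingContext shardingContext) {
            // Deserialize the scenario-specific configuration from the ElasticJob parameter.
            return new ExampleJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
        }
        
        @Override
        protected PipelineJobItemContext buildPipelineJobItemContext(final PipelineJobConfiguration jobConfig, final int shardingItem) {
            return new ExampleJobItemContext((ExampleJobConfiguration) jobConfig, shardingItem);
        }
        
        @Override
        protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext jobItemContext) {
            return new ExampleTasksRunner((ExampleJobItemContext) jobItemContext);
        }
        
        @Override
        protected void doPrepare0(final Collection<PipelineJobItemContext> jobItemContexts) {
            // Initialize tasks for all sharding items at once; a RuntimeException thrown
            // here is recorded per item by the base class and then rethrown.
            jobPreparer.initTasks(jobItemContexts);
        }
        
        @Override
        protected void processFailed(final String jobId) {
            // Scenario-specific reaction after the base class records the error and stops the job.
        }
        
        @Override
        protected void executeInventoryTasks(final Collection<CompletableFuture<?>> futures, final Collection<PipelineJobItemContext> jobItemContexts) {
            // Observe the already-started inventory futures, e.g. as CDCJob does via ExecuteEngine.trigger(...).
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }
        
        @Override
        protected void executeIncrementalTasks(final Collection<CompletableFuture<?>> futures, final Collection<PipelineJobItemContext> jobItemContexts) {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }
    }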
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 910e7ceaff2..402d9546f7a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -20,22 +20,19 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
 import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
@@ -61,7 +58,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     
     private final AtomicReference<JobBootstrap> jobBootstrap = new AtomicReference<>();
     
-    private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new ConcurrentHashMap<>();
+    private final Map<Integer, PipelineTasksRunner> tasksRunners = new ConcurrentHashMap<>();
     
     protected AbstractPipelineJob(final String jobId) {
         this.jobId = jobId;
@@ -86,30 +83,18 @@ public abstract class AbstractPipelineJob implements PipelineJob {
         this.jobBootstrap.set(jobBootstrap);
     }
     
-    protected void prepare(final PipelineJobItemContext jobItemContext) {
-        try {
-            doPrepare(jobItemContext);
-            // CHECKSTYLE:OFF
-        } catch (final SQLException ex) {
-            // CHECKSTYLE:ON
-            throw new PipelineInternalException(ex);
-        }
-    }
-    
-    protected abstract void doPrepare(PipelineJobItemContext jobItemContext) throws SQLException;
-    
     @Override
     public final Optional<PipelineTasksRunner> getTasksRunner(final int shardingItem) {
-        return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
+        return Optional.ofNullable(tasksRunners.get(shardingItem));
     }
     
     @Override
     public final Collection<Integer> getShardingItems() {
-        return new ArrayList<>(tasksRunnerMap.keySet());
+        return new ArrayList<>(tasksRunners.keySet());
     }
     
     protected final boolean addTasksRunner(final int shardingItem, final PipelineTasksRunner tasksRunner) {
-        if (null != tasksRunnerMap.putIfAbsent(shardingItem, tasksRunner)) {
+        if (null != tasksRunners.putIfAbsent(shardingItem, tasksRunner)) {
             log.warn("shardingItem {} tasks runner exists, ignore", shardingItem);
             return false;
         }
@@ -132,7 +117,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     private void innerStop() {
         stopping.set(true);
         log.info("stop tasks runner, jobId={}", jobId);
-        for (PipelineTasksRunner each : tasksRunnerMap.values()) {
+        for (PipelineTasksRunner each : tasksRunners.values()) {
             each.stop();
         }
         Optional<ElasticJobListener> pipelineJobListener = ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class, PipelineElasticJobListener.class.getName());
@@ -161,7 +146,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     
     private void innerClean() {
         PipelineJobProgressPersistService.remove(jobId);
-        for (PipelineTasksRunner each : tasksRunnerMap.values()) {
         for (PipelineTasksRunner each : tasksRunners.values()) {
             QuietlyCloser.close(each.getJobItemContext().getJobProcessContext());
         }
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
similarity index 83%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 96861144eb8..cd668ca127f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -19,21 +19,23 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+
+import java.sql.SQLException;
 
 /**
- * Abstract simple pipeline job.
+ * Abstract separable pipeline job.
  */
 @Slf4j
-public abstract class AbstractSimplePipelineJob extends AbstractPipelineJob implements SimpleJob {
+public abstract class AbstractSeparablePipelineJob extends AbstractPipelineJob {
     
-    protected AbstractSimplePipelineJob(final String jobId) {
+    protected AbstractSeparablePipelineJob(final String jobId) {
         super(jobId);
     }
     
@@ -69,6 +71,18 @@ public abstract class AbstractSimplePipelineJob extends AbstractPipelineJob impl
         tasksRunner.start();
     }
     
+    protected final void prepare(final PipelineJobItemContext jobItemContext) {
+        try {
+            doPrepare(jobItemContext);
+            // CHECKSTYLE:OFF
+        } catch (final SQLException ex) {
+            // CHECKSTYLE:ON
+            throw new PipelineInternalException(ex);
+        }
+    }
+    
+    protected abstract void doPrepare(PipelineJobItemContext jobItemContext) throws SQLException;
+    
     protected abstract PipelineJobItemContext buildPipelineJobItemContext(ShardingContext shardingContext);
     
     protected abstract PipelineTasksRunner buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
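
The prepare/doPrepare pair moved into this class (from AbstractPipelineJob) still converts the checked SQLException into the unchecked PipelineInternalException, so subclass preparation code may throw SQLException directly. A hypothetical override, assuming an invented dataSource field, might read:

    // Hypothetical override for illustration only; dataSource is an invented field.
    @Override
    protected void doPrepare(final PipelineJobItemContext jobItemContext) throws SQLException {
        // A checked SQLException thrown here is rethrown by the final prepare(...)
        // wrapper as an unchecked PipelineInternalException.
        try (Connection connection = dataSource.getConnection()) {
            connection.createStatement().execute("SELECT 1");
        }
    }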
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
index 0d20a209072..e71792d6f9a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.job;
 
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import org.apache.shardingsphere.elasticjob.api.ElasticJob;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 
 import java.util.Collection;
 import java.util.Optional;
@@ -26,7 +26,7 @@ import java.util.Optional;
 /**
  * Pipeline job.
  */
-public interface PipelineJob extends ElasticJob {
+public interface PipelineJob extends SimpleJob {
     
     /**
      * Get tasks runner.
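
Because PipelineJob now extends SimpleJob instead of the ElasticJob marker interface, execute(ShardingContext) is part of every pipeline job's contract, which is what allows AbstractInseparablePipelineJob above to implement it as a final template method.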
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index b8999fb58bb..b957fb2c7b5 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -45,12 +45,11 @@ import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.job.AbstractInseparablePipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
@@ -60,7 +59,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -68,8 +67,6 @@ import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -80,7 +77,7 @@ import java.util.stream.Collectors;
  * CDC job.
  */
 @Slf4j
-public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
+public final class CDCJob extends AbstractInseparablePipelineJob implements SimpleJob {
     
     @Getter
     private final PipelineSink sink;
@@ -103,40 +100,28 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
     }
     
     @Override
-    public void execute(final ShardingContext shardingContext) {
-        String jobId = shardingContext.getJobName();
-        log.info("Execute job {}", jobId);
-        CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        List<CDCJobItemContext> jobItemContexts = new LinkedList<>();
-        for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
-            if (isStopping()) {
-                log.info("stopping true, ignore");
-                return;
-            }
-            CDCJobItemContext jobItemContext = buildCDCJobItemContext(jobConfig, shardingItem);
-            if (!addTasksRunner(shardingItem, new CDCTasksRunner(jobItemContext))) {
-                continue;
-            }
-            jobItemContexts.add(jobItemContext);
-            PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
-            log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
-        }
-        if (jobItemContexts.isEmpty()) {
-            log.warn("job item contexts empty, ignore");
-            return;
-        }
-        prepare(jobItemContexts);
-        executeInventoryTasks(jobItemContexts);
-        executeIncrementalTasks(jobItemContexts);
+    protected PipelineJobConfiguration getJobConfiguration(final ShardingContext shardingContext) {
+        return new YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
     }
     
-    private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
+    @Override
+    protected PipelineJobItemContext buildPipelineJobItemContext(final PipelineJobConfiguration jobConfig, final int shardingItem) {
         Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
         PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
                 processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), jobType.getType()));
         TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
-        CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
-        return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
+        CDCTaskConfiguration taskConfig = buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
+        return new CDCJobItemContext((CDCJobConfiguration) jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
+    }
+    
+    @Override
+    protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext jobItemContext) {
+        return new CDCTasksRunner((CDCJobItemContext) jobItemContext);
+    }
+    
+    @Override
+    protected void doPrepare0(final Collection<PipelineJobItemContext> jobItemContexts) {
+        jobPreparer.initTasks(jobItemContexts.stream().map(each -> (CDCJobItemContext) each).collect(Collectors.toList()));
     }
     
     private CDCTaskConfiguration buildTaskConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
@@ -169,70 +154,19 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
         return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
     }
     
-    private void prepare(final Collection<CDCJobItemContext> jobItemContexts) {
-        try {
-            jobPreparer.initTasks(jobItemContexts);
-            // CHECKSTYLE:OFF
-        } catch (final RuntimeException ex) {
-            // CHECKSTYLE:ON
-            for (PipelineJobItemContext each : jobItemContexts) {
-                processFailed(each.getJobId(), each.getShardingItem(), ex);
-            }
-            throw ex;
-        }
-    }
-    
-    private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
-        log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
-        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
-        PipelineJobRegistry.stop(jobId);
+    @Override
+    protected void processFailed(final String jobId) {
         jobAPI.disable(jobId);
     }
     
-    private void executeInventoryTasks(final List<CDCJobItemContext> jobItemContexts) {
-        Collection<CompletableFuture<?>> futures = new LinkedList<>();
-        for (CDCJobItemContext each : jobItemContexts) {
-            updateLocalAndRemoteJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK);
-            for (PipelineTask task : each.getInventoryTasks()) {
-                if (task.getTaskProgress().getPosition() instanceof FinishedPosition) {
-                    continue;
-                }
-                futures.addAll(task.start());
-            }
-        }
-        if (futures.isEmpty()) {
-            return;
-        }
-        ExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory", jobItemContexts.get(0)));
-    }
-    
-    private void updateLocalAndRemoteJobItemStatus(final PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
-        jobItemContext.setStatus(jobStatus);
-        jobItemManager.updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
-    }
-    
-    private void executeIncrementalTasks(final List<CDCJobItemContext> jobItemContexts) {
-        log.info("execute incremental tasks, jobId={}", getJobId());
-        Collection<CompletableFuture<?>> futures = new LinkedList<>();
-        for (CDCJobItemContext each : jobItemContexts) {
-            if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
-                log.info("job status already EXECUTE_INCREMENTAL_TASK, ignore");
-                return;
-            }
-            updateLocalAndRemoteJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK);
-            for (PipelineTask task : each.getIncrementalTasks()) {
-                if (task.getTaskProgress().getPosition() instanceof FinishedPosition) {
-                    continue;
-                }
-                futures.addAll(task.start());
-            }
-        }
-        ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental", jobItemContexts.get(0)));
+    @Override
+    protected void executeInventoryTasks(final Collection<CompletableFuture<?>> futures, final Collection<PipelineJobItemContext> jobItemContexts) {
+        ExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory", (CDCJobItemContext) jobItemContexts.iterator().next()));
     }
     
     @Override
-    protected void doPrepare(final PipelineJobItemContext jobItemContext) {
-        throw new UnsupportedOperationException();
+    protected void executeIncrementalTasks(final Collection<CompletableFuture<?>> futures, final Collection<PipelineJobItemContext> jobItemContexts) {
+        ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental", (CDCJobItemContext) jobItemContexts.iterator().next()));
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 039418b6ac1..d669eda81e1 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -36,7 +36,7 @@ import java.util.Optional;
  * Consistency check job.
  */
 @Slf4j
-public final class ConsistencyCheckJob extends AbstractSimplePipelineJob {
+public final class ConsistencyCheckJob extends AbstractSeparablePipelineJob {
     
     public ConsistencyCheckJob(final String jobId) {
         super(jobId);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index e9a3b6440ea..b990ef1a620 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -37,7 +37,7 @@ import org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAl
 import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
@@ -66,7 +66,7 @@ import java.util.stream.Collectors;
  * Migration job.
  */
 @Slf4j
-public final class MigrationJob extends AbstractSimplePipelineJob {
+public final class MigrationJob extends AbstractSeparablePipelineJob {
     
     private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new MigrationJobType().getYamlJobItemProgressSwapper());
     
