This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f218cf727c0 Merge AbstractInseparablePipelineJob and CDCJob (#32759)
f218cf727c0 is described below

commit f218cf727c0f0c7e813270368fdfa6308b92bd83
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Sep 1 17:27:45 2024 +0800

    Merge AbstractInseparablePipelineJob and CDCJob (#32759)
    
    * Merge AbstractInseparablePipelineJob and CDCJob
    
    * Merge AbstractInseparablePipelineJob and CDCJob
    
    * Merge AbstractInseparablePipelineJob and CDCJob
    
    * Merge AbstractInseparablePipelineJob and CDCJob
    
    * Merge AbstractInseparablePipelineJob and CDCJob
---
 .../core/job/AbstractInseparablePipelineJob.java   | 178 ---------------------
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   | 139 ++++++++++++----
 2 files changed, 112 insertions(+), 205 deletions(-)
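
The diff below deletes the template-method base class and inlines its control flow into CDCJob, its only remaining subclass. As a minimal, self-contained sketch of that refactoring shape (hypothetical names, not the actual ShardingSphere classes):

// Before: an abstract base class drives the flow and subclasses fill in hooks.
abstract class TemplateJob<T> {
    
    public final void execute(final T config) {
        prepare(config);
        run(config);
    }
    
    protected abstract void prepare(T config);
    
    protected abstract void run(T config);
}

final class StreamingJob extends TemplateJob<String> {
    
    @Override
    protected void prepare(final String config) {
        System.out.println("prepare " + config);
    }
    
    @Override
    protected void run(final String config) {
        System.out.println("run " + config);
    }
}

// After: the only subclass owns the whole flow; the hooks become private methods.
final class MergedStreamingJob {
    
    public void execute(final String config) {
        prepare(config);
        run(config);
    }
    
    private void prepare(final String config) {
        System.out.println("prepare " + config);
    }
    
    private void run(final String config) {
        System.out.println("run " + config);
    }
    
    public static void main(final String[] args) {
        new StreamingJob().execute("demo");
        new MergedStreamingJob().execute("demo");
    }
}

With a single subclass, the indirection buys nothing: merging removes the generic type parameters and abstract hooks, at the cost of re-inlining logic that a future second subclass would have to duplicate.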

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
deleted file mode 100644
index 3d3047bcdea..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
-import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
-import org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner.PipelineJobRunnerCleaner;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
-import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
-import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
-import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Abstract inseparable pipeline job.
- *
- * @param <T> type of pipeline job configuration
- * @param <I> type of pipeline job item context
- * @param <P> type of pipeline job item progress
- */
-@Getter
-@Slf4j
-public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfiguration, I extends PipelineJobItemContext, P extends PipelineJobItemProgress> implements PipelineJob {
-    
-    private final PipelineJobRunnerManager jobRunnerManager;
-    
-    protected AbstractInseparablePipelineJob(final PipelineJobRunnerCleaner cleaner) {
-        jobRunnerManager = new PipelineJobRunnerManager(cleaner);
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public final void execute(final ShardingContext shardingContext) {
-        String jobId = shardingContext.getJobName();
-        log.info("Execute job {}", jobId);
-        PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
-        PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
-        T jobConfig = (T) jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        PipelineJobItemManager<P> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
-        TransmissionProcessContext jobProcessContext = createTransmissionProcessContext(jobId, jobType, contextKey);
-        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
-        Collection<I> jobItemContexts = new LinkedList<>();
-        for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
-            if (jobRunnerManager.isStopping()) {
-                log.info("Job is stopping, ignore.");
-                return;
-            }
-            P jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
-            I jobItemContext = buildJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext);
-            if (!jobRunnerManager.addTasksRunner(shardingItem, buildTasksRunner(jobItemContext))) {
-                continue;
-            }
-            jobItemContexts.add(jobItemContext);
-            governanceFacade.getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
-            log.info("Start tasks runner, jobId={}, shardingItem={}.", jobId, shardingItem);
-        }
-        if (jobItemContexts.isEmpty()) {
-            log.warn("Job item contexts are empty, ignore.");
-            return;
-        }
-        prepare(jobItemContexts, governanceFacade);
-        executeInventoryTasks(jobItemContexts, jobItemManager);
-        executeIncrementalTasks(jobItemContexts, jobItemManager);
-    }
-    
-    private TransmissionProcessContext createTransmissionProcessContext(final String jobId, final PipelineJobType jobType, final PipelineContextKey contextKey) {
-        if (!jobType.isTransmissionJob()) {
-            return null;
-        }
-        PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(new PipelineProcessConfigurationPersistService().load(contextKey, jobType.getType()));
-        return new TransmissionProcessContext(jobId, processConfig);
-    }
-    
-    protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P jobItemProgress, TransmissionProcessContext jobProcessContext);
-    
-    protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
-    
-    private void prepare(final Collection<I> jobItemContexts, final PipelineGovernanceFacade governanceFacade) {
-        try {
-            doPrepare(jobItemContexts);
-            // CHECKSTYLE:OFF
-        } catch (final RuntimeException ex) {
-            // CHECKSTYLE:ON
-            for (PipelineJobItemContext each : jobItemContexts) {
-                processFailed(each.getJobId(), each.getShardingItem(), ex, governanceFacade);
-            }
-            throw ex;
-        }
-    }
-    
-    protected abstract void doPrepare(Collection<I> jobItemContexts);
-    
-    private void processFailed(final String jobId, final int shardingItem, final Exception ex, final PipelineGovernanceFacade governanceFacade) {
-        log.error("Job {}-{} execution failed.", jobId, shardingItem, ex);
-        governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
-        PipelineJobRegistry.stop(jobId);
-        processFailed(jobId);
-    }
-    
-    protected abstract void processFailed(String jobId);
-    
-    private void executeInventoryTasks(final Collection<I> jobItemContexts, final PipelineJobItemManager<P> jobItemManager) {
-        Collection<CompletableFuture<?>> futures = new LinkedList<>();
-        for (I each : jobItemContexts) {
-            updateJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK, jobItemManager);
-            for (PipelineTask task : ((TransmissionJobItemContext) each).getInventoryTasks()) {
-                if (task.getTaskProgress().getPosition() instanceof IngestFinishedPosition) {
-                    continue;
-                }
-                futures.addAll(task.start());
-            }
-        }
-        if (futures.isEmpty()) {
-            return;
-        }
-        ExecuteEngine.trigger(futures, buildExecuteCallback("inventory", jobItemContexts.iterator().next()));
-    }
-    
-    private void executeIncrementalTasks(final Collection<I> jobItemContexts, final PipelineJobItemManager<P> jobItemManager) {
-        Collection<CompletableFuture<?>> futures = new LinkedList<>();
-        for (I each : jobItemContexts) {
-            if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
-                log.info("Job status has already EXECUTE_INCREMENTAL_TASK, ignore.");
-                return;
-            }
-            updateJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK, jobItemManager);
-            for (PipelineTask task : ((TransmissionJobItemContext) each).getIncrementalTasks()) {
-                if (task.getTaskProgress().getPosition() instanceof IngestFinishedPosition) {
-                    continue;
-                }
-                futures.addAll(task.start());
-            }
-        }
-        ExecuteEngine.trigger(futures, buildExecuteCallback("incremental", jobItemContexts.iterator().next()));
-    }
-    
-    private void updateJobItemStatus(final I jobItemContext, final JobStatus jobStatus, final PipelineJobItemManager<P> jobItemManager) {
-        jobItemContext.setStatus(jobStatus);
-        jobItemManager.updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
-    }
-    
-    protected abstract ExecuteCallback buildExecuteCallback(String identifier, I jobItemContext);
-}
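
The executeInventoryTasks/executeIncrementalTasks flow deleted above — start every unfinished task, collect the resulting CompletableFutures, then fire a single callback once all of them complete — reappears almost verbatim in CDCJob below. A rough, self-contained sketch of that trigger pattern, with a hypothetical Callback interface and trigger method standing in for ShardingSphere's ExecuteCallback and ExecuteEngine.trigger:

import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;

public final class TriggerSketch {
    
    // Stand-in for ExecuteCallback: one success/failure notification per task group.
    interface Callback {
        
        void onSuccess();
        
        void onFailure(Throwable throwable);
    }
    
    // Stand-in for ExecuteEngine.trigger(): allOf() completes when every task future
    // does, and whenComplete() routes to exactly one of the two callback methods.
    static void trigger(final Collection<CompletableFuture<?>> futures, final Callback callback) {
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
            if (null == throwable) {
                callback.onSuccess();
            } else {
                callback.onFailure(throwable);
            }
        });
    }
    
    public static void main(final String[] args) throws InterruptedException {
        Collection<CompletableFuture<?>> futures = new LinkedList<>();
        futures.add(CompletableFuture.runAsync(() -> System.out.println("inventory task 0")));
        futures.add(CompletableFuture.runAsync(() -> System.out.println("inventory task 1")));
        trigger(futures, new Callback() {
            
            @Override
            public void onSuccess() {
                System.out.println("All inventory tasks finished successfully.");
            }
            
            @Override
            public void onFailure(final Throwable throwable) {
                System.err.println("Inventory task failed: " + throwable);
            }
        });
        Thread.sleep(200L); // Crude wait so the async callback prints before the demo JVM exits.
    }
}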
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 8fcd1de568f..71474b4f6c5 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -31,59 +31,104 @@ import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
 import org.apache.shardingsphere.data.pipeline.cdc.engine.CDCJobRunnerCleaner;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
 import org.apache.shardingsphere.data.pipeline.core.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractInseparablePipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
+import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 /**
  * CDC job.
  */
 @Slf4j
-public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobConfiguration, CDCJobItemContext, TransmissionJobItemProgress> {
+public final class CDCJob implements PipelineJob {
     
-    private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
+    @Getter
+    private final PipelineJobRunnerManager jobRunnerManager;
     
-    private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
+    private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
     
     @Getter
     private final PipelineSink sink;
     
     public CDCJob(final PipelineSink sink) {
-        super(new CDCJobRunnerCleaner(sink));
+        jobRunnerManager = new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink));
         this.sink = sink;
     }
     
     @Override
-    protected CDCJobItemContext buildJobItemContext(final CDCJobConfiguration jobConfig,
-                                                    final int shardingItem, final TransmissionJobItemProgress jobItemProgress, final TransmissionProcessContext jobProcessContext) {
-        CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getProcessConfiguration());
-        return new CDCJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), sink);
+    public void execute(final ShardingContext shardingContext) {
+        String jobId = shardingContext.getJobName();
+        log.info("Execute job {}", jobId);
+        PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
+        PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
+        CDCJobConfiguration jobConfig = (CDCJobConfiguration) jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+        TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(
+                jobId, PipelineProcessConfigurationUtils.fillInDefaultValue(new PipelineProcessConfigurationPersistService().load(contextKey, jobType.getType())));
+        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
+        Collection<CDCJobItemContext> jobItemContexts = new LinkedList<>();
+        for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
+            if (jobRunnerManager.isStopping()) {
+                log.info("Job is stopping, ignore.");
+                return;
+            }
+            TransmissionJobItemProgress jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
+            CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getProcessConfiguration());
+            CDCJobItemContext jobItemContext = new CDCJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), sink);
+            if (!jobRunnerManager.addTasksRunner(shardingItem, new CDCTasksRunner(jobItemContext))) {
+                continue;
+            }
+            jobItemContexts.add(jobItemContext);
+            governanceFacade.getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
+            log.info("Start tasks runner, jobId={}, shardingItem={}.", jobId, shardingItem);
+        }
+        if (jobItemContexts.isEmpty()) {
+            log.warn("Job item contexts are empty, ignore.");
+            return;
+        }
+        initTasks(jobItemContexts, governanceFacade);
+        executeInventoryTasks(jobItemContexts, jobItemManager);
+        executeIncrementalTasks(jobItemContexts, jobItemManager);
     }
     
     private CDCTaskConfiguration buildTaskConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
@@ -101,8 +146,8 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobConfigura
         return new IncrementalDumperContext(dumperCommonContext, jobConfig.getJobId(), jobConfig.isDecodeWithTX());
     }
     
-    private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection<String> schemaTableNames,
-                                                             final TableAndSchemaNameMapper mapper) {
+    private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
+                                                             final Collection<String> schemaTableNames, final TableAndSchemaNameMapper mapper) {
         PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
                 jobConfig.getDataSourceConfig().getType(), jobConfig.getDataSourceConfig().getParameter());
         Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
@@ -113,24 +158,64 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobConfigura
         return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, mapper, write.getBatchSize(), writeRateLimitAlgorithm, 0, 1);
     }
     
-    @Override
-    protected PipelineTasksRunner buildTasksRunner(final CDCJobItemContext jobItemContext) {
-        return new CDCTasksRunner(jobItemContext);
+    private void initTasks(final Collection<CDCJobItemContext> jobItemContexts, final PipelineGovernanceFacade governanceFacade) {
+        try {
+            new CDCJobPreparer().initTasks(jobItemContexts);
+            // CHECKSTYLE:OFF
+        } catch (final RuntimeException ex) {
+            // CHECKSTYLE:ON
+            for (PipelineJobItemContext each : jobItemContexts) {
+                initTasksFailed(each.getJobId(), each.getShardingItem(), ex, governanceFacade);
+            }
+            throw ex;
+        }
     }
     
-    @Override
-    protected void doPrepare(final Collection<CDCJobItemContext> jobItemContexts) {
-        jobPreparer.initTasks(jobItemContexts);
+    private void initTasksFailed(final String jobId, final int shardingItem, final Exception ex, final PipelineGovernanceFacade governanceFacade) {
+        log.error("Job {}-{} execution failed.", jobId, shardingItem, ex);
+        governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
+        PipelineJobRegistry.stop(jobId);
+        jobAPI.disable(jobId);
     }
     
-    @Override
-    protected void processFailed(final String jobId) {
-        jobAPI.disable(jobId);
+    private void executeInventoryTasks(final Collection<CDCJobItemContext> jobItemContexts, final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager) {
+        Collection<CompletableFuture<?>> futures = new LinkedList<>();
+        for (CDCJobItemContext each : jobItemContexts) {
+            updateJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK, jobItemManager);
+            for (PipelineTask task : ((TransmissionJobItemContext) each).getInventoryTasks()) {
+                if (task.getTaskProgress().getPosition() instanceof IngestFinishedPosition) {
+                    continue;
+                }
+                futures.addAll(task.start());
+            }
+        }
+        if (futures.isEmpty()) {
+            return;
+        }
+        ExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory", jobItemContexts.iterator().next()));
     }
     
-    @Override
-    protected ExecuteCallback buildExecuteCallback(final String identifier, final CDCJobItemContext jobItemContext) {
-        return new CDCExecuteCallback(identifier, jobItemContext);
+    private void executeIncrementalTasks(final Collection<CDCJobItemContext> jobItemContexts, final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager) {
+        Collection<CompletableFuture<?>> futures = new LinkedList<>();
+        for (CDCJobItemContext each : jobItemContexts) {
+            if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
+                log.info("Job status is already EXECUTE_INCREMENTAL_TASK, ignore.");
+                return;
+            }
+            updateJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK, jobItemManager);
+            for (PipelineTask task : ((TransmissionJobItemContext) each).getIncrementalTasks()) {
+                if (task.getTaskProgress().getPosition() instanceof IngestFinishedPosition) {
+                    continue;
+                }
+                futures.addAll(task.start());
+            }
+        }
+        ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental", jobItemContexts.iterator().next()));
+    }
+    
+    private void updateJobItemStatus(final CDCJobItemContext jobItemContext, final JobStatus jobStatus, final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager) {
+        jobItemContext.setStatus(jobStatus);
+        jobItemManager.updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
     }
     
     @RequiredArgsConstructor
@@ -143,15 +228,15 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobConfigura
         @Override
         public void onSuccess() {
             if (jobItemContext.isStopping()) {
-                log.info("onSuccess, stopping true, ignore");
+                log.info("Job is stopping, ignore.");
                 return;
             }
-            log.info("onSuccess, all {} tasks finished.", identifier);
+            log.info("All {} tasks finished successful.", identifier);
         }
         
         @Override
         public void onFailure(final Throwable throwable) {
-            log.error("onFailure, {} task execute failed.", identifier, 
throwable);
+            log.error("Task {} execute failed.", identifier, throwable);
             String jobId = jobItemContext.getJobId();
             PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable);
             if (jobItemContext.getSink() instanceof PipelineCDCSocketSink) {
