This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0df0325d9b0 Add CDCJobAPI (#29220)
0df0325d9b0 is described below

commit 0df0325d9b0aad185eb2b269a3bd214fb79b4d24
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Nov 27 23:17:41 2023 +0800

    Add CDCJobAPI (#29220)
    
    * Refactor MigrationJobOption
    
    * Refactor MigrationJobOption
    
    * Add CDCJobAPI
    
    * Add CDCJobAPI
---
 .../core/job/option/TransmissionJobOption.java     |  16 +-
 .../handler/update/DropStreamingUpdater.java       |   8 +-
 .../api/impl/{CDCJobOption.java => CDCJobAPI.java} | 168 ++++------------
 .../data/pipeline/cdc/api/impl/CDCJobOption.java   | 222 ++-------------------
 .../data/pipeline/cdc/core/job/CDCJob.java         |  17 +-
 .../pipeline/cdc/handler/CDCBackendHandler.java    |  15 +-
 ...ta.pipeline.core.job.service.TransmissionJobAPI |  18 ++
 .../migration/api/impl/MigrationJobOption.java     |  45 ++---
 8 files changed, 132 insertions(+), 377 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
index edf23ebc525..a9a630ec205 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
@@ -47,6 +47,14 @@ public interface TransmissionJobOption extends 
PipelineJobOption {
      */
     PipelineJobInfo getJobInfo(String jobId);
     
+    /**
+     * Extend YAML job configuration.
+     *
+     * @param contextKey context key
+     * @param yamlJobConfig YAML job configuration
+     */
+    void extendYamlJobConfiguration(PipelineContextKey contextKey, 
YamlPipelineJobConfiguration yamlJobConfig);
+    
     /**
      * Build task configuration.
      *
@@ -65,14 +73,6 @@ public interface TransmissionJobOption extends 
PipelineJobOption {
      */
     TransmissionProcessContext buildProcessContext(PipelineJobConfiguration 
jobConfig);
     
-    /**
-     * Extend YAML job configuration.
-     *
-     * @param contextKey context key
-     * @param yamlJobConfig YAML job configuration
-     */
-    void extendYamlJobConfiguration(PipelineContextKey contextKey, 
YamlPipelineJobConfiguration yamlJobConfig);
-    
     /**
      * Build pipeline data consistency checker.
      *
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
index 87a6a73050e..3befad8d811 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
@@ -18,8 +18,10 @@
 package org.apache.shardingsphere.cdc.distsql.handler.update;
 
 import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
-import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 import java.sql.SQLException;
 
@@ -28,11 +30,11 @@ import java.sql.SQLException;
  */
 public final class DropStreamingUpdater implements 
RALUpdater<DropStreamingStatement> {
     
-    private final CDCJobOption jobAPI = new CDCJobOption();
+    private final CDCJobAPI jobAPI = (CDCJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
     
     @Override
     public void executeUpdate(final String databaseName, final 
DropStreamingStatement sqlStatement) throws SQLException {
-        jobAPI.dropStreaming(sqlStatement.getJobId());
+        jobAPI.drop(sqlStatement.getJobId());
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
similarity index 64%
copy from 
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java
copy to 
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 48f6bf3bb61..ef06ee45dcf 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -17,46 +17,28 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.api.impl;
 
-import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
-import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
 import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
-import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
-import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
 import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
-import 
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
-import 
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
@@ -66,12 +48,11 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -87,20 +68,18 @@ import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigur
 
 import java.sql.SQLException;
 import java.time.LocalDateTime;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * CDC job option.
+ * CDC job API.
  */
 @Slf4j
-public final class CDCJobOption implements TransmissionJobOption {
+public final class CDCJobAPI implements TransmissionJobAPI {
     
     private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = 
new YamlDataSourceConfigurationSwapper();
     
@@ -108,26 +87,28 @@ public final class CDCJobOption implements 
TransmissionJobOption {
     
     private final YamlPipelineDataSourceConfigurationSwapper 
pipelineDataSourceConfigSwapper = new 
YamlPipelineDataSourceConfigurationSwapper();
     
+    private final CDCJobOption jobOption = new CDCJobOption();
+    
     /**
-     * Create CDC job config.
+     * Create CDC job.
      *
-     * @param param create CDC job param
+     * @param param CDC job parameter
      * @param sinkType sink type
      * @param sinkProps sink properties
      * @return job id
      */
-    public String createJob(final StreamDataParameter param, final CDCSinkType 
sinkType, final Properties sinkProps) {
+    public String create(final StreamDataParameter param, final CDCSinkType 
sinkType, final Properties sinkProps) {
         PipelineContextKey contextKey = new 
PipelineContextKey(param.getDatabaseName(), InstanceType.PROXY);
         YamlCDCJobConfiguration yamlJobConfig = 
getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
-        extendYamlJobConfiguration(contextKey, yamlJobConfig);
+        jobOption.extendYamlJobConfiguration(contextKey, yamlJobConfig);
         CDCJobConfiguration jobConfig = new 
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
         ShardingSpherePreconditions.checkState(0 != 
jobConfig.getJobShardingCount(), () -> new 
PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
         if 
(governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId()))
 {
             log.warn("CDC job already exists in registry center, ignore, job 
id is `{}`", jobConfig.getJobId());
         } else {
-            
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), 
getJobClass());
-            JobConfigurationPOJO jobConfigPOJO = new 
PipelineJobConfigurationManager(this).convertToJobConfigurationPOJO(jobConfig);
+            
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), 
jobOption.getJobClass());
+            JobConfigurationPOJO jobConfigPOJO = new 
PipelineJobConfigurationManager(jobOption).convertToJobConfigurationPOJO(jobConfig);
             jobConfigPOJO.setDisabled(true);
             
governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(),
 jobConfigPOJO);
             if (!param.isFull()) {
@@ -171,7 +152,7 @@ public final class CDCJobOption implements 
TransmissionJobOption {
     
     private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
         String jobId = jobConfig.getJobId();
-        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
         try (PipelineDataSourceManager pipelineDataSourceManager = new 
DefaultPipelineDataSourceManager()) {
             for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
                 if (jobItemManager.getProgress(jobId, i).isPresent()) {
@@ -180,13 +161,22 @@ public final class CDCJobOption implements 
TransmissionJobOption {
                 IncrementalDumperContext dumperContext = 
buildDumperContext(jobConfig, i, new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
                 TransmissionJobItemProgress jobItemProgress = 
getTransmissionJobItemProgress(jobConfig, pipelineDataSourceManager, 
dumperContext);
                 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(
-                        jobId, i, 
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+                        jobId, i, 
YamlEngine.marshal(jobOption.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
             }
         } catch (final SQLException ex) {
             throw new PrepareJobWithGetBinlogPositionException(jobId, ex);
         }
     }
     
+    private IncrementalDumperContext buildDumperContext(final 
CDCJobConfiguration jobConfig, final int jobShardingItem, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+        JobDataNodeLine dataNodeLine = 
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
+        String dataSourceName = 
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
+        StandardPipelineDataSourceConfiguration actualDataSourceConfig = 
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
+        return new IncrementalDumperContext(
+                new DumperCommonContext(dataSourceName, 
actualDataSourceConfig, 
JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), 
tableAndSchemaNameMapper),
+                jobConfig.getJobId(), jobConfig.isDecodeWithTX());
+    }
+    
     private static TransmissionJobItemProgress 
getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig,
                                                                               
final PipelineDataSourceManager dataSourceManager,
                                                                               
final IncrementalDumperContext incrementalDumperContext) throws SQLException {
@@ -199,118 +189,51 @@ public final class CDCJobOption implements 
TransmissionJobOption {
     }
     
     /**
-     * Start job.
+     * Start CDC job.
      *
      * @param jobId job id
      * @param sink sink
      */
-    public void startJob(final String jobId, final PipelineSink sink) {
+    public void start(final String jobId, final PipelineSink sink) {
         CDCJob job = new CDCJob(jobId, sink);
         PipelineJobCenter.addJob(jobId, job);
-        updateJobConfigurationDisabled(jobId, false);
+        enable(jobId);
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
 job, jobConfigPOJO.toJobConfiguration());
         job.setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
     }
     
+    private void enable(final String jobId) {
+        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
+        jobConfigPOJO.setDisabled(false);
+        jobConfigPOJO.getProps().setProperty("start_time_millis", 
String.valueOf(System.currentTimeMillis()));
+        jobConfigPOJO.getProps().remove("stop_time_millis");
+        
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
+    }
+    
     /**
-     * Update job configuration disabled.
+     * Disable CDC job.
      *
      * @param jobId job id
-     * @param disabled disabled
      */
-    public void updateJobConfigurationDisabled(final String jobId, final 
boolean disabled) {
+    public void disable(final String jobId) {
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
-        jobConfigPOJO.setDisabled(disabled);
-        if (disabled) {
-            jobConfigPOJO.getProps().setProperty("stop_time", 
LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
-            jobConfigPOJO.getProps().setProperty("stop_time_millis", 
String.valueOf(System.currentTimeMillis()));
-        } else {
-            jobConfigPOJO.getProps().setProperty("start_time_millis", 
String.valueOf(System.currentTimeMillis()));
-            jobConfigPOJO.getProps().remove("stop_time_millis");
-        }
+        jobConfigPOJO.setDisabled(true);
+        jobConfigPOJO.getProps().setProperty("stop_time", 
LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
+        jobConfigPOJO.getProps().setProperty("stop_time_millis", 
String.valueOf(System.currentTimeMillis()));
         
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
     }
     
-    @Override
-    public void extendYamlJobConfiguration(final PipelineContextKey 
contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
-        YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) 
yamlJobConfig;
-        if (null == yamlJobConfig.getJobId()) {
-            config.setJobId(new CDCJobId(contextKey, 
config.getSchemaTableNames(), config.isFull(), 
config.getSinkConfig().getSinkType()).marshal());
-        }
-        if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
-            PipelineDataSourceConfiguration sourceDataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(),
-                    config.getDataSourceConfiguration().getParameter());
-            
config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
-        }
-    }
-    
-    @Override
-    public CDCTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
-        CDCJobConfiguration jobConfig = (CDCJobConfiguration) 
pipelineJobConfig;
-        TableAndSchemaNameMapper tableAndSchemaNameMapper = new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
-        IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, 
jobShardingItem, tableAndSchemaNameMapper);
-        ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, processConfig, 
jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
-        CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, 
importerConfig);
-        log.debug("buildTaskConfiguration, result={}", result);
-        return result;
-    }
-    
-    private IncrementalDumperContext buildDumperContext(final 
CDCJobConfiguration jobConfig, final int jobShardingItem, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        JobDataNodeLine dataNodeLine = 
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
-        String dataSourceName = 
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
-        StandardPipelineDataSourceConfiguration actualDataSourceConfig = 
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
-        return new IncrementalDumperContext(
-                new DumperCommonContext(dataSourceName, 
actualDataSourceConfig, 
JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), 
tableAndSchemaNameMapper),
-                jobConfig.getJobId(), jobConfig.isDecodeWithTX());
-    }
-    
-    private ImporterConfiguration buildImporterConfiguration(final 
CDCJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig, final Collection<String> schemaTableNames,
-                                                             final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        PipelineDataSourceConfiguration dataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
-                jobConfig.getDataSourceConfig().getParameter());
-        CDCProcessContext processContext = new 
CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
-        JobRateLimitAlgorithm writeRateLimitAlgorithm = 
processContext.getWriteRateLimitAlgorithm();
-        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor()
-                
.getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(),
 
schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
-        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, 
tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
-    }
-    
-    @Override
-    public CDCProcessContext buildProcessContext(final 
PipelineJobConfiguration jobConfig) {
-        TransmissionJobManager jobManager = new TransmissionJobManager(this);
-        return new CDCProcessContext(jobConfig.getJobId(), 
jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
-        return new YamlCDCJobConfigurationSwapper();
-    }
-    
-    @Override
-    public PipelineJobInfo getJobInfo(final String jobId) {
-        PipelineJobMetaData jobMetaData = new 
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
-        CDCJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
-        return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), 
String.join(", ", jobConfig.getSchemaTableNames()));
-    }
-    
-    @Override
-    public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
-        return true;
-    }
-    
     /**
-     * Drop streaming job.
+     * Drop CDC job.
      *
      * @param jobId job id
      */
-    public void dropStreaming(final String jobId) {
-        CDCJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
+    public void drop(final String jobId) {
+        CDCJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId);
         
ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(),
 () -> new PipelineInternalException("Can't drop streaming job which is 
active"));
-        new PipelineJobManager(this).drop(jobId);
+        new PipelineJobManager(jobOption).drop(jobId);
         cleanup(jobConfig);
     }
     
@@ -325,14 +248,11 @@ public final class CDCJobOption implements 
TransmissionJobOption {
     }
     
     @Override
-    public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig, final TransmissionProcessContext 
processContext,
-                                                                      final 
ConsistencyCheckJobItemProgressContext progressContext) {
-        throw new UnsupportedOperationException();
+    public void commit(final String jobId) throws SQLException {
     }
     
     @Override
-    public Class<CDCJob> getJobClass() {
-        return CDCJob.class;
+    public void rollback(final String jobId) throws SQLException {
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java
index 48f6bf3bb61..5408034eaac 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java
@@ -20,79 +20,40 @@ package 
org.apache.shardingsphere.data.pipeline.cdc.api.impl;
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
 import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
-import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
 import 
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
-import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
-import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
-import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
 
-import java.sql.SQLException;
-import java.time.LocalDateTime;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -102,135 +63,27 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class CDCJobOption implements TransmissionJobOption {
     
-    private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = 
new YamlDataSourceConfigurationSwapper();
-    
-    private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine = 
new YamlRuleConfigurationSwapperEngine();
-    
-    private final YamlPipelineDataSourceConfigurationSwapper 
pipelineDataSourceConfigSwapper = new 
YamlPipelineDataSourceConfigurationSwapper();
-    
-    /**
-     * Create CDC job config.
-     *
-     * @param param create CDC job param
-     * @param sinkType sink type
-     * @param sinkProps sink properties
-     * @return job id
-     */
-    public String createJob(final StreamDataParameter param, final CDCSinkType 
sinkType, final Properties sinkProps) {
-        PipelineContextKey contextKey = new 
PipelineContextKey(param.getDatabaseName(), InstanceType.PROXY);
-        YamlCDCJobConfiguration yamlJobConfig = 
getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
-        extendYamlJobConfiguration(contextKey, yamlJobConfig);
-        CDCJobConfiguration jobConfig = new 
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
-        ShardingSpherePreconditions.checkState(0 != 
jobConfig.getJobShardingCount(), () -> new 
PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
-        PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
-        if 
(governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId()))
 {
-            log.warn("CDC job already exists in registry center, ignore, job 
id is `{}`", jobConfig.getJobId());
-        } else {
-            
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), 
getJobClass());
-            JobConfigurationPOJO jobConfigPOJO = new 
PipelineJobConfigurationManager(this).convertToJobConfigurationPOJO(jobConfig);
-            jobConfigPOJO.setDisabled(true);
-            
governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(),
 jobConfigPOJO);
-            if (!param.isFull()) {
-                initIncrementalPosition(jobConfig);
-            }
-        }
-        return jobConfig.getJobId();
-    }
-    
-    private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final 
StreamDataParameter param, final CDCSinkType sinkType, final Properties 
sinkProps, final PipelineContextKey contextKey) {
-        YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
-        result.setDatabaseName(param.getDatabaseName());
-        result.setSchemaTableNames(param.getSchemaTableNames());
-        result.setFull(param.isFull());
-        result.setDecodeWithTX(param.isDecodeWithTX());
-        YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
-        sinkConfig.setSinkType(sinkType.name());
-        sinkConfig.setProps(sinkProps);
-        result.setSinkConfig(sinkConfig);
-        ShardingSphereDatabase database = 
PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
-        
result.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
-        List<JobDataNodeLine> jobDataNodeLines = 
JobDataNodeLineConvertUtils.convertDataNodesToLines(param.getDataNodesMap());
-        
result.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
-        JobDataNodeLine tableFirstDataNodes = new 
JobDataNodeLine(param.getDataNodesMap().entrySet().stream()
-                .map(each -> new JobDataNodeEntry(each.getKey(), 
each.getValue().subList(0, 1))).collect(Collectors.toList()));
-        result.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
-        return result;
-    }
-    
-    private ShardingSpherePipelineDataSourceConfiguration 
getDataSourceConfiguration(final ShardingSphereDatabase database) {
-        Map<String, Map<String, Object>> dataSourcePoolProps = new HashMap<>();
-        for (Entry<String, StorageUnit> entry : 
database.getResourceMetaData().getStorageUnits().entrySet()) {
-            dataSourcePoolProps.put(entry.getKey(), 
dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
-        }
-        YamlRootConfiguration targetRootConfig = new YamlRootConfiguration();
-        targetRootConfig.setDatabaseName(database.getName());
-        targetRootConfig.setDataSources(dataSourcePoolProps);
-        
targetRootConfig.setRules(ruleConfigSwapperEngine.swapToYamlRuleConfigurations(database.getRuleMetaData().getConfigurations()));
-        return new 
ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
-    }
-    
-    private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
-        String jobId = jobConfig.getJobId();
-        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
-        try (PipelineDataSourceManager pipelineDataSourceManager = new 
DefaultPipelineDataSourceManager()) {
-            for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
-                if (jobItemManager.getProgress(jobId, i).isPresent()) {
-                    continue;
-                }
-                IncrementalDumperContext dumperContext = 
buildDumperContext(jobConfig, i, new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
-                TransmissionJobItemProgress jobItemProgress = 
getTransmissionJobItemProgress(jobConfig, pipelineDataSourceManager, 
dumperContext);
-                
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(
-                        jobId, i, 
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
-            }
-        } catch (final SQLException ex) {
-            throw new PrepareJobWithGetBinlogPositionException(jobId, ex);
-        }
+    @SuppressWarnings("unchecked")
+    @Override
+    public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
+        return new YamlCDCJobConfigurationSwapper();
     }
     
-    private static TransmissionJobItemProgress 
getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig,
-                                                                              
final PipelineDataSourceManager dataSourceManager,
-                                                                              
final IncrementalDumperContext incrementalDumperContext) throws SQLException {
-        TransmissionJobItemProgress result = new TransmissionJobItemProgress();
-        result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
-        
result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
-        IncrementalTaskProgress incrementalTaskProgress = new 
IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, 
incrementalDumperContext, dataSourceManager));
-        result.setIncremental(new 
JobItemIncrementalTasksProgress(incrementalTaskProgress));
-        return result;
+    @Override
+    public Class<CDCJob> getJobClass() {
+        return CDCJob.class;
     }
     
-    /**
-     * Start job.
-     *
-     * @param jobId job id
-     * @param sink sink
-     */
-    public void startJob(final String jobId, final PipelineSink sink) {
-        CDCJob job = new CDCJob(jobId, sink);
-        PipelineJobCenter.addJob(jobId, job);
-        updateJobConfigurationDisabled(jobId, false);
-        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
-        OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
 job, jobConfigPOJO.toJobConfiguration());
-        job.setJobBootstrap(oneOffJobBootstrap);
-        oneOffJobBootstrap.execute();
+    @Override
+    public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
+        return true;
     }
     
-    /**
-     * Update job configuration disabled.
-     *
-     * @param jobId job id
-     * @param disabled disabled
-     */
-    public void updateJobConfigurationDisabled(final String jobId, final 
boolean disabled) {
-        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
-        jobConfigPOJO.setDisabled(disabled);
-        if (disabled) {
-            jobConfigPOJO.getProps().setProperty("stop_time", 
LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
-            jobConfigPOJO.getProps().setProperty("stop_time_millis", 
String.valueOf(System.currentTimeMillis()));
-        } else {
-            jobConfigPOJO.getProps().setProperty("start_time_millis", 
String.valueOf(System.currentTimeMillis()));
-            jobConfigPOJO.getProps().remove("stop_time_millis");
-        }
-        
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
+    @Override
+    public PipelineJobInfo getJobInfo(final String jobId) {
+        PipelineJobMetaData jobMetaData = new 
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+        CDCJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
+        return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), 
String.join(", ", jobConfig.getSchemaTableNames()));
     }
     
     @Override
@@ -284,57 +137,12 @@ public final class CDCJobOption implements 
TransmissionJobOption {
         return new CDCProcessContext(jobConfig.getJobId(), 
jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
     }
     
-    @SuppressWarnings("unchecked")
-    @Override
-    public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
-        return new YamlCDCJobConfigurationSwapper();
-    }
-    
-    @Override
-    public PipelineJobInfo getJobInfo(final String jobId) {
-        PipelineJobMetaData jobMetaData = new 
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
-        CDCJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
-        return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), 
String.join(", ", jobConfig.getSchemaTableNames()));
-    }
-    
-    @Override
-    public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
-        return true;
-    }
-    
-    /**
-     * Drop streaming job.
-     *
-     * @param jobId job id
-     */
-    public void dropStreaming(final String jobId) {
-        CDCJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
-        
ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(),
 () -> new PipelineInternalException("Can't drop streaming job which is 
active"));
-        new PipelineJobManager(this).drop(jobId);
-        cleanup(jobConfig);
-    }
-    
-    private void cleanup(final CDCJobConfiguration jobConfig) {
-        for (Entry<String, Map<String, Object>> entry : 
jobConfig.getDataSourceConfig().getRootConfig().getDataSources().entrySet()) {
-            try {
-                PipelineJobPreparerUtils.destroyPosition(jobConfig.getJobId(), 
new StandardPipelineDataSourceConfiguration(entry.getValue()));
-            } catch (final SQLException ex) {
-                log.warn("job destroying failed, jobId={}, dataSourceName={}", 
jobConfig.getJobId(), entry.getKey(), ex);
-            }
-        }
-    }
-    
     @Override
     public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig, final TransmissionProcessContext 
processContext,
                                                                       final 
ConsistencyCheckJobItemProgressContext progressContext) {
         throw new UnsupportedOperationException();
     }
     
-    @Override
-    public Class<CDCJob> getJobClass() {
-        return CDCJob.class;
-    }
-    
     @Override
     public String getType() {
         return "STREAMING";
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 79d53ea16f6..7f477377486 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.job;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
@@ -44,10 +45,12 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.util.Collection;
@@ -65,9 +68,11 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     @Getter
     private final PipelineSink sink;
     
-    private final CDCJobOption jobAPI = new CDCJobOption();
+    private final CDCJobOption jobOption = new CDCJobOption();
     
-    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+    private final CDCJobAPI jobAPI = (CDCJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
+    
+    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
     
     private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
     
@@ -109,8 +114,8 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     
     private CDCJobItemContext buildPipelineJobItemContext(final 
CDCJobConfiguration jobConfig, final int shardingItem) {
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
-        CDCProcessContext jobProcessContext = 
jobAPI.buildProcessContext(jobConfig);
-        CDCTaskConfiguration taskConfig = 
jobAPI.buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
+        CDCProcessContext jobProcessContext = 
jobOption.buildProcessContext(jobConfig);
+        CDCTaskConfiguration taskConfig = 
jobOption.buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
         return new CDCJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, 
sink);
     }
     
@@ -131,7 +136,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
         log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
 shardingItem, ex);
         PipelineJobCenter.stop(jobId);
-        jobAPI.updateJobConfigurationDisabled(jobId, true);
+        jobAPI.disable(jobId);
     }
     
     private void executeInventoryTasks(final List<CDCJobItemContext> 
jobItemContexts) {
@@ -212,7 +217,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
                 cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", 
"", throwable.getMessage()));
             }
             PipelineJobCenter.stop(jobId);
-            jobAPI.updateJobConfigurationDisabled(jobId, true);
+            jobAPI.disable(jobId);
         }
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 849ab9a61c0..a5079896438 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -21,6 +21,7 @@ import com.google.common.base.Strings;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelId;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
 import 
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
@@ -48,12 +49,14 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNot
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import 
org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 import java.util.Collection;
 import java.util.HashSet;
@@ -70,9 +73,9 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class CDCBackendHandler {
     
-    private final CDCJobOption jobAPI = new CDCJobOption();
+    private final CDCJobAPI jobAPI = (CDCJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
     
-    private final PipelineJobConfigurationManager jobConfigManager = new 
PipelineJobConfigurationManager(jobAPI);
+    private final PipelineJobConfigurationManager jobConfigManager = new 
PipelineJobConfigurationManager(new CDCJobOption());
     
     /**
      * Get database name by job ID.
@@ -115,7 +118,7 @@ public final class CDCBackendHandler {
         ShardingSpherePreconditions.checkState(!actualDataNodesMap.isEmpty(), 
() -> new PipelineInvalidParameterException(String.format("Not find table %s", 
tableNames)));
         boolean decodeWithTx = database.getProtocolType() instanceof 
OpenGaussDatabaseType;
         StreamDataParameter parameter = new 
StreamDataParameter(requestBody.getDatabase(), new 
LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, 
decodeWithTx);
-        String jobId = jobAPI.createJob(parameter, CDCSinkType.SOCKET, new 
Properties());
+        String jobId = jobAPI.create(parameter, CDCSinkType.SOCKET, new 
Properties());
         connectionContext.setJobId(jobId);
         startStreaming(jobId, connectionContext, channel);
         return CDCResponseUtils.succeed(requestId, 
ResponseCase.STREAM_DATA_RESULT, 
StreamDataResult.newBuilder().setStreamingId(jobId).build());
@@ -135,7 +138,7 @@ public final class CDCBackendHandler {
             PipelineJobCenter.stop(jobId);
         }
         ShardingSphereDatabase database = 
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
-        jobAPI.startJob(jobId, new CDCSocketSink(channel, database, 
cdcJobConfig.getSchemaTableNames()));
+        jobAPI.start(jobId, new CDCSocketSink(channel, database, 
cdcJobConfig.getSchemaTableNames()));
         connectionContext.setJobId(jobId);
     }
     
@@ -157,7 +160,7 @@ public final class CDCBackendHandler {
         if (job.getSink().identifierMatched(channelId)) {
             log.info("close CDC job, channel id: {}", channelId);
             PipelineJobCenter.stop(jobId);
-            jobAPI.updateJobConfigurationDisabled(jobId, true);
+            jobAPI.disable(jobId);
         }
     }
     
@@ -167,7 +170,7 @@ public final class CDCBackendHandler {
      * @param jobId job ID
      */
     public void dropStreaming(final String jobId) {
-        jobAPI.dropStreaming(jobId);
+        jobAPI.drop(jobId);
     }
     
     /**
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI
new file mode 100644
index 00000000000..b500cdc9f58
--- /dev/null
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
index c62eb313f46..c98f837421d 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
@@ -58,7 +58,6 @@ import org.apache.shardingsphere.infra.datanode.DataNode;
 
 import java.util.Collection;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -70,10 +69,31 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class MigrationJobOption implements TransmissionJobOption {
     
+    @SuppressWarnings("unchecked")
+    @Override
+    public YamlMigrationJobConfigurationSwapper 
getYamlJobConfigurationSwapper() {
+        return new YamlMigrationJobConfigurationSwapper();
+    }
+    
+    @Override
+    public Class<MigrationJob> getJobClass() {
+        return MigrationJob.class;
+    }
+    
+    @Override
+    public Optional<String> getToBeStartDisabledNextJobType() {
+        return Optional.of("CONSISTENCY_CHECK");
+    }
+    
+    @Override
+    public Optional<String> getToBeStoppedPreviousJobType() {
+        return Optional.of("CONSISTENCY_CHECK");
+    }
+    
     @Override
     public PipelineJobInfo getJobInfo(final String jobId) {
         PipelineJobMetaData jobMetaData = new 
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
-        List<String> sourceTables = new LinkedList<>();
+        Collection<String> sourceTables = new LinkedList<>();
         new 
PipelineJobConfigurationManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
                 .forEach(each -> each.getEntries().forEach(entry -> 
entry.getDataNodes().forEach(dataNode -> 
sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
         return new PipelineJobInfo(jobMetaData, null, String.join(",", 
sourceTables));
@@ -87,12 +107,6 @@ public final class MigrationJobOption implements 
TransmissionJobOption {
         }
     }
     
-    @SuppressWarnings("unchecked")
-    @Override
-    public YamlMigrationJobConfigurationSwapper 
getYamlJobConfigurationSwapper() {
-        return new YamlMigrationJobConfigurationSwapper();
-    }
-    
     @Override
     public MigrationTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
         MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) 
pipelineJobConfig;
@@ -149,21 +163,6 @@ public final class MigrationJobOption implements 
TransmissionJobOption {
         return new MigrationDataConsistencyChecker((MigrationJobConfiguration) 
jobConfig, processContext, progressContext);
     }
     
-    @Override
-    public Optional<String> getToBeStartDisabledNextJobType() {
-        return Optional.of("CONSISTENCY_CHECK");
-    }
-    
-    @Override
-    public Optional<String> getToBeStoppedPreviousJobType() {
-        return Optional.of("CONSISTENCY_CHECK");
-    }
-    
-    @Override
-    public Class<MigrationJob> getJobClass() {
-        return MigrationJob.class;
-    }
-    
     @Override
     public String getType() {
         return "MIGRATION";

Reply via email to