This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new acd872e114c Merge PipelineJobType and PipelineJobOption (#29270)
acd872e114c is described below

commit acd872e114c7c3202d644b04ae28e0605c02e488
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 3 22:14:16 2023 +0800

    Merge PipelineJobType and PipelineJobOption (#29270)
    
    * Merge PipelineJobType and PipelineJobOption
    
    * Merge PipelineJobType and PipelineJobOption
---
 .../pipeline/common/job/type/PipelineJobType.java  |  91 ++++++++++++++-
 .../pipeline/core/job/AbstractPipelineJob.java     |   5 +-
 .../core/job/AbstractSimplePipelineJob.java        |   2 +-
 .../core/job/option/PipelineJobOption.java         | 125 ---------------------
 .../persist/PipelineJobProgressPersistService.java |   4 +-
 .../service/PipelineJobConfigurationManager.java   |  10 +-
 .../core/job/service/PipelineJobManager.java       |  23 ++--
 .../core/job/service/TransmissionJobManager.java   |  10 +-
 .../core/task/runner/TransmissionTasksRunner.java  |  11 +-
 .../pipeline/common/job/type/FixtureJobType.java   |  39 ++++++-
 .../query/ShowStreamingJobStatusExecutor.java      |   4 +-
 .../handler/query/ShowStreamingListExecutor.java   |   4 +-
 .../query/ShowMigrationCheckStatusExecutor.java    |   4 +-
 .../query/ShowMigrationJobStatusExecutor.java      |   4 +-
 .../handler/query/ShowMigrationListExecutor.java   |   4 +-
 .../handler/update/CheckMigrationJobUpdater.java   |  13 ++-
 .../handler/update/DropMigrationCheckUpdater.java  |   4 +-
 .../handler/update/StartMigrationCheckUpdater.java |   4 +-
 .../handler/update/StartMigrationUpdater.java      |   4 +-
 .../handler/update/StopMigrationCheckUpdater.java  |   4 +-
 .../handler/update/StopMigrationUpdater.java       |   6 +-
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   |   7 +-
 .../data/pipeline/cdc/CDCJobOption.java            |  79 -------------
 .../data/pipeline/cdc/CDCJobType.java              |  46 +++++++-
 .../data/pipeline/cdc/api/CDCJobAPI.java           |  16 +--
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |   6 +-
 .../pipeline/cdc/handler/CDCBackendHandler.java    |   8 +-
 .../consistencycheck/ConsistencyCheckJob.java      |   2 +-
 .../ConsistencyCheckJobOption.java                 |  71 ------------
 .../consistencycheck/ConsistencyCheckJobType.java  |  40 ++++++-
 .../api/ConsistencyCheckJobAPI.java                |  10 +-
 .../task/ConsistencyCheckTasksRunner.java          |  16 ++-
 .../pipeline/scenario/migration/MigrationJob.java  |   6 +-
 .../scenario/migration/MigrationJobOption.java     |  92 ---------------
 .../scenario/migration/MigrationJobType.java       |  59 +++++++++-
 .../scenario/migration/api/MigrationJobAPI.java    |  11 +-
 .../MigrationDataConsistencyChecker.java           |   4 +-
 .../migration/prepare/MigrationJobPreparer.java    |   8 +-
 .../updatable/AlterTransmissionRuleUpdater.java    |   2 +-
 .../api/impl/ConsistencyCheckJobAPITest.java       |  10 +-
 .../migration/api/impl/MigrationJobAPITest.java    |  21 ++--
 41 files changed, 376 insertions(+), 513 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java
index 4781e9c4552..77689ff3f37 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java
@@ -17,9 +17,21 @@
 
 package org.apache.shardingsphere.data.pipeline.common.job.type;
 
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+
+import java.util.Optional;
 
 /**
  * Pipeline job type.
@@ -35,11 +47,82 @@ public interface PipelineJobType extends TypedSPI {
     String getCode();
     
     /**
-     * Get job option.
+     * Get YAML pipeline job configuration swapper.
+     *
+     * @param <Y> type of YAML configuration
+     * @param <T> type of pipeline job configuration
+     * @return YAML pipeline job configuration swapper
+     */
+    <Y extends YamlConfiguration, T extends PipelineJobConfiguration> 
YamlPipelineJobConfigurationSwapper<Y, T> getYamlJobConfigurationSwapper();
+    
+    /**
+     * Get YAML pipeline job item progress swapper.
+     *
+     * @param <T> type of pipeline job item progress
+     * @return YAML pipeline job item progress swapper
+     */
+    <T extends PipelineJobItemProgress> 
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T> 
getYamlJobItemProgressSwapper();
+    
+    /**
+     * Get pipeline job class.
+     *
+     * @return pipeline job class
+     */
+    Class<? extends PipelineJob> getJobClass();
+    
+    /**
+     * Whether to ignore to start disabled job when job item progress is 
finished.
+     *
+     * @return ignore to start disabled job when job item progress is finished 
or not
+     */
+    default boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
+        return false;
+    }
+    
+    /**
+     * Get to be start disabled next job type.
+     *
+     * @return to be start disabled next job type
+     */
+    default Optional<String> getToBeStartDisabledNextJobType() {
+        return Optional.empty();
+    }
+    
+    /**
+     * Get to be stopped previous job type.
+     *
+     * @return to be stopped previous job type
+     */
+    default Optional<String> getToBeStoppedPreviousJobType() {
+        return Optional.empty();
+    }
+    
+    /**
+     * Whether to force no sharding when convert to job configuration POJO.
+     *
+     * @return without sharding or not
+     */
+    default boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
+        return false;
+    }
+    
+    /**
+     * Get pipeline job info.
+     *
+     * @param jobId job ID
+     * @return pipeline job info
+     */
+    PipelineJobInfo getJobInfo(String jobId);
+    
+    /**
+     * Build pipeline data consistency checker.
      *
-     * @return job option
+     * @param jobConfig job configuration
+     * @param processContext process context
+     * @param progressContext consistency check job item progress context
+     * @return pipeline data consistency checker
      */
-    PipelineJobOption getOption();
+    PipelineDataConsistencyChecker 
buildDataConsistencyChecker(PipelineJobConfiguration jobConfig, 
TransmissionProcessContext processContext, 
ConsistencyCheckJobItemProgressContext progressContext);
     
     @Override
     String getType();
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 8cf5e7ea051..95b83c64372 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -27,7 +27,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJo
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
@@ -56,7 +55,7 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     private final String jobId;
     
     @Getter(AccessLevel.PROTECTED)
-    private final PipelineJobOption jobOption;
+    private final PipelineJobType jobType;
     
     private final AtomicBoolean stopping = new AtomicBoolean(false);
     
@@ -66,7 +65,7 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     
     protected AbstractPipelineJob(final String jobId) {
         this.jobId = jobId;
-        jobOption = TypedSPILoader.getService(PipelineJobType.class, 
PipelineJobIdUtils.parseJobType(jobId).getType()).getOption();
+        jobType = TypedSPILoader.getService(PipelineJobType.class, 
PipelineJobIdUtils.parseJobType(jobId).getType());
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 41ed6e7f77e..92a5329850d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -61,7 +61,7 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
-            processFailed(new PipelineJobManager(getJobOption()), jobId, 
shardingItem, ex);
+            processFailed(new PipelineJobManager(getJobType()), jobId, 
shardingItem, ex);
             throw ex;
         }
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
deleted file mode 100644
index 6b3d4b71e19..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job.option;
-
-import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
-import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
-import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
-
-import java.util.Optional;
-
-/**
- * Pipeline job option.
- */
-@SingletonSPI
-public interface PipelineJobOption {
-    
-    /**
-     * Get YAML pipeline job configuration swapper.
-     * 
-     * @param <T> type of YAML configuration
-     * @param <Y> type of pipeline job configuration
-     * @return YAML pipeline job configuration swapper
-     */
-    <Y extends YamlConfiguration, T extends PipelineJobConfiguration> 
YamlPipelineJobConfigurationSwapper<Y, T> getYamlJobConfigurationSwapper();
-    
-    /**
-     * Get YAML pipeline job item progress swapper.
-     * 
-     * @param <T> type of pipeline job item progress
-     * @return YAML pipeline job item progress swapper
-     */
-    <T extends PipelineJobItemProgress> 
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T> 
getYamlJobItemProgressSwapper();
-    
-    /**
-     * Get pipeline job class.
-     * 
-     * @return pipeline job class
-     */
-    Class<? extends PipelineJob> getJobClass();
-    
-    /**
-     * Whether to ignore to start disabled job when job item progress is 
finished.
-     *
-     * @return ignore to start disabled job when job item progress is finished 
or not
-     */
-    default boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
-        return false;
-    }
-    
-    /**
-     * Get to be start disabled next job type.
-     *
-     * @return to be start disabled next job type
-     */
-    default Optional<String> getToBeStartDisabledNextJobType() {
-        return Optional.empty();
-    }
-    
-    /**
-     * Get to be stopped previous job type.
-     *
-     * @return to be stopped previous job type
-     */
-    default Optional<String> getToBeStoppedPreviousJobType() {
-        return Optional.empty();
-    }
-    
-    /**
-     * Whether to force no sharding when convert to job configuration POJO.
-     *
-     * @return without sharding or not
-     */
-    default boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
-        return false;
-    }
-    
-    /**
-     * Get pipeline job info.
-     *
-     * @param jobId job ID
-     * @return pipeline job info
-     */
-    PipelineJobInfo getJobInfo(String jobId);
-    
-    /**
-     * Build pipeline data consistency checker.
-     *
-     * @param jobConfig job configuration
-     * @param processContext process context
-     * @param progressContext consistency check job item progress context
-     * @return all logic tables check result
-     */
-    PipelineDataConsistencyChecker 
buildDataConsistencyChecker(PipelineJobConfiguration jobConfig, 
TransmissionProcessContext processContext, 
ConsistencyCheckJobItemProgressContext progressContext);
-    
-    /**
-     * Get job type.
-     * 
-     * @return job type
-     */
-    String getType();
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 26c95849c91..45c7afc6755 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -130,8 +130,8 @@ public final class PipelineJobProgressPersistService {
             }
             persistContext.getHasNewEvents().set(false);
             long startTimeMillis = System.currentTimeMillis();
-            new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, 
PipelineJobIdUtils.parseJobType(jobId).getType()).getOption()
-                    
.getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
+            new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
+                    
PipelineJobIdUtils.parseJobType(jobId).getType()).getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
             persistContext.getBeforePersistingProgressMillis().set(null);
             if (6 == ThreadLocalRandom.current().nextInt(100)) {
                 log.info("persist, jobId={}, shardingItem={}, cost {} ms", 
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
index a6c49fcb965..5d4b8878308 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
@@ -19,9 +19,9 @@ package 
org.apache.shardingsphere.data.pipeline.core.job.service;
 
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -35,7 +35,7 @@ import java.util.Collections;
 @RequiredArgsConstructor
 public final class PipelineJobConfigurationManager {
     
-    private final PipelineJobOption jobOption;
+    private final PipelineJobType jobType;
     
     /**
      * Get job configuration.
@@ -46,7 +46,7 @@ public final class PipelineJobConfigurationManager {
      */
     @SuppressWarnings("unchecked")
     public <T extends PipelineJobConfiguration> T getJobConfiguration(final 
String jobId) {
-        return (T) 
jobOption.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
+        return (T) 
jobType.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
     }
     
     /**
@@ -58,8 +58,8 @@ public final class PipelineJobConfigurationManager {
     public JobConfigurationPOJO convertToJobConfigurationPOJO(final 
PipelineJobConfiguration jobConfig) {
         JobConfigurationPOJO result = new JobConfigurationPOJO();
         result.setJobName(jobConfig.getJobId());
-        
result.setShardingTotalCount(jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO()
 ? 1 : jobConfig.getJobShardingCount());
-        
result.setJobParameter(YamlEngine.marshal(jobOption.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
+        
result.setShardingTotalCount(jobType.isForceNoShardingWhenConvertToJobConfigurationPOJO()
 ? 1 : jobConfig.getJobShardingCount());
+        
result.setJobParameter(YamlEngine.marshal(jobType.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
         String createTimeFormat = 
LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter());
         result.getProps().setProperty("create_time", createTimeFormat);
         result.getProps().setProperty("start_time_millis", 
String.valueOf(System.currentTimeMillis()));
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index a00564b5bbc..2d98b49be6a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -32,7 +32,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCre
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -52,7 +51,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class PipelineJobManager {
     
-    private final PipelineJobOption jobOption;
+    private final PipelineJobType jobType;
     
     /**
      * Start job.
@@ -67,8 +66,8 @@ public final class PipelineJobManager {
             log.warn("jobId already exists in registry center, ignore, job id 
is `{}`", jobId);
             return;
         }
-        governanceFacade.getJobFacade().getJob().create(jobId, 
jobOption.getJobClass());
-        governanceFacade.getJobFacade().getConfiguration().persist(jobId, new 
PipelineJobConfigurationManager(jobOption).convertToJobConfigurationPOJO(jobConfig));
+        governanceFacade.getJobFacade().getJob().create(jobId, 
jobType.getJobClass());
+        governanceFacade.getJobFacade().getConfiguration().persist(jobId, new 
PipelineJobConfigurationManager(jobType).convertToJobConfigurationPOJO(jobConfig));
     }
     
     /**
@@ -77,15 +76,15 @@ public final class PipelineJobManager {
      * @param jobId job id
      */
     public void resume(final String jobId) {
-        if 
(jobOption.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
-            Optional<? extends PipelineJobItemProgress> jobItemProgress = new 
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()).getProgress(jobId,
 0);
+        if (jobType.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) 
{
+            Optional<? extends PipelineJobItemProgress> jobItemProgress = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()).getProgress(jobId,
 0);
             if (jobItemProgress.isPresent() && JobStatus.FINISHED == 
jobItemProgress.get().getStatus()) {
                 log.info("job status is FINISHED, ignore, jobId={}", jobId);
                 return;
             }
         }
         startCurrentDisabledJob(jobId);
-        jobOption.getToBeStartDisabledNextJobType().ifPresent(optional -> 
startNextDisabledJob(jobId, optional));
+        jobType.getToBeStartDisabledNextJobType().ifPresent(optional -> 
startNextDisabledJob(jobId, optional));
         
     }
     
@@ -108,7 +107,7 @@ public final class PipelineJobManager {
     private void startNextDisabledJob(final String jobId, final String 
toBeStartDisabledNextJobType) {
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().findLatestCheckJobId(jobId).ifPresent(optional
 -> {
             try {
-                new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, 
toBeStartDisabledNextJobType).getOption()).resume(optional);
+                new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, 
toBeStartDisabledNextJobType)).resume(optional);
                 // CHECKSTYLE:OFF
             } catch (final RuntimeException ex) {
                 // CHECKSTYLE:ON
@@ -123,14 +122,14 @@ public final class PipelineJobManager {
      * @param jobId job id
      */
     public void stop(final String jobId) {
-        jobOption.getToBeStoppedPreviousJobType().ifPresent(optional -> 
stopPreviousJob(jobId, optional));
+        jobType.getToBeStoppedPreviousJobType().ifPresent(optional -> 
stopPreviousJob(jobId, optional));
         stopCurrentJob(jobId);
     }
     
     private void stopPreviousJob(final String jobId, final String 
toBeStoppedPreviousJobType) {
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().findLatestCheckJobId(jobId).ifPresent(optional
 -> {
             try {
-                new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, 
toBeStoppedPreviousJobType).getOption()).stop(optional);
+                new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, 
toBeStoppedPreviousJobType)).stop(optional);
                 // CHECKSTYLE:OFF
             } catch (final RuntimeException ex) {
                 // CHECKSTYLE:ON
@@ -175,8 +174,8 @@ public final class PipelineJobManager {
     public List<PipelineJobInfo> getJobInfos(final PipelineContextKey 
contextKey) {
         try {
             return 
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
-                    .filter(each -> !each.getJobName().startsWith("_") && 
jobOption.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
-                    .map(each -> 
jobOption.getJobInfo(each.getJobName())).collect(Collectors.toList());
+                    .filter(each -> !each.getJobName().startsWith("_") && 
jobType.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
+                    .map(each -> 
jobType.getJobInfo(each.getJobName())).collect(Collectors.toList());
         } catch (final UnsupportedOperationException ex) {
             return Collections.emptyList();
         }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index 842a84f2a6e..0be041dafd0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -21,11 +21,11 @@ import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 
 import java.util.Collection;
@@ -43,7 +43,7 @@ import java.util.stream.IntStream;
 @RequiredArgsConstructor
 public final class TransmissionJobManager {
     
-    private final PipelineJobOption jobOption;
+    private final PipelineJobType jobType;
     
     /**
      * Get job infos.
@@ -52,11 +52,11 @@ public final class TransmissionJobManager {
      * @return job item infos
      */
     public Collection<TransmissionJobItemInfo> getJobItemInfos(final String 
jobId) {
-        PipelineJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId);
+        PipelineJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(jobType).getJobConfiguration(jobId);
         long startTimeMillis = 
Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
         Map<Integer, TransmissionJobItemProgress> jobProgress = 
getJobProgress(jobConfig);
         List<TransmissionJobItemInfo> result = new LinkedList<>();
-        PipelineJobInfo jobInfo = jobOption.getJobInfo(jobId);
+        PipelineJobInfo jobInfo = jobType.getJobInfo(jobId);
         for (Entry<Integer, TransmissionJobItemProgress> entry : 
jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
             TransmissionJobItemProgress jobItemProgress = entry.getValue();
@@ -88,7 +88,7 @@ public final class TransmissionJobManager {
      * @return each sharding item progress
      */
     public Map<Integer, TransmissionJobItemProgress> getJobProgress(final 
PipelineJobConfiguration jobConfig) {
-        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
         String jobId = jobConfig.getJobId();
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         return IntStream.range(0, 
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, 
each) -> {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
index bd4fca50425..30b6dd5351c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
@@ -29,7 +29,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJ
 import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
@@ -57,7 +56,7 @@ public class TransmissionTasksRunner implements 
PipelineTasksRunner {
     
     private final Collection<PipelineTask> incrementalTasks;
     
-    private final PipelineJobOption jobOption;
+    private final PipelineJobType jobType;
     
     private final PipelineJobManager jobManager;
     
@@ -67,9 +66,9 @@ public class TransmissionTasksRunner implements 
PipelineTasksRunner {
         this.jobItemContext = jobItemContext;
         inventoryTasks = jobItemContext.getInventoryTasks();
         incrementalTasks = jobItemContext.getIncrementalTasks();
-        jobOption = TypedSPILoader.getService(PipelineJobType.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption();
-        jobManager = new PipelineJobManager(jobOption);
-        jobItemManager = new 
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+        jobType = TypedSPILoader.getService(PipelineJobType.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
+        jobManager = new PipelineJobManager(jobType);
+        jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
     }
     
     @Override
@@ -90,7 +89,7 @@ public class TransmissionTasksRunner implements 
PipelineTasksRunner {
         if (jobItemContext.isStopping()) {
             return;
         }
-        new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption()
+        new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
                 
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
         if 
(PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
             log.info("All inventory tasks finished.");
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
index d4de294616f..f2d143a71f6 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
@@ -17,9 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.common.job.type;
 
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
-
-import static org.mockito.Mockito.mock;
+import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
 /**
  * Fixture job type.
@@ -32,8 +40,29 @@ public final class FixtureJobType implements PipelineJobType 
{
     }
     
     @Override
-    public PipelineJobOption getOption() {
-        return mock(PipelineJobOption.class);
+    public <Y extends YamlConfiguration, T extends PipelineJobConfiguration> 
YamlPipelineJobConfigurationSwapper<Y, T> getYamlJobConfigurationSwapper() {
+        return null;
+    }
+    
+    @Override
+    public <T extends PipelineJobItemProgress> 
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T> 
getYamlJobItemProgressSwapper() {
+        return null;
+    }
+    
+    @Override
+    public Class<? extends PipelineJob> getJobClass() {
+        return null;
+    }
+    
+    @Override
+    public PipelineJobInfo getJobInfo(final String jobId) {
+        return null;
+    }
+    
+    @Override
+    public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig,
+                                                                      final 
TransmissionProcessContext processContext, final 
ConsistencyCheckJobItemProgressContext progressContext) {
+        return null;
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index 7bb2e0f4348..223d8c5a784 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.cdc.distsql.handler.query;
 
 import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
@@ -37,7 +37,7 @@ public final class ShowStreamingJobStatusExecutor implements 
QueryableRALExecuto
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowStreamingStatusStatement sqlStatement) {
-        Collection<TransmissionJobItemInfo> jobItemInfos = new 
TransmissionJobManager(new 
CDCJobOption()).getJobItemInfos(sqlStatement.getJobId());
+        Collection<TransmissionJobItemInfo> jobItemInfos = new 
TransmissionJobManager(new 
CDCJobType()).getJobItemInfos(sqlStatement.getJobId());
         long currentTimeMillis = System.currentTimeMillis();
         return jobItemInfos.stream().map(each -> getRow(each, 
currentTimeMillis)).collect(Collectors.toList());
     }
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
index 63c52219df3..c69d02d4678 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.cdc.distsql.handler.query;
 
 import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -35,7 +35,7 @@ import java.util.stream.Collectors;
  */
 public final class ShowStreamingListExecutor implements 
QueryableRALExecutor<ShowStreamingListStatement> {
     
-    private final PipelineJobManager pipelineJobManager = new 
PipelineJobManager(new CDCJobOption());
+    private final PipelineJobManager pipelineJobManager = new 
PipelineJobManager(new CDCJobType());
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowStreamingListStatement sqlStatement) {
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
index 4ad92f6161e..b63898a8033 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
@@ -18,8 +18,8 @@
 package 
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.query;
 
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import 
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
 import 
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
@@ -35,7 +35,7 @@ import java.util.Optional;
  */
 public final class ShowMigrationCheckStatusExecutor implements 
QueryableRALExecutor<ShowMigrationCheckStatusStatement> {
     
-    private final ConsistencyCheckJobAPI jobAPI = new 
ConsistencyCheckJobAPI(new ConsistencyCheckJobOption());
+    private final ConsistencyCheckJobAPI jobAPI = new 
ConsistencyCheckJobAPI(new ConsistencyCheckJobType());
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowMigrationCheckStatusStatement sqlStatement) {
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
index 2c518b1e648..38f2bcdf217 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.query;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import 
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
 import 
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
@@ -37,7 +37,7 @@ public final class ShowMigrationJobStatusExecutor implements 
QueryableRALExecuto
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowMigrationStatusStatement sqlStatement) {
-        Collection<TransmissionJobItemInfo> jobItemInfos = new 
TransmissionJobManager(new 
MigrationJobOption()).getJobItemInfos(sqlStatement.getJobId());
+        Collection<TransmissionJobItemInfo> jobItemInfos = new 
TransmissionJobManager(new 
MigrationJobType()).getJobItemInfos(sqlStatement.getJobId());
         long currentTimeMillis = System.currentTimeMillis();
         return jobItemInfos.stream().map(each -> getRow(each, 
currentTimeMillis)).collect(Collectors.toList());
     }
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationListExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationListExecutor.java
index 156dae257b0..a3e9f8cffff 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationListExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationListExecutor.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.query;
 
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
@@ -34,7 +34,7 @@ import java.util.stream.Collectors;
  */
 public final class ShowMigrationListExecutor implements 
QueryableRALExecutor<ShowMigrationListStatement> {
     
-    private final PipelineJobManager pipelineJobManager = new 
PipelineJobManager(new MigrationJobOption());
+    private final PipelineJobManager pipelineJobManager = new 
PipelineJobManager(new MigrationJobType());
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowMigrationListStatement sqlStatement) {
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/CheckMigrationJobUpdater.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/CheckMigrationJobUpdater.java
index 5b669b4ba18..cc0b69e236f 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/CheckMigrationJobUpdater.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -17,14 +17,15 @@
 
 package 
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;
 
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.CreateConsistencyCheckJobParameter;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
@@ -39,9 +40,9 @@ import java.util.Properties;
  */
 public final class CheckMigrationJobUpdater implements 
RALUpdater<CheckMigrationStatement> {
     
-    private final ConsistencyCheckJobAPI checkJobAPI = new 
ConsistencyCheckJobAPI(new ConsistencyCheckJobOption());
+    private final ConsistencyCheckJobAPI checkJobAPI = new 
ConsistencyCheckJobAPI(new ConsistencyCheckJobType());
     
-    private final MigrationJobOption migrationJobOption = new 
MigrationJobOption();
+    private final PipelineJobType migrationJobType = new MigrationJobType();
     
     @Override
     public void executeUpdate(final String databaseName, final 
CheckMigrationStatement sqlStatement) throws SQLException {
@@ -49,13 +50,13 @@ public final class CheckMigrationJobUpdater implements 
RALUpdater<CheckMigration
         String algorithmTypeName = null == typeStrategy ? null : 
typeStrategy.getName();
         Properties algorithmProps = null == typeStrategy ? null : 
typeStrategy.getProps();
         String jobId = sqlStatement.getJobId();
-        MigrationJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(migrationJobOption).getJobConfiguration(jobId);
+        MigrationJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(migrationJobType).getJobConfiguration(jobId);
         verifyInventoryFinished(jobConfig);
         checkJobAPI.start(new CreateConsistencyCheckJobParameter(jobId, 
algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), 
jobConfig.getTargetDatabaseType()));
     }
     
     private void verifyInventoryFinished(final MigrationJobConfiguration 
jobConfig) {
-        TransmissionJobManager transmissionJobManager = new 
TransmissionJobManager(migrationJobOption);
+        TransmissionJobManager transmissionJobManager = new 
TransmissionJobManager(migrationJobType);
         
ShardingSpherePreconditions.checkState(PipelineJobProgressDetector.isInventoryFinished(jobConfig.getJobShardingCount(),
 transmissionJobManager.getJobProgress(jobConfig).values()),
                 () -> new PipelineInvalidParameterException("Inventory is not 
finished."));
     }
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/DropMigrationCheckUpdater.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/DropMigrationCheckUpdater.java
index 8b820291d38..da0690d1538 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/DropMigrationCheckUpdater.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/DropMigrationCheckUpdater.java
@@ -17,8 +17,8 @@
 
 package 
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;
 
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import 
org.apache.shardingsphere.migration.distsql.statement.DropMigrationCheckStatement;
 
@@ -27,7 +27,7 @@ import 
org.apache.shardingsphere.migration.distsql.statement.DropMigrationCheckS
  */
 public final class DropMigrationCheckUpdater implements 
RALUpdater<DropMigrationCheckStatement> {
     
-    private final ConsistencyCheckJobAPI jobAPI = new 
ConsistencyCheckJobAPI(new ConsistencyCheckJobOption());
+    private final ConsistencyCheckJobAPI jobAPI = new 
ConsistencyCheckJobAPI(new ConsistencyCheckJobType());
     
     @Override
     public void executeUpdate(final String databaseName, final 
DropMigrationCheckStatement sqlStatement) {
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationCheckUpdater.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationCheckUpdater.java
index b34f4274b33..189eec12237 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationCheckUpdater.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationCheckUpdater.java
@@ -17,8 +17,8 @@
 
 package 
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;
 
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import 
org.apache.shardingsphere.migration.distsql.statement.StartMigrationCheckStatement;
 
@@ -27,7 +27,7 @@ import 
org.apache.shardingsphere.migration.distsql.statement.StartMigrationCheck
  */
 public final class StartMigrationCheckUpdater implements 
RALUpdater<StartMigrationCheckStatement> {
     
-    private final ConsistencyCheckJobAPI jobAPI = new 
ConsistencyCheckJobAPI(new ConsistencyCheckJobOption());
+    private final ConsistencyCheckJobAPI jobAPI = new 
ConsistencyCheckJobAPI(new ConsistencyCheckJobType());
     
     @Override
     public void executeUpdate(final String databaseName, final 
StartMigrationCheckStatement sqlStatement) {
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationUpdater.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationUpdater.java
index 209b62587f3..816397e0c52 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationUpdater.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationUpdater.java
@@ -18,7 +18,7 @@
 package 
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;
 
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import 
org.apache.shardingsphere.migration.distsql.statement.StartMigrationStatement;
 
@@ -27,7 +27,7 @@ import 
org.apache.shardingsphere.migration.distsql.statement.StartMigrationState
  */
 public final class StartMigrationUpdater implements 
RALUpdater<StartMigrationStatement> {
     
-    private final PipelineJobManager jobManager = new PipelineJobManager(new 
MigrationJobOption());
+    private final PipelineJobManager jobManager = new PipelineJobManager(new 
MigrationJobType());
     
     @Override
     public void executeUpdate(final String databaseName, final 
StartMigrationStatement sqlStatement) {
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationCheckUpdater.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationCheckUpdater.java
index c809ecb2743..e3059524082 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationCheckUpdater.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationCheckUpdater.java
@@ -17,8 +17,8 @@
 
 package 
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;
 
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import 
org.apache.shardingsphere.migration.distsql.statement.StopMigrationCheckStatement;
 
@@ -27,7 +27,7 @@ import 
org.apache.shardingsphere.migration.distsql.statement.StopMigrationCheckS
  */
 public final class StopMigrationCheckUpdater implements 
RALUpdater<StopMigrationCheckStatement> {
     
-    private final ConsistencyCheckJobAPI jobAPI = new 
ConsistencyCheckJobAPI(new ConsistencyCheckJobOption());
+    private final ConsistencyCheckJobAPI jobAPI = new 
ConsistencyCheckJobAPI(new ConsistencyCheckJobType());
     
     @Override
     public void executeUpdate(final String databaseName, final 
StopMigrationCheckStatement sqlStatement) {
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationUpdater.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationUpdater.java
index fa11b738eb4..8dea2425a2c 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationUpdater.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationUpdater.java
@@ -18,7 +18,7 @@
 package 
org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;
 
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import 
org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement;
 
@@ -27,9 +27,7 @@ import 
org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatem
  */
 public final class StopMigrationUpdater implements 
RALUpdater<StopMigrationStatement> {
     
-    private final MigrationJobOption jobOption = new MigrationJobOption();
-    
-    private final PipelineJobManager jobManager = new 
PipelineJobManager(jobOption);
+    private final PipelineJobManager jobManager = new PipelineJobManager(new 
MigrationJobType());
     
     @Override
     public void executeUpdate(final String databaseName, final 
StopMigrationStatement sqlStatement) {
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 5da8e693eaa..3cf6df2f069 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -46,6 +46,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
 import 
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
@@ -85,11 +86,11 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     @Getter
     private final PipelineSink sink;
     
-    private final CDCJobOption jobOption = new CDCJobOption();
+    private final PipelineJobType jobType = 
TypedSPILoader.getService(PipelineJobType.class, "STREAMING");
     
     private final CDCJobAPI jobAPI = (CDCJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
     
-    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
     
     private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
     
@@ -134,7 +135,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration 
jobConfig, final int shardingItem) {
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
         PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(
-                
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 jobOption.getType()));
+                
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 jobType.getType()));
         TransmissionProcessContext jobProcessContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, 
shardingItem, jobProcessContext.getPipelineProcessConfig());
         return new CDCJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, 
sink);
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
deleted file mode 100644
index d2e6854e755..00000000000
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc;
-
-import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
-import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
-
-/**
- * CDC job option.
- */
-@Slf4j
-public final class CDCJobOption implements PipelineJobOption {
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
-        return new YamlCDCJobConfigurationSwapper();
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public YamlTransmissionJobItemProgressSwapper 
getYamlJobItemProgressSwapper() {
-        return new YamlTransmissionJobItemProgressSwapper();
-    }
-    
-    @Override
-    public Class<CDCJob> getJobClass() {
-        return CDCJob.class;
-    }
-    
-    @Override
-    public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
-        return true;
-    }
-    
-    @Override
-    public PipelineJobInfo getJobInfo(final String jobId) {
-        PipelineJobMetaData jobMetaData = new 
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
-        CDCJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
-        return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), 
String.join(", ", jobConfig.getSchemaTableNames()));
-    }
-    
-    @Override
-    public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig, final TransmissionProcessContext 
processContext,
-                                                                      final 
ConsistencyCheckJobItemProgressContext progressContext) {
-        throw new UnsupportedOperationException();
-    }
-    
-    @Override
-    public String getType() {
-        return "STREAMING";
-    }
-}
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
index 654f878dca2..509f24e6c20 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
@@ -17,8 +17,18 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc;
 
+import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
+import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 
 /**
  * CDC job type.
@@ -30,9 +40,39 @@ public final class CDCJobType implements PipelineJobType {
         return "03";
     }
     
+    @SuppressWarnings("unchecked")
     @Override
-    public PipelineJobOption getOption() {
-        return new CDCJobOption();
+    public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
+        return new YamlCDCJobConfigurationSwapper();
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Override
+    public YamlTransmissionJobItemProgressSwapper 
getYamlJobItemProgressSwapper() {
+        return new YamlTransmissionJobItemProgressSwapper();
+    }
+    
+    @Override
+    public Class<CDCJob> getJobClass() {
+        return CDCJob.class;
+    }
+    
+    @Override
+    public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
+        return true;
+    }
+    
+    @Override
+    public PipelineJobInfo getJobInfo(final String jobId) {
+        PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+        CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(new CDCJobType()).getJobConfiguration(jobId);
+        return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
+    }
+    
+    @Override
+    public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig, final TransmissionProcessContext 
processContext,
+                                                                      final 
ConsistencyCheckJobItemProgressContext progressContext) {
+        throw new UnsupportedOperationException();
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 71c58442b9b..d24cbcc1612 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -22,7 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDa
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
 import org.apache.shardingsphere.data.pipeline.cdc.CDCJobId;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration.YamlSinkConfiguration;
@@ -83,7 +83,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class CDCJobAPI implements TransmissionJobAPI {
     
-    private final CDCJobOption jobOption;
+    private final CDCJobType jobType;
     
     private final PipelineJobManager jobManager;
     
@@ -96,9 +96,9 @@ public final class CDCJobAPI implements TransmissionJobAPI {
     private final YamlPipelineDataSourceConfigurationSwapper 
pipelineDataSourceConfigSwapper;
     
     public CDCJobAPI() {
-        jobOption = new CDCJobOption();
-        jobManager = new PipelineJobManager(jobOption);
-        jobConfigManager = new PipelineJobConfigurationManager(jobOption);
+        jobType = new CDCJobType();
+        jobManager = new PipelineJobManager(jobType);
+        jobConfigManager = new PipelineJobConfigurationManager(jobType);
         dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
         ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
         pipelineDataSourceConfigSwapper = new 
YamlPipelineDataSourceConfigurationSwapper();
@@ -121,7 +121,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         if 
(governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId()))
 {
             log.warn("CDC job already exists in registry center, ignore, job 
id is `{}`", jobConfig.getJobId());
         } else {
-            
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), 
jobOption.getJobClass());
+            
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), 
jobType.getJobClass());
             JobConfigurationPOJO jobConfigPOJO = 
jobConfigManager.convertToJobConfigurationPOJO(jobConfig);
             jobConfigPOJO.setDisabled(true);
             
governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(),
 jobConfigPOJO);
@@ -169,7 +169,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
     
     private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
         String jobId = jobConfig.getJobId();
-        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
         try (PipelineDataSourceManager pipelineDataSourceManager = new 
DefaultPipelineDataSourceManager()) {
             for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
                 if (jobItemManager.getProgress(jobId, i).isPresent()) {
@@ -178,7 +178,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
                 IncrementalDumperContext dumperContext = 
buildDumperContext(jobConfig, i, new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
                 TransmissionJobItemProgress jobItemProgress = 
getTransmissionJobItemProgress(jobConfig, pipelineDataSourceManager, 
dumperContext);
                 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(
-                        jobId, i, 
YamlEngine.marshal(jobOption.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+                        jobId, i, 
YamlEngine.marshal(jobType.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
             }
         } catch (final SQLException ex) {
             throw new PrepareJobWithGetBinlogPositionException(jobId, ex);
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 4cb7e57ea51..a18836b0793 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair;
@@ -67,9 +67,7 @@ import java.util.concurrent.atomic.AtomicReference;
 @Slf4j
 public final class CDCJobPreparer {
     
-    private final CDCJobOption jobAPI = new CDCJobOption();
-    
-    private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+    private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new CDCJobType().getYamlJobItemProgressSwapper());
     
     /**
      * Do prepare work.
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 04ab6e3cb52..6270c145d80 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -21,8 +21,9 @@ import com.google.common.base.Strings;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelId;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption;
 import org.apache.shardingsphere.data.pipeline.cdc.api.StreamDataParameter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
@@ -31,7 +32,6 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterManager;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
 import 
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
 import 
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
 import 
org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
@@ -48,8 +48,8 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextMan
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import 
org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
@@ -75,7 +75,7 @@ public final class CDCBackendHandler {
     
     private final CDCJobAPI jobAPI = (CDCJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
     
-    private final PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(new CDCJobOption());
+    private final PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(new CDCJobType());
     
     /**
      * Get database name by job ID.
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index f3056e72746..3382e326900 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -45,7 +45,7 @@ public final class ConsistencyCheckJob extends 
AbstractSimplePipelineJob {
     @Override
     public ConsistencyCheckJobItemContext buildPipelineJobItemContext(final 
ShardingContext shardingContext) {
         ConsistencyCheckJobConfiguration jobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(getJobOption().getYamlJobItemProgressSwapper());
+        PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(getJobType().getYamlJobItemProgressSwapper());
         Optional<ConsistencyCheckJobItemProgress> jobItemProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), 
shardingContext.getShardingItem());
         return new ConsistencyCheckJobItemContext(jobConfig, 
shardingContext.getShardingItem(), JobStatus.RUNNING, 
jobItemProgress.orElse(null));
     }
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java
deleted file mode 100644
index d5a1caaf312..00000000000
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
-
-import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
-
-/**
- * Consistency check job option.
- */
-public final class ConsistencyCheckJobOption implements PipelineJobOption {
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public YamlConsistencyCheckJobConfigurationSwapper 
getYamlJobConfigurationSwapper() {
-        return new YamlConsistencyCheckJobConfigurationSwapper();
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public YamlConsistencyCheckJobItemProgressSwapper 
getYamlJobItemProgressSwapper() {
-        return new YamlConsistencyCheckJobItemProgressSwapper();
-    }
-    
-    @Override
-    public Class<ConsistencyCheckJob> getJobClass() {
-        return ConsistencyCheckJob.class;
-    }
-    
-    @Override
-    public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
-        return true;
-    }
-    
-    @Override
-    public PipelineJobInfo getJobInfo(final String jobId) {
-        throw new UnsupportedOperationException();
-    }
-    
-    @Override
-    public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig,
-                                                                      final 
TransmissionProcessContext processContext, final 
ConsistencyCheckJobItemProgressContext progressContext) {
-        return null;
-    }
-    
-    @Override
-    public String getType() {
-        return "CONSISTENCY_CHECK";
-    }
-}
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
index 10343fb846c..2d096a2d0d6 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
@@ -17,8 +17,14 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
+import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
 
 /**
  * Consistency check job type.
@@ -30,9 +36,37 @@ public final class ConsistencyCheckJobType implements 
PipelineJobType {
         return "02";
     }
     
+    @SuppressWarnings("unchecked")
     @Override
-    public PipelineJobOption getOption() {
-        return new ConsistencyCheckJobOption();
+    public YamlConsistencyCheckJobConfigurationSwapper 
getYamlJobConfigurationSwapper() {
+        return new YamlConsistencyCheckJobConfigurationSwapper();
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Override
+    public YamlConsistencyCheckJobItemProgressSwapper 
getYamlJobItemProgressSwapper() {
+        return new YamlConsistencyCheckJobItemProgressSwapper();
+    }
+    
+    @Override
+    public Class<ConsistencyCheckJob> getJobClass() {
+        return ConsistencyCheckJob.class;
+    }
+    
+    @Override
+    public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
+        return true;
+    }
+    
+    @Override
+    public PipelineJobInfo getJobInfo(final String jobId) {
+        throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig,
+                                                                      final 
TransmissionProcessContext processContext, final 
ConsistencyCheckJobItemProgressContext progressContext) {
+        return null;
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
index 4c1018a7ff1..e4b7f31ebea 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
@@ -35,7 +35,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfi
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
@@ -72,10 +72,10 @@ public final class ConsistencyCheckJobAPI {
     
     private final PipelineJobItemManager<ConsistencyCheckJobItemProgress> 
jobItemManager;
     
-    public ConsistencyCheckJobAPI(final ConsistencyCheckJobOption jobOption) {
-        progressSwapper = jobOption.getYamlJobItemProgressSwapper();
-        jobManager = new PipelineJobManager(jobOption);
-        jobConfigManager = new PipelineJobConfigurationManager(jobOption);
+    public ConsistencyCheckJobAPI(final ConsistencyCheckJobType jobType) {
+        progressSwapper = jobType.getYamlJobItemProgressSwapper();
+        jobManager = new PipelineJobManager(jobType);
+        jobConfigManager = new PipelineJobConfigurationManager(jobType);
         jobItemManager = new PipelineJobItemManager<>(progressSwapper);
     }
     
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 35accd7c08d..04937944021 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -34,13 +34,12 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDat
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -56,11 +55,11 @@ import java.util.concurrent.atomic.AtomicReference;
 @Slf4j
 public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
     
-    private final ConsistencyCheckJobOption jobAPI = new ConsistencyCheckJobOption();
+    private final PipelineJobType jobType = new ConsistencyCheckJobType();
     
-    private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
+    private final PipelineJobManager jobManager = new PipelineJobManager(jobType);
     
-    private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+    private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
     
     private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
     
@@ -90,7 +89,7 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
         if (jobItemContext.isStopping()) {
             return;
         }
-        new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption()
+        new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
                 .getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
         CompletableFuture<?> future = jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor);
         ExecuteEngine.trigger(Collections.singletonList(future), new CheckExecuteCallback());
@@ -108,12 +107,11 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
         protected void runBlocking() {
             jobItemManager.persistProgress(jobItemContext);
             PipelineJobType jobType = 
PipelineJobIdUtils.parseJobType(parentJobId);
-            PipelineJobOption jobOption = 
TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
-            PipelineJobConfiguration parentJobConfig = new 
PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
+            PipelineJobConfiguration parentJobConfig = new 
PipelineJobConfigurationManager(jobType).getJobConfiguration(parentJobId);
             try {
                 PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(
                         
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId()),
 jobType.getType()));
-                PipelineDataConsistencyChecker checker = 
jobOption.buildDataConsistencyChecker(
+                PipelineDataConsistencyChecker checker = 
jobType.buildDataConsistencyChecker(
                         parentJobConfig, new 
TransmissionProcessContext(parentJobConfig.getJobId(), processConfig), 
jobItemContext.getProgressContext());
                 consistencyChecker.set(checker);
                 Map<String, TableDataConsistencyCheckResult> checkResultMap = 
checker.check(checkJobConfig.getAlgorithmTypeName(), 
checkJobConfig.getAlgorithmProps());
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index c405924b266..320ebb5db58 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -68,9 +68,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class MigrationJob extends AbstractSimplePipelineJob {
     
-    private final MigrationJobOption jobOption = new MigrationJobOption();
-    
-    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(new 
MigrationJobType().getYamlJobItemProgressSwapper());
     
     private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
     
@@ -89,7 +87,7 @@ public final class MigrationJob extends 
AbstractSimplePipelineJob {
         MigrationJobConfiguration jobConfig = new 
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
         PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(
-                
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 jobOption.getType()));
+                
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 "MIGRATION"));
         TransmissionProcessContext jobProcessContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         MigrationTaskConfiguration taskConfig = 
buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
         return new MigrationJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
deleted file mode 100644
index 31e36e2879b..00000000000
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
-
-import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.Optional;
-
-/**
- * Migration job option.
- */
-@Slf4j
-public final class MigrationJobOption implements PipelineJobOption {
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public YamlMigrationJobConfigurationSwapper 
getYamlJobConfigurationSwapper() {
-        return new YamlMigrationJobConfigurationSwapper();
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public YamlTransmissionJobItemProgressSwapper 
getYamlJobItemProgressSwapper() {
-        return new YamlTransmissionJobItemProgressSwapper();
-    }
-    
-    @Override
-    public Class<MigrationJob> getJobClass() {
-        return MigrationJob.class;
-    }
-    
-    @Override
-    public Optional<String> getToBeStartDisabledNextJobType() {
-        return Optional.of("CONSISTENCY_CHECK");
-    }
-    
-    @Override
-    public Optional<String> getToBeStoppedPreviousJobType() {
-        return Optional.of("CONSISTENCY_CHECK");
-    }
-    
-    @Override
-    public PipelineJobInfo getJobInfo(final String jobId) {
-        PipelineJobMetaData jobMetaData = new 
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
-        Collection<String> sourceTables = new LinkedList<>();
-        new 
PipelineJobConfigurationManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
-                .forEach(each -> each.getEntries().forEach(entry -> 
entry.getDataNodes().forEach(dataNode -> 
sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
-        return new PipelineJobInfo(jobMetaData, null, String.join(",", 
sourceTables));
-    }
-    
-    @Override
-    public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig, final TransmissionProcessContext 
processContext,
-                                                                      final 
ConsistencyCheckJobItemProgressContext progressContext) {
-        return new MigrationDataConsistencyChecker((MigrationJobConfiguration) 
jobConfig, processContext, progressContext);
-    }
-    
-    @Override
-    public String getType() {
-        return "MIGRATION";
-    }
-}
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index c3a26100194..fc84d389aef 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -17,8 +17,24 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
+import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
+import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Optional;
 
 /**
  * Migration job type.
@@ -30,9 +46,46 @@ public final class MigrationJobType implements 
PipelineJobType {
         return "01";
     }
     
+    @SuppressWarnings("unchecked")
+    @Override
+    public YamlMigrationJobConfigurationSwapper 
getYamlJobConfigurationSwapper() {
+        return new YamlMigrationJobConfigurationSwapper();
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Override
+    public YamlTransmissionJobItemProgressSwapper 
getYamlJobItemProgressSwapper() {
+        return new YamlTransmissionJobItemProgressSwapper();
+    }
+    
+    @Override
+    public Class<MigrationJob> getJobClass() {
+        return MigrationJob.class;
+    }
+    
+    @Override
+    public Optional<String> getToBeStartDisabledNextJobType() {
+        return Optional.of("CONSISTENCY_CHECK");
+    }
+    
+    @Override
+    public Optional<String> getToBeStoppedPreviousJobType() {
+        return Optional.of("CONSISTENCY_CHECK");
+    }
+    
+    @Override
+    public PipelineJobInfo getJobInfo(final String jobId) {
+        PipelineJobMetaData jobMetaData = new 
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+        Collection<String> sourceTables = new LinkedList<>();
+        new PipelineJobConfigurationManager(new 
MigrationJobType()).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
+                .forEach(each -> each.getEntries().forEach(entry -> 
entry.getDataNodes().forEach(dataNode -> 
sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
+        return new PipelineJobInfo(jobMetaData, null, String.join(",", 
sourceTables));
+    }
+    
     @Override
-    public PipelineJobOption getOption() {
-        return new MigrationJobOption();
+    public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig, final TransmissionProcessContext 
processContext,
+                                                                      final 
ConsistencyCheckJobItemProgressContext progressContext) {
+        return new MigrationDataConsistencyChecker((MigrationJobConfiguration) 
jobConfig, processContext, progressContext);
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 0da06c43777..45e5071185b 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -41,12 +41,11 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
@@ -100,9 +99,9 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
     private final PipelineDataSourcePersistService dataSourcePersistService;
     
     public MigrationJobAPI() {
-        PipelineJobOption jobOption = new MigrationJobOption();
-        jobManager = new PipelineJobManager(jobOption);
-        jobConfigManager = new PipelineJobConfigurationManager(jobOption);
+        PipelineJobType jobType = new MigrationJobType();
+        jobManager = new PipelineJobManager(jobType);
+        jobConfigManager = new PipelineJobConfigurationManager(jobType);
         dataSourcePersistService = new PipelineDataSourcePersistService();
     }
     
@@ -319,7 +318,7 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
     }
     
     private void cleanTempTableOnRollback(final String jobId) throws 
SQLException {
-        MigrationJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class,
 getType()).getOption()).getJobConfiguration(jobId);
+        MigrationJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class,
 getType())).getJobConfiguration(jobId);
         PipelineCommonSQLBuilder pipelineSQLBuilder = new 
PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
         TableAndSchemaNameMapper mapping = new 
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         try (
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 125c8433042..c4dcd35b7fb 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -43,7 +43,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.Table
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -99,7 +99,7 @@ public final class MigrationDataConsistencyChecker implements 
PipelineDataConsis
     }
     
     private long getRecordsCount() {
-        Map<Integer, TransmissionJobItemProgress> jobProgress = new 
TransmissionJobManager(new MigrationJobOption()).getJobProgress(jobConfig);
+        Map<Integer, TransmissionJobItemProgress> jobProgress = new 
TransmissionJobManager(new MigrationJobType()).getJobProgress(jobConfig);
         return 
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getProcessedRecordsCount).sum();
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 9d64551061c..4afd70afd90 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -28,9 +28,9 @@ import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.common.spi.ingest.dumper.IncrementalDumperCreator;
@@ -54,7 +54,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareT
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
@@ -81,9 +81,9 @@ import java.util.concurrent.TimeUnit;
 @Slf4j
 public final class MigrationJobPreparer {
     
-    private final MigrationJobOption jobOption = new MigrationJobOption();
+    private final MigrationJobType jobType = new MigrationJobType();
     
-    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
     
     /**
      * Do prepare work.
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
index fff58c7361c..cdc5c106c20 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
@@ -37,7 +37,7 @@ public final class AlterTransmissionRuleUpdater implements 
RALUpdater<AlterTrans
     @Override
     public void executeUpdate(final String databaseName, final 
AlterTransmissionRuleStatement sqlStatement) {
         PipelineProcessConfiguration processConfig = 
TransmissionProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
-        String jobType = TypedSPILoader.getService(PipelineJobType.class, 
sqlStatement.getJobTypeName()).getOption().getType();
+        String jobType = TypedSPILoader.getService(PipelineJobType.class, 
sqlStatement.getJobTypeName()).getType();
         processConfigPersistService.persist(new 
PipelineContextKey(InstanceType.PROXY), jobType, processConfig);
     }
     
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index a0835aaf4a8..e3b53e10fcf 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -26,8 +26,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.CreateConsistencyCheckJobParameter;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
@@ -51,11 +51,11 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 
 class ConsistencyCheckJobAPITest {
     
-    private final ConsistencyCheckJobOption jobOption = new 
ConsistencyCheckJobOption();
+    private final ConsistencyCheckJobType jobType = new 
ConsistencyCheckJobType();
     
-    private final ConsistencyCheckJobAPI jobAPI = new 
ConsistencyCheckJobAPI(jobOption);
+    private final ConsistencyCheckJobAPI jobAPI = new 
ConsistencyCheckJobAPI(jobType);
     
-    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
     
     private final YamlMigrationJobConfigurationSwapper jobConfigSwapper = new 
YamlMigrationJobConfigurationSwapper();
     
@@ -70,7 +70,7 @@ class ConsistencyCheckJobAPITest {
         String parentJobId = parentJobConfig.getJobId();
         String checkJobId = jobAPI.start(new 
CreateConsistencyCheckJobParameter(parentJobId, null, null,
                 parentJobConfig.getSourceDatabaseType(), 
parentJobConfig.getTargetDatabaseType()));
-        ConsistencyCheckJobConfiguration checkJobConfig = new 
PipelineJobConfigurationManager(jobOption).getJobConfiguration(checkJobId);
+        ConsistencyCheckJobConfiguration checkJobConfig = new 
PipelineJobConfigurationManager(jobType).getJobConfiguration(checkJobId);
         int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
         String expectCheckJobId = new 
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), 
parentJobId, expectedSequence).marshal();
         assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 46f0812cdc1..a06584cdf04 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -30,6 +30,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
@@ -37,15 +38,15 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
@@ -96,7 +97,7 @@ import static org.mockito.Mockito.when;
 @StaticMockSettings(PipelineDistributedBarrier.class)
 class MigrationJobAPITest {
     
-    private static MigrationJobOption jobOption;
+    private static PipelineJobType jobType;
     
     private static MigrationJobAPI jobAPI;
     
@@ -113,12 +114,12 @@ class MigrationJobAPITest {
     @BeforeAll
     static void beforeClass() {
         PipelineContextUtils.mockModeConfigAndContextManager();
-        jobOption = new MigrationJobOption();
+        jobType = new MigrationJobType();
         jobAPI = (MigrationJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
-        jobConfigManager = new PipelineJobConfigurationManager(jobOption);
-        jobManager = new PipelineJobManager(jobOption);
-        transmissionJobManager = new TransmissionJobManager(jobOption);
-        jobItemManager = new 
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+        jobConfigManager = new PipelineJobConfigurationManager(jobType);
+        jobManager = new PipelineJobManager(jobType);
+        transmissionJobManager = new TransmissionJobManager(jobType);
+        jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
         String jdbcUrl = 
"jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
         databaseType = DatabaseTypeFactory.get(jdbcUrl);
         Map<String, Object> props = new HashMap<>();
@@ -195,9 +196,9 @@ class MigrationJobAPITest {
         initTableData(jobConfig);
         jobManager.start(jobConfig);
         PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(
-                new 
PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 jobOption.getType()));
+                new 
PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 jobType.getType()));
         TransmissionProcessContext processContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
-        Map<String, TableDataConsistencyCheckResult> checkResultMap = 
jobOption.buildDataConsistencyChecker(
+        Map<String, TableDataConsistencyCheckResult> checkResultMap = 
jobType.buildDataConsistencyChecker(
                 jobConfig, processContext, new 
ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0, 
"H2")).check("FIXTURE", null);
         assertThat(checkResultMap.size(), is(1));
         String checkKey = "t_order";

Reply via email to