This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new fff648e94a1 Refactor PipelineJobConfigurationManager (#37109)
fff648e94a1 is described below

commit fff648e94a14c5d6770f64ae13899b8f4f4d6bd8
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 15 13:00:39 2025 +0800

    Refactor PipelineJobConfigurationManager (#37109)
---
 .../core/job/service/PipelineJobConfigurationManager.java      | 10 +++++-----
 .../data/pipeline/core/job/service/PipelineJobManager.java     |  2 +-
 .../data/pipeline/core/job/service/TransmissionJobManager.java |  2 +-
 .../apache/shardingsphere/data/pipeline/cdc/CDCJobType.java    |  2 +-
 .../apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java |  4 ++--
 .../data/pipeline/cdc/handler/CDCBackendHandler.java           |  2 +-
 .../scenario/consistencycheck/api/ConsistencyCheckJobAPI.java  |  2 +-
 .../consistencycheck/task/ConsistencyCheckTasksRunner.java     |  2 +-
 .../data/pipeline/scenario/migration/MigrationJobType.java     |  2 +-
 .../data/pipeline/scenario/migration/api/MigrationJobAPI.java  |  4 ++--
 .../distsql/handler/update/CheckMigrationJobExecutor.java      |  2 +-
 .../consistencycheck/api/ConsistencyCheckJobAPITest.java       |  4 ++--
 .../pipeline/scenario/migration/api/MigrationJobAPITest.java   |  2 +-
 13 files changed, 20 insertions(+), 20 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
index f9af4cc3f73..92d5deda4c5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
@@ -22,7 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.execute.ShardingTotalCountUs
 import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.config.yaml.swapper.YamlPipelineJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
@@ -37,7 +37,7 @@ import java.util.Collections;
 @RequiredArgsConstructor
 public final class PipelineJobConfigurationManager {
     
-    private final PipelineJobType jobType;
+    private final PipelineJobOption jobOption;
     
     /**
      * Get job configuration.
@@ -48,7 +48,7 @@ public final class PipelineJobConfigurationManager {
      */
     @SuppressWarnings("unchecked")
     public <T extends PipelineJobConfiguration> T getJobConfiguration(final 
String jobId) {
-        return (T) 
jobType.getOption().getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
+        return (T) 
jobOption.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
     }
     
     /**
@@ -61,8 +61,8 @@ public final class PipelineJobConfigurationManager {
     public JobConfigurationPOJO convertToJobConfigurationPOJO(final 
PipelineJobConfiguration jobConfig) {
         JobConfigurationPOJO result = new JobConfigurationPOJO();
         result.setJobName(jobConfig.getJobId());
-        
result.setShardingTotalCount(jobType.getOption().isForceNoShardingWhenConvertToJobConfigurationPOJO()
 ? 1 : jobConfig.getJobShardingCount());
-        YamlPipelineJobConfigurationSwapper swapper = 
jobType.getOption().getYamlJobConfigurationSwapper();
+        
result.setShardingTotalCount(jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO()
 ? 1 : jobConfig.getJobShardingCount());
+        YamlPipelineJobConfigurationSwapper swapper = 
jobOption.getYamlJobConfigurationSwapper();
         
result.setJobParameter(YamlEngine.marshal(swapper.swapToYamlConfiguration(jobConfig)));
         String createTimeFormat = 
LocalDateTime.now().format(DateTimeFormatterFactory.getDatetimeFormatter());
         result.getProps().setProperty("create_time", createTimeFormat);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index dcd238cc377..4738540975c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -69,7 +69,7 @@ public final class PipelineJobManager {
             return;
         }
         governanceFacade.getJobFacade().getJob().create(jobId, 
jobType.getOption().getJobClass());
-        governanceFacade.getJobFacade().getConfiguration().persist(jobId, new 
PipelineJobConfigurationManager(jobType).convertToJobConfigurationPOJO(jobConfig));
+        governanceFacade.getJobFacade().getConfiguration().persist(jobId, new 
PipelineJobConfigurationManager(jobType.getOption()).convertToJobConfigurationPOJO(jobConfig));
     }
     
     /**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index f47efd748e6..d9f8277573a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -53,7 +53,7 @@ public final class TransmissionJobManager {
      * @return job item infos
      */
     public Collection<TransmissionJobItemInfo> getJobItemInfos(final String 
jobId) {
-        PipelineJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(jobType).getJobConfiguration(jobId);
+        PipelineJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(jobId);
         long startTimeMillis = 
Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
         Map<Integer, TransmissionJobItemProgress> jobProgress = 
getJobProgress(jobConfig);
         List<TransmissionJobItemInfo> result = new LinkedList<>();
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
index a0264488739..78782ff27e3 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
@@ -38,7 +38,7 @@ public final class CDCJobType implements PipelineJobType {
     
     @Override
     public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
-        CDCJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(new 
CDCJobType()).getJobConfiguration(jobMetaData.getJobId());
+        CDCJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(getOption()).getJobConfiguration(jobMetaData.getJobId());
         return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), 
String.join(", ", jobConfig.getSchemaTableNames()));
     }
     
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 1eb93f50767..72b54107780 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -110,7 +110,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
     public CDCJobAPI() {
         jobType = new CDCJobType();
         jobManager = new PipelineJobManager(jobType);
-        jobConfigManager = new PipelineJobConfigurationManager(jobType);
+        jobConfigManager = new 
PipelineJobConfigurationManager(jobType.getOption());
         dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
         ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
         pipelineDataSourceConfigSwapper = new 
YamlPipelineDataSourceConfigurationSwapper();
@@ -285,7 +285,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
      * @return job item infos
      */
     public Collection<CDCJobItemInfo> getJobItemInfos(final String jobId) {
-        CDCJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(jobType).getJobConfiguration(jobId);
+        CDCJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(jobId);
         ShardingSphereDatabase database = 
PipelineContextManager.getProxyContext().getMetaDataContexts().getMetaData().getDatabase(jobConfig.getDatabaseName());
         Collection<CDCJobItemInfo> result = new LinkedList<>();
         for (TransmissionJobItemInfo each : new 
TransmissionJobManager(jobType).getJobItemInfos(jobId)) {
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index dac670c5455..e129229e4b7 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -75,7 +75,7 @@ public final class CDCBackendHandler {
     
     private final CDCJobAPI jobAPI = (CDCJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
     
-    private final PipelineJobConfigurationManager jobConfigManager = new 
PipelineJobConfigurationManager(new CDCJobType());
+    private final PipelineJobConfigurationManager jobConfigManager = new 
PipelineJobConfigurationManager(new CDCJobType().getOption());
     
     /**
      * Get database name by job ID.
diff --git a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
index 0d99f34600f..859db4bedcc 100644
--- a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
@@ -70,7 +70,7 @@ public final class ConsistencyCheckJobAPI {
     public ConsistencyCheckJobAPI(final ConsistencyCheckJobType jobType) {
         progressSwapper = (YamlConsistencyCheckJobItemProgressSwapper) 
jobType.getOption().getYamlJobItemProgressSwapper();
         jobManager = new PipelineJobManager(jobType);
-        jobConfigManager = new PipelineJobConfigurationManager(jobType);
+        jobConfigManager = new 
PipelineJobConfigurationManager(jobType.getOption());
         jobItemManager = new PipelineJobItemManager<>(progressSwapper);
     }
     
diff --git a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 50b84d63eca..4d7951d4b66 100644
--- a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -107,7 +107,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
         protected void runBlocking() {
             jobItemManager.persistProgress(jobItemContext);
             PipelineJobType jobType = 
PipelineJobIdUtils.parseJobType(parentJobId);
-            PipelineJobConfiguration parentJobConfig = new 
PipelineJobConfigurationManager(jobType).getJobConfiguration(parentJobId);
+            PipelineJobConfiguration parentJobConfig = new 
PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(parentJobId);
             try {
                 PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.fillInDefaultValue(
                         
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId()),
 jobType.getType()));
diff --git a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index da4abe3186e..6005b214b6e 100644
--- a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
+++ b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -48,7 +48,7 @@ public final class MigrationJobType implements 
PipelineJobType {
     @Override
     public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
         Collection<String> sourceTables = new LinkedList<>();
-        new PipelineJobConfigurationManager(new 
MigrationJobType()).<MigrationJobConfiguration>getJobConfiguration(jobMetaData.getJobId()).getJobShardingDataNodes()
+        new PipelineJobConfigurationManager(new 
MigrationJobType().getOption()).<MigrationJobConfiguration>getJobConfiguration(jobMetaData.getJobId()).getJobShardingDataNodes()
                 .forEach(each -> each.getEntries().forEach(entry -> 
entry.getDataNodes().forEach(dataNode -> sourceTables.add(dataNode.format()))));
         return new PipelineJobInfo(jobMetaData, null, String.join(",", 
sourceTables));
     }
diff --git a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 7745861d7a7..f08ab2bdf62 100644
--- a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -99,7 +99,7 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
     public MigrationJobAPI() {
         PipelineJobType jobType = new MigrationJobType();
         jobManager = new PipelineJobManager(jobType);
-        jobConfigManager = new PipelineJobConfigurationManager(jobType);
+        jobConfigManager = new 
PipelineJobConfigurationManager(jobType.getOption());
         dataSourcePersistService = new PipelineDataSourcePersistService();
     }
     
@@ -326,7 +326,7 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
     }
     
     private void cleanTempTableOnRollback(final String jobId) throws 
SQLException {
-        MigrationJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class,
 getType())).getJobConfiguration(jobId);
+        MigrationJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class,
 getType()).getOption()).getJobConfiguration(jobId);
         PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
         TableAndSchemaNameMapper mapping = new 
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         try (
diff --git a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/CheckMigrationJobExecutor.java b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/CheckMigrationJobExecutor.java
index 4d508787adc..26575f31a8e 100644
--- a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/CheckMigrationJobExecutor.java
+++ b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/CheckMigrationJobExecutor.java
@@ -51,7 +51,7 @@ public final class CheckMigrationJobExecutor implements 
DistSQLUpdateExecutor<Ch
         String algorithmTypeName = null == typeStrategy ? null : 
typeStrategy.getName();
         Properties algorithmProps = null == typeStrategy ? null : 
typeStrategy.getProps();
         String jobId = sqlStatement.getJobId();
-        MigrationJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(migrationJobType).getJobConfiguration(jobId);
+        MigrationJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(migrationJobType.getOption()).getJobConfiguration(jobId);
         verifyInventoryFinished(jobConfig);
         checkJobAPI.start(new CreateConsistencyCheckJobParameter(jobId, 
algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), 
jobConfig.getTargetDatabaseType()));
     }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
index 0cab811477d..aa9ee65bc18 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
@@ -74,7 +74,7 @@ class ConsistencyCheckJobAPITest {
         String parentJobId = parentJobConfig.getJobId();
         String checkJobId = jobAPI.start(new 
CreateConsistencyCheckJobParameter(parentJobId, null, null,
                 parentJobConfig.getSourceDatabaseType(), 
parentJobConfig.getTargetDatabaseType()));
-        ConsistencyCheckJobConfiguration checkJobConfig = new 
PipelineJobConfigurationManager(jobType).getJobConfiguration(checkJobId);
+        ConsistencyCheckJobConfiguration checkJobConfig = new 
PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(checkJobId);
         int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
         String expectCheckJobId = PipelineJobIdUtils.marshal(new 
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), 
parentJobId, expectedSequence));
         assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
@@ -240,7 +240,7 @@ class ConsistencyCheckJobAPITest {
     }
     
     private void persistCheckJobProgress(final ConsistencyCheckJobItemProgress 
checkJobItemProgress, final String checkJobId, final JobStatus jobStatus, final 
int recordCount) {
-        ConsistencyCheckJobConfiguration checkJobConfig = new 
PipelineJobConfigurationManager(jobType).getJobConfiguration(checkJobId);
+        ConsistencyCheckJobConfiguration checkJobConfig = new 
PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(checkJobId);
         ConsistencyCheckJobItemContext checkJobItemContext = new 
ConsistencyCheckJobItemContext(checkJobConfig, 0, jobStatus, 
checkJobItemProgress);
         LocalDateTime checkBeginTime = new 
Timestamp(checkJobItemContext.getProgressContext().getCheckBeginTimeMillis()).toLocalDateTime();
         checkJobItemContext.getProgressContext().setRecordsCount(recordCount);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
index aa807dbb132..ac760942004 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
@@ -113,7 +113,7 @@ class MigrationJobAPITest {
         PipelineContextUtils.initPipelineContextManager();
         jobType = new MigrationJobType();
         jobAPI = (MigrationJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
-        jobConfigManager = new PipelineJobConfigurationManager(jobType);
+        jobConfigManager = new 
PipelineJobConfigurationManager(jobType.getOption());
         jobManager = new PipelineJobManager(jobType);
         transmissionJobManager = new TransmissionJobManager(jobType);
         jobItemManager = new 
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());

Reply via email to