This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 80ea2b529e4 Refactor AbstractSeparablePipelineJob and AbstractInseparablePipelineJob (#32749)
80ea2b529e4 is described below

commit 80ea2b529e4e7ab27a39008f57373f2ff2ce2851
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 20:52:17 2024 +0800

    Refactor AbstractSeparablePipelineJob and AbstractInseparablePipelineJob (#32749)
    
    * Refactor AbstractSeparablePipelineJob and AbstractInseparablePipelineJob
    
    * Refactor AbstractSeparablePipelineJob and AbstractInseparablePipelineJob
---
 .../core/job/AbstractInseparablePipelineJob.java   | 20 ++++++----------
 .../core/job/AbstractSeparablePipelineJob.java     | 28 ++++++++--------------
 .../pipeline/core/job/type/PipelineJobType.java    |  7 ++++++
 .../pipeline/core/job/type/FixtureJobType.java     |  5 ++++
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   |  4 ++--
 .../data/pipeline/cdc/CDCJobType.java              |  5 ++++
 .../data/pipeline/cdc/api/CDCJobAPI.java           |  2 +-
 .../consistencycheck/ConsistencyCheckJob.java      |  5 ++--
 .../consistencycheck/ConsistencyCheckJobType.java  |  5 ++++
 ...tencyCheckJobConfigurationChangedProcessor.java |  2 +-
 .../pipeline/scenario/migration/MigrationJob.java  |  5 ++--
 .../scenario/migration/MigrationJobType.java       |  5 ++++
 .../MigrationJobConfigurationChangedProcessor.java |  2 +-
 .../consistencycheck/ConsistencyCheckJobTest.java  |  2 +-
 14 files changed, 56 insertions(+), 41 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index b6d4c28eac5..bcbcca15ede 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -59,19 +59,6 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobConfig
     
     private final PipelineJobRunnerManager jobRunnerManager;
     
-    private final TransmissionProcessContext jobProcessContext;
-    
-    protected AbstractInseparablePipelineJob(final String jobId, final 
PipelineJobRunnerManager jobRunnerManager) {
-        this.jobRunnerManager = jobRunnerManager;
-        jobProcessContext = createTransmissionProcessContext(jobId);
-    }
-    
-    private TransmissionProcessContext createTransmissionProcessContext(final 
String jobId) {
-        PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.fillInDefaultValue(
-                new 
PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId),
 PipelineJobIdUtils.parseJobType(jobId).getType()));
-        return new TransmissionProcessContext(jobId, processConfig);
-    }
-    
     @SuppressWarnings("unchecked")
     @Override
     public final void execute(final ShardingContext shardingContext) {
@@ -79,6 +66,7 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobConfig
         log.info("Execute job {}", jobId);
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
         T jobConfig = (T) 
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+        TransmissionProcessContext jobProcessContext = 
jobType.isTransmissionJob() ? createTransmissionProcessContext(jobId) : null;
         Collection<I> jobItemContexts = new LinkedList<>();
         PipelineJobItemManager<P> jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
@@ -105,6 +93,12 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobConfig
         executeIncrementalTasks(jobItemContexts, jobItemManager);
     }
     
+    private TransmissionProcessContext createTransmissionProcessContext(final 
String jobId) {
+        PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.fillInDefaultValue(
+                new 
PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId),
 PipelineJobIdUtils.parseJobType(jobId).getType()));
+        return new TransmissionProcessContext(jobId, processConfig);
+    }
+    
     protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P 
jobItemProgress, TransmissionProcessContext jobProcessContext);
     
     protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 5afdc66cb8b..c77303c1feb 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
@@ -46,29 +47,13 @@ import java.sql.SQLException;
  * @param <I> type of pipeline job item context
  * @param <P> type of pipeline job item progress
  */
+@RequiredArgsConstructor
+@Getter
 @Slf4j
 public abstract class AbstractSeparablePipelineJob<T extends 
PipelineJobConfiguration, I extends PipelineJobItemContext, P extends 
PipelineJobItemProgress> implements PipelineJob {
     
-    @Getter
     private final PipelineJobRunnerManager jobRunnerManager;
     
-    private final TransmissionProcessContext jobProcessContext;
-    
-    protected AbstractSeparablePipelineJob(final String jobId) {
-        this(jobId, true);
-    }
-    
-    protected AbstractSeparablePipelineJob(final String jobId, final boolean 
isTransmissionProcessContextNeeded) {
-        jobRunnerManager = new PipelineJobRunnerManager();
-        jobProcessContext = isTransmissionProcessContextNeeded ? 
createTransmissionProcessContext(jobId) : null;
-    }
-    
-    private TransmissionProcessContext createTransmissionProcessContext(final 
String jobId) {
-        PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.fillInDefaultValue(
-                new 
PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId),
 PipelineJobIdUtils.parseJobType(jobId).getType()));
-        return new TransmissionProcessContext(jobId, processConfig);
-    }
-    
     @Override
     public final void execute(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
@@ -81,6 +66,7 @@ public abstract class AbstractSeparablePipelineJob<T extends 
PipelineJobConfigur
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
         PipelineJobConfigurationManager jobConfigManager = new 
PipelineJobConfigurationManager(jobType);
         T jobConfig = jobConfigManager.getJobConfiguration(jobId);
+        TransmissionProcessContext jobProcessContext = 
jobType.isTransmissionJob() ? createTransmissionProcessContext(jobId) : null;
         PipelineJobItemManager<P> jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
         P jobItemProgress = 
jobItemManager.getProgress(shardingContext.getJobName(), 
shardingItem).orElse(null);
         boolean started = false;
@@ -118,6 +104,12 @@ public abstract class AbstractSeparablePipelineJob<T 
extends PipelineJobConfigur
         return true;
     }
     
+    private TransmissionProcessContext createTransmissionProcessContext(final 
String jobId) {
+        PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.fillInDefaultValue(
+                new 
PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId),
 PipelineJobIdUtils.parseJobType(jobId).getType()));
+        return new TransmissionProcessContext(jobId, processConfig);
+    }
+    
     protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P 
jobItemProgress, TransmissionProcessContext jobProcessContext);
     
     protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
index a217f0ec2b4..aa9886233f6 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
@@ -48,6 +48,13 @@ public interface PipelineJobType extends TypedSPI {
      */
     String getCode();
     
+    /**
+     * Is transmission job.
+     *
+     * @return is transmission job or not
+     */
+    boolean isTransmissionJob();
+    
     /**
      * Get YAML pipeline job configuration swapper.
      *
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
index 2fe7575ae48..c6988111e34 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
@@ -36,6 +36,11 @@ public final class FixtureJobType implements PipelineJobType 
{
         return "00";
     }
     
+    @Override
+    public boolean isTransmissionJob() {
+        return true;
+    }
+    
     @Override
     public <Y extends YamlConfiguration, T extends PipelineJobConfiguration> 
YamlPipelineJobConfigurationSwapper<Y, T> getYamlJobConfigurationSwapper() {
         return null;
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index c210fffc0e0..ad69003cd6a 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -75,8 +75,8 @@ public final class CDCJob extends 
AbstractInseparablePipelineJob<CDCJobConfigura
     @Getter
     private final PipelineSink sink;
     
-    public CDCJob(final String jobId, final PipelineSink sink) {
-        super(jobId, new PipelineJobRunnerManager(new 
CDCJobRunnerCleaner(sink)));
+    public CDCJob(final PipelineSink sink) {
+        super(new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink)));
         this.sink = sink;
     }
     
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
index 705b5ee90b5..be6e1d76225 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
@@ -36,6 +36,11 @@ public final class CDCJobType implements PipelineJobType {
         return "03";
     }
     
+    @Override
+    public boolean isTransmissionJob() {
+        return true;
+    }
+    
     @SuppressWarnings("unchecked")
     @Override
     public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 805a15c1666..2e9d6187cf8 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -223,7 +223,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
      * @param sink sink
      */
     public void start(final String jobId, final PipelineSink sink) {
-        CDCJob job = new CDCJob(jobId, sink);
+        CDCJob job = new CDCJob(sink);
         PipelineJobRegistry.add(jobId, job);
         enable(jobId);
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 24639b29bde..2c73aef56b7 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -31,8 +32,8 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.Co
  */
 public final class ConsistencyCheckJob extends 
AbstractSeparablePipelineJob<ConsistencyCheckJobConfiguration, 
ConsistencyCheckJobItemContext, ConsistencyCheckJobItemProgress> {
     
-    public ConsistencyCheckJob(final String jobId) {
-        super(jobId, false);
+    public ConsistencyCheckJob() {
+        super(new PipelineJobRunnerManager());
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
index 1a5102018c6..2bfc557d23d 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
@@ -32,6 +32,11 @@ public final class ConsistencyCheckJobType implements 
PipelineJobType {
         return "02";
     }
     
+    @Override
+    public boolean isTransmissionJob() {
+        return false;
+    }
+    
     @SuppressWarnings("unchecked")
     @Override
     public YamlConsistencyCheckJobConfigurationSwapper 
getYamlJobConfigurationSwapper() {
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
index 1a3d470f8b1..4afb66b446a 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
@@ -29,7 +29,7 @@ public final class 
ConsistencyCheckJobConfigurationChangedProcessor implements J
     
     @Override
     public PipelineJob createJob(final ConsistencyCheckJobConfiguration 
jobConfig) {
-        return new ConsistencyCheckJob(jobConfig.getJobId());
+        return new ConsistencyCheckJob();
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 6ae23369c6c..09e140a72ef 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -25,6 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfigurati
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
+import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
@@ -55,8 +56,8 @@ public final class MigrationJob extends 
AbstractSeparablePipelineJob<MigrationJo
     
     private final MigrationJobPreparer jobPreparer = new 
MigrationJobPreparer();
     
-    public MigrationJob(final String jobId) {
-        super(jobId);
+    public MigrationJob() {
+        super(new PipelineJobRunnerManager());
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index cf4ba6b392e..b6576d42735 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -46,6 +46,11 @@ public final class MigrationJobType implements 
PipelineJobType {
         return "01";
     }
     
+    @Override
+    public boolean isTransmissionJob() {
+        return true;
+    }
+    
     @SuppressWarnings("unchecked")
     @Override
     public YamlMigrationJobConfigurationSwapper 
getYamlJobConfigurationSwapper() {
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
index af1cb8fb894..39545ca0355 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
@@ -32,7 +32,7 @@ public final class MigrationJobConfigurationChangedProcessor 
implements JobConfi
     
     @Override
     public PipelineJob createJob(final MigrationJobConfiguration jobConfig) {
-        return new MigrationJob(jobConfig.getJobId());
+        return new MigrationJob();
     }
     
     @Override
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 19de73a4f19..16468324e59 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -59,7 +59,7 @@ class ConsistencyCheckJobTest {
         Map<String, Object> expectTableCheckPosition = 
Collections.singletonMap("t_order", 100);
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
 0,
                 
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
-        ConsistencyCheckJob consistencyCheckJob = new 
ConsistencyCheckJob("j02");
+        ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
         ConsistencyCheckJobConfiguration jobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId));
         PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(new 
ConsistencyCheckJobType().getYamlJobItemProgressSwapper());
         Optional<ConsistencyCheckJobItemProgress> jobItemProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), 0);

Reply via email to