This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3393f0ace86 Add generic type of AbstractSeparablePipelineJob (#29341)
3393f0ace86 is described below

commit 3393f0ace8615f909a80ea7ef0316052ef531cb7
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 18:48:09 2023 +0800

    Add generic type of AbstractSeparablePipelineJob (#29341)
    
    * Remove SimpleJob of CDCJob
    
    * Refactor AbstractSeparablePipelineJob
    
    * Refactor AbstractSeparablePipelineJob
    
    * Refactor MigrationJob
    
    * Refactor MigrationJob
---
 .../core/job/AbstractSeparablePipelineJob.java     | 30 +++++-----
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   |  3 +-
 .../consistencycheck/ConsistencyCheckJob.java      | 13 ++---
 .../pipeline/scenario/migration/MigrationJob.java  | 66 ++++++++++------------
 .../consistencycheck/ConsistencyCheckJobTest.java  |  2 +-
 5 files changed, 55 insertions(+), 59 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index cd668ca127f..ac5e44b1fe0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -31,25 +31,27 @@ import java.sql.SQLException;
 
 /**
  * Abstract separable pipeline job.
+ * 
+ * @param <T> type of pipeline job item context
  */
 @Slf4j
-public abstract class AbstractSeparablePipelineJob extends AbstractPipelineJob {
+public abstract class AbstractSeparablePipelineJob<T extends PipelineJobItemContext> extends AbstractPipelineJob {
     
     protected AbstractSeparablePipelineJob(final String jobId) {
         super(jobId);
     }
     
     @Override
-    public void execute(final ShardingContext shardingContext) {
+    public final void execute(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
         int shardingItem = shardingContext.getShardingItem();
         log.info("Execute job {}-{}", jobId, shardingItem);
         if (isStopping()) {
-            log.info("stopping true, ignore");
+            log.info("Stopping true, ignore");
             return;
         }
         try {
-            execute(buildPipelineJobItemContext(shardingContext));
+            execute(buildJobItemContext(shardingContext));
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
@@ -58,20 +60,24 @@ public abstract class AbstractSeparablePipelineJob extends AbstractPipelineJob {
         }
     }
     
-    private void execute(final PipelineJobItemContext jobItemContext) {
+    private void execute(final T jobItemContext) {
         String jobId = jobItemContext.getJobId();
         int shardingItem = jobItemContext.getShardingItem();
-        PipelineTasksRunner tasksRunner = buildPipelineTasksRunner(jobItemContext);
+        PipelineTasksRunner tasksRunner = buildTasksRunner(jobItemContext);
         if (!addTasksRunner(shardingItem, tasksRunner)) {
             return;
         }
         PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
         prepare(jobItemContext);
-        log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
+        log.info("Start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
         tasksRunner.start();
     }
     
-    protected final void prepare(final PipelineJobItemContext jobItemContext) {
+    protected abstract T buildJobItemContext(ShardingContext shardingContext);
+    
+    protected abstract PipelineTasksRunner buildTasksRunner(T jobItemContext);
+    
+    protected final void prepare(final T jobItemContext) {
         try {
             doPrepare(jobItemContext);
             // CHECKSTYLE:OFF
@@ -81,14 +87,10 @@ public abstract class AbstractSeparablePipelineJob extends AbstractPipelineJob {
         }
     }
     
-    protected abstract void doPrepare(PipelineJobItemContext jobItemContext) throws SQLException;
-    
-    protected abstract PipelineJobItemContext buildPipelineJobItemContext(ShardingContext shardingContext);
-    
-    protected abstract PipelineTasksRunner buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
+    protected abstract void doPrepare(T jobItemContext) throws SQLException;
     
     private void processFailed(final PipelineJobManager jobManager, final String jobId, final int shardingItem, final Exception ex) {
-        log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
+        log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
         PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
         try {
             jobManager.stop(jobId);
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 009ea5df9f6..a9854bfd848 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -58,7 +58,6 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConf
 import org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
@@ -73,7 +72,7 @@ import java.util.stream.Collectors;
  * CDC job.
  */
 @Slf4j
-public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemContext> implements SimpleJob {
+public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemContext> {
     
     @Getter
     private final PipelineSink sink;
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 5d13dd54240..61aa3e775fc 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -18,10 +18,9 @@
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -36,14 +35,14 @@ import java.util.Optional;
  * Consistency check job.
  */
 @Slf4j
-public final class ConsistencyCheckJob extends AbstractSeparablePipelineJob {
+public final class ConsistencyCheckJob extends 
AbstractSeparablePipelineJob<ConsistencyCheckJobItemContext> {
     
     public ConsistencyCheckJob(final String jobId) {
         super(jobId);
     }
     
     @Override
-    public ConsistencyCheckJobItemContext buildPipelineJobItemContext(final 
ShardingContext shardingContext) {
+    public ConsistencyCheckJobItemContext buildJobItemContext(final 
ShardingContext shardingContext) {
         ConsistencyCheckJobConfiguration jobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(getJobType().getYamlJobItemProgressSwapper());
         Optional<ConsistencyCheckJobItemProgress> jobItemProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), 
shardingContext.getShardingItem());
@@ -51,12 +50,12 @@ public final class ConsistencyCheckJob extends 
AbstractSeparablePipelineJob {
     }
     
     @Override
-    protected PipelineTasksRunner buildPipelineTasksRunner(final 
PipelineJobItemContext pipelineJobItemContext) {
-        return new 
ConsistencyCheckTasksRunner((ConsistencyCheckJobItemContext) 
pipelineJobItemContext);
+    protected PipelineTasksRunner buildTasksRunner(final 
ConsistencyCheckJobItemContext jobItemContext) {
+        return new ConsistencyCheckTasksRunner(jobItemContext);
     }
     
     @Override
-    protected void doPrepare(final PipelineJobItemContext jobItemContext) {
+    protected void doPrepare(final ConsistencyCheckJobItemContext 
jobItemContext) {
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 5a204212d5b..9ed45a30c28 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -20,37 +20,34 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.migration;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
-import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
+import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner;
+import 
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.MigrationIncrementalDumperContextCreator;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.MigrationIncrementalDumperContextCreator;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 
@@ -66,25 +63,29 @@ import java.util.stream.Collectors;
  * Migration job.
  */
 @Slf4j
-public final class MigrationJob extends AbstractSeparablePipelineJob {
+public final class MigrationJob extends 
AbstractSeparablePipelineJob<MigrationJobItemContext> {
     
-    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(new 
MigrationJobType().getYamlJobItemProgressSwapper());
+    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager;
     
-    private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
+    private final PipelineProcessConfigurationPersistService 
processConfigPersistService;
     
-    private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
+    private final PipelineDataSourceManager dataSourceManager;
     
     // Shared by all sharding items
-    private final MigrationJobPreparer jobPreparer = new 
MigrationJobPreparer();
+    private final MigrationJobPreparer jobPreparer;
     
     public MigrationJob(final String jobId) {
         super(jobId);
+        jobItemManager = new PipelineJobItemManager<>(new 
MigrationJobType().getYamlJobItemProgressSwapper());
+        processConfigPersistService = new 
PipelineProcessConfigurationPersistService();
+        dataSourceManager = new DefaultPipelineDataSourceManager();
+        jobPreparer = new MigrationJobPreparer();
     }
     
     @Override
-    protected TransmissionJobItemContext buildPipelineJobItemContext(final 
ShardingContext shardingContext) {
-        int shardingItem = shardingContext.getShardingItem();
+    protected MigrationJobItemContext buildJobItemContext(final 
ShardingContext shardingContext) {
         MigrationJobConfiguration jobConfig = new 
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+        int shardingItem = shardingContext.getShardingItem();
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
         PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(
                 
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 "MIGRATION"));
@@ -100,45 +101,40 @@ public final class MigrationJob extends 
AbstractSeparablePipelineJob {
         Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor().getShardingColumnsMap(
                 ((ShardingSpherePipelineDataSourceConfiguration) 
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
         ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
-        MigrationTaskConfiguration result = new MigrationTaskConfiguration(
-                
incrementalDumperContext.getCommonContext().getDataSourceName(), 
createTableConfigs, incrementalDumperContext, importerConfig);
-        log.info("buildTaskConfiguration, result={}", result);
-        return result;
+        return new 
MigrationTaskConfiguration(incrementalDumperContext.getCommonContext().getDataSourceName(),
 createTableConfigs, incrementalDumperContext, importerConfig);
     }
     
-    private Collection<CreateTableConfiguration> 
buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+    private Collection<CreateTableConfiguration> 
buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final 
TableAndSchemaNameMapper mapper) {
         Collection<CreateTableConfiguration> result = new LinkedList<>();
         for (JobDataNodeEntry each : 
jobConfig.getTablesFirstDataNodes().getEntries()) {
-            String sourceSchemaName = 
tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
-            DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
-            String targetSchemaName = 
dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
+            String sourceSchemaName = 
mapper.getSchemaName(each.getLogicTableName());
+            String targetSchemaName = new 
DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData().isSchemaAvailable()
 ? sourceSchemaName : null;
             DataNode dataNode = each.getDataNodes().get(0);
             PipelineDataSourceConfiguration sourceDataSourceConfig = 
jobConfig.getSources().get(dataNode.getDataSourceName());
             CreateTableConfiguration createTableConfig = new 
CreateTableConfiguration(sourceDataSourceConfig, new 
CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()),
                     jobConfig.getTarget(), new 
CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName()));
             result.add(createTableConfig);
         }
-        log.info("buildCreateTableConfigurations, result={}", result);
         return result;
     }
     
     private ImporterConfiguration buildImporterConfiguration(final 
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig,
-                                                             final 
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+                                                             final 
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final 
TableAndSchemaNameMapper mapper) {
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
         JobRateLimitAlgorithm writeRateLimitAlgorithm = new 
TransmissionProcessContext(jobConfig.getJobId(), 
pipelineProcessConfig).getWriteRateLimitAlgorithm();
         int retryTimes = jobConfig.getRetryTimes();
         int concurrency = jobConfig.getConcurrency();
-        return new ImporterConfiguration(jobConfig.getTarget(), 
shardingColumnsMap, tableAndSchemaNameMapper, batchSize, 
writeRateLimitAlgorithm, retryTimes, concurrency);
+        return new ImporterConfiguration(jobConfig.getTarget(), 
shardingColumnsMap, mapper, batchSize, writeRateLimitAlgorithm, retryTimes, 
concurrency);
     }
     
     @Override
-    protected PipelineTasksRunner buildPipelineTasksRunner(final 
PipelineJobItemContext pipelineJobItemContext) {
-        return new TransmissionTasksRunner((TransmissionJobItemContext) 
pipelineJobItemContext);
+    protected PipelineTasksRunner buildTasksRunner(final 
MigrationJobItemContext jobItemContext) {
+        return new TransmissionTasksRunner(jobItemContext);
     }
     
     @Override
-    protected void doPrepare(final PipelineJobItemContext jobItemContext) 
throws SQLException {
-        jobPreparer.prepare((MigrationJobItemContext) jobItemContext);
+    protected void doPrepare(final MigrationJobItemContext jobItemContext) 
throws SQLException {
+        jobPreparer.prepare(jobItemContext);
     }
     
     @Override
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 8fd9e67ad47..3b91f6ffc0e 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -55,7 +55,7 @@ class ConsistencyCheckJobTest {
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
 0,
                 
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
         ConsistencyCheckJob consistencyCheckJob = new 
ConsistencyCheckJob(checkJobId);
-        ConsistencyCheckJobItemContext actual = 
consistencyCheckJob.buildPipelineJobItemContext(
+        ConsistencyCheckJobItemContext actual = 
consistencyCheckJob.buildJobItemContext(
                 new ShardingContext(checkJobId, "", 1, 
YamlEngine.marshal(createYamlConsistencyCheckJobConfiguration(checkJobId)), 0, 
""));
         assertThat(actual.getProgressContext().getSourceTableCheckPositions(), 
is(expectTableCheckPosition));
         assertThat(actual.getProgressContext().getTargetTableCheckPositions(), 
is(expectTableCheckPosition));

Reply via email to