This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 915351f63af Refactor MigrationJobItemContext for better unit test (#20321)
915351f63af is described below

commit 915351f63afc8b01710a8e3b78f9f068009e110c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Aug 20 20:30:27 2022 +0800

    Refactor MigrationJobItemContext for better unit test (#20321)
    
    * Refactor MigrationJobItemContext for better unit test
    
    * Read process config from registry center on buildPipelineProcessContext
    
    * Improve code style
---
 .../util/PipelineProcessConfigurationUtils.java    | 11 ++++++-
 .../pipeline/scenario/migration/MigrationJob.java  | 15 ++++++----
 .../scenario/migration/MigrationJobAPIImpl.java    | 12 ++------
 .../migration/MigrationJobItemContext.java         | 12 ++++----
 .../api/impl/GovernanceRepositoryAPIImplTest.java  |  4 +--
 .../core/api/impl/MigrationJobAPIImplTest.java     |  7 ++---
 .../consistency/DataConsistencyCheckerTest.java    |  4 +--
 .../core/prepare/InventoryTaskSplitterTest.java    |  6 +---
 .../pipeline/core/task/IncrementalTaskTest.java    |  5 +---
 .../data/pipeline/core/task/InventoryTaskTest.java |  5 +---
 .../pipeline/core/util/PipelineContextUtil.java    | 34 ++++++++++++++++++++++
 11 files changed, 69 insertions(+), 46 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
index 0f8f0e1f7a2..373260d923c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
@@ -45,6 +45,16 @@ public final class PipelineProcessConfigurationUtils {
             return originalConfig;
         }
         YamlPipelineProcessConfiguration yamlConfig = null != originalConfig ? 
SWAPPER.swapToYamlConfiguration(originalConfig) : new 
YamlPipelineProcessConfiguration();
+        fillInDefaultValue(yamlConfig);
+        return SWAPPER.swapToObject(yamlConfig);
+    }
+    
+    /**
+     * Fill in default value.
+     *
+     * @param yamlConfig YAML configuration, non-null
+     */
+    public static void fillInDefaultValue(final 
YamlPipelineProcessConfiguration yamlConfig) {
         if (null == yamlConfig.getRead()) {
             
yamlConfig.setRead(YamlPipelineReadConfiguration.buildWithDefaultValue());
         } else {
@@ -60,6 +70,5 @@ public final class PipelineProcessConfigurationUtils {
             props.put(MemoryPipelineChannelCreator.BLOCK_QUEUE_SIZE_KEY, 
MemoryPipelineChannelCreator.BLOCK_QUEUE_SIZE_DEFAULT_VALUE);
             yamlConfig.setStreamChannel(new 
YamlAlgorithmConfiguration(MemoryPipelineChannelCreator.TYPE, props));
         }
-        return SWAPPER.swapToObject(yamlConfig);
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index bdb54293c0a..8b32537e2a9 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -46,6 +47,8 @@ import java.sql.SQLException;
 @RequiredArgsConstructor
 public final class MigrationJob extends AbstractPipelineJob implements 
SimpleJob, PipelineJob {
     
+    private final MigrationJobAPI jobAPI = 
MigrationJobAPIFactory.getInstance();
+    
     private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
     
     private final PipelineDistributedBarrier pipelineDistributedBarrier = 
PipelineDistributedBarrier.getInstance();
@@ -55,16 +58,18 @@ public final class MigrationJob extends AbstractPipelineJob 
implements SimpleJob
     
     @Override
     public void execute(final ShardingContext shardingContext) {
-        log.info("Execute job {}-{}", shardingContext.getJobName(), 
shardingContext.getShardingItem());
+        int shardingItem = shardingContext.getShardingItem();
+        log.info("Execute job {}-{}", shardingContext.getJobName(), 
shardingItem);
         if (isStopping()) {
             log.info("stopping true, ignore");
             return;
         }
         setJobId(shardingContext.getJobName());
         MigrationJobConfiguration jobConfig = 
YamlMigrationJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
-        InventoryIncrementalJobItemProgress initProgress = 
MigrationJobAPIFactory.getInstance().getJobItemProgress(shardingContext.getJobName(),
 shardingContext.getShardingItem());
-        MigrationJobItemContext jobItemContext = new 
MigrationJobItemContext(jobConfig, shardingContext.getShardingItem(), 
initProgress, dataSourceManager);
-        int shardingItem = jobItemContext.getShardingItem();
+        InventoryIncrementalJobItemProgress initProgress = 
jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
+        MigrationProcessContext jobProcessContext = 
jobAPI.buildPipelineProcessContext(jobConfig);
+        TaskConfiguration taskConfig = 
jobAPI.buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
+        MigrationJobItemContext jobItemContext = new 
MigrationJobItemContext(jobConfig, shardingItem, initProgress, 
jobProcessContext, taskConfig, dataSourceManager);
         if (getTasksRunnerMap().containsKey(shardingItem)) {
             log.warn("tasksRunnerMap contains shardingItem {}, ignore", 
shardingItem);
             return;
@@ -93,7 +98,7 @@ public final class MigrationJob extends AbstractPipelineJob 
implements SimpleJob
             log.error("job prepare failed, {}-{}", getJobId(), 
jobItemContext.getShardingItem(), ex);
             PipelineJobCenter.stop(getJobId());
             jobItemContext.setStatus(JobStatus.PREPARING_FAILURE);
-            
MigrationJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext);
+            jobAPI.persistJobItemProgress(jobItemContext);
             if (ex instanceof RuntimeException) {
                 throw (RuntimeException) ex;
             }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index aef8bb6501a..fb01beb0a6f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -57,12 +57,9 @@ import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCon
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.lock.LockDefinition;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineReadConfigurationSwapper;
 import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
 
@@ -152,13 +149,8 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     
     @Override
     public MigrationProcessContext buildPipelineProcessContext(final 
PipelineJobConfiguration pipelineJobConfig) {
-        // TODO add jobType
-        // TODO read process config from registry center
-        YamlPipelineReadConfiguration yamlReadConfig = 
YamlPipelineReadConfiguration.buildWithDefaultValue();
-        yamlReadConfig.fillInNullFieldsWithDefaultValue();
-        yamlReadConfig.setShardingSize(10);
-        PipelineReadConfiguration readConfig = new 
YamlPipelineReadConfigurationSwapper().swapToObject(yamlReadConfig);
-        PipelineProcessConfiguration processConfig = new 
PipelineProcessConfiguration(readConfig, null, null);
+        // TODO cache process config on local
+        PipelineProcessConfiguration processConfig = 
showProcessConfiguration();
         return new MigrationProcessContext(pipelineJobConfig.getJobId(), 
processConfig);
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index 2f954f91f04..bfd3e25ca6b 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -83,17 +83,15 @@ public final class MigrationJobItemContext implements 
InventoryIncrementalJobIte
         }
     };
     
-    public MigrationJobItemContext(final MigrationJobConfiguration jobConfig, 
final int jobShardingItem, final InventoryIncrementalJobItemProgress 
initProgress,
-                                   final PipelineDataSourceManager 
dataSourceManager) {
-        // TODO refactor, transfer in parameter
-        MigrationJobAPI jobAPI = MigrationJobAPIFactory.getInstance();
-        jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
+    public MigrationJobItemContext(final MigrationJobConfiguration jobConfig, 
final int shardingItem, final InventoryIncrementalJobItemProgress initProgress,
+                                   final MigrationProcessContext 
jobProcessContext, final TaskConfiguration taskConfig, final 
PipelineDataSourceManager dataSourceManager) {
         this.jobConfig = jobConfig;
         jobId = jobConfig.getJobId();
-        this.shardingItem = jobShardingItem;
+        this.shardingItem = shardingItem;
         this.initProgress = initProgress;
+        this.jobProcessContext = jobProcessContext;
+        this.taskConfig = taskConfig;
         this.dataSourceManager = dataSourceManager;
-        taskConfig = 
MigrationJobAPIFactory.getInstance().buildTaskConfiguration(jobConfig, 
jobShardingItem, jobProcessContext.getPipelineProcessConfig());
     }
     
     /**
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 3117e8a66c1..f340101601c 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -22,7 +22,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
@@ -126,8 +125,7 @@ public final class GovernanceRepositoryAPIImplTest {
     }
     
     private MigrationJobItemContext mockJobItemContext() {
-        MigrationJobItemContext result = new 
MigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration(),
-                0, new InventoryIncrementalJobItemProgress(), new 
DefaultPipelineDataSourceManager());
+        MigrationJobItemContext result = 
PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
         TaskConfiguration taskConfig = result.getTaskConfig();
         result.getInventoryTasks().add(mockInventoryTask(taskConfig));
         result.getIncrementalTasks().add(mockIncrementalTask(taskConfig));
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 2b292de129b..2797c0d23b1 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -31,7 +31,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -203,7 +202,7 @@ public final class MigrationJobAPIImplTest {
         Optional<String> jobId = jobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
         final GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
-        MigrationJobItemContext jobItemContext = new 
MigrationJobItemContext(jobConfig, 0, new 
InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
+        MigrationJobItemContext jobItemContext = 
PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
         jobAPI.persistJobItemProgress(jobItemContext);
         repositoryAPI.persistJobCheckResult(jobId.get(), true);
         jobAPI.updateJobItemStatus(jobId.get(), 0, JobStatus.FINISHED);
@@ -216,7 +215,7 @@ public final class MigrationJobAPIImplTest {
         Optional<String> jobId = jobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
-        MigrationJobItemContext jobItemContext = new 
MigrationJobItemContext(jobConfig, 0, new 
InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
+        MigrationJobItemContext jobItemContext = 
PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
         jobAPI.persistJobItemProgress(jobItemContext);
         repositoryAPI.persistJobCheckResult(jobId.get(), true);
         jobAPI.updateJobItemStatus(jobId.get(), 
jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
@@ -260,7 +259,7 @@ public final class MigrationJobAPIImplTest {
     @Test
     public void assertRenewJobStatus() {
         final MigrationJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
-        MigrationJobItemContext jobItemContext = new 
MigrationJobItemContext(jobConfig, 0, new 
InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
+        MigrationJobItemContext jobItemContext = 
PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
         jobAPI.persistJobItemProgress(jobItemContext);
         jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0, 
JobStatus.FINISHED);
         InventoryIncrementalJobItemProgress actual = 
jobAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index 8145b7062f3..4adbd5ddff9 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -20,7 +20,6 @@ package 
org.apache.shardingsphere.data.pipeline.core.check.consistency;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.DataConsistencyCalculateAlgorithmFixture;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -54,8 +53,7 @@ public final class DataConsistencyCheckerTest {
     }
     
     private MigrationJobConfiguration createJobConfiguration() throws 
SQLException {
-        MigrationJobItemContext jobItemContext = new 
MigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration(), 0,
-                new InventoryIncrementalJobItemProgress(), new 
DefaultPipelineDataSourceManager());
+        MigrationJobItemContext jobItemContext = 
PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
         
initTableData(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
         
initTableData(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
         return jobItemContext.getJobConfig();
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index a2ab03f5744..27f35e6442b 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -22,13 +22,10 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
 import org.junit.After;
 import org.junit.Before;
@@ -68,8 +65,7 @@ public final class InventoryTaskSplitterTest {
     
     private void initJobItemContext() {
         MigrationJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
-        InventoryIncrementalJobItemProgress initProgress = 
MigrationJobAPIFactory.getInstance().getJobItemProgress(jobConfig.getJobId(), 
0);
-        jobItemContext = new MigrationJobItemContext(jobConfig, 0, 
initProgress, new DefaultPipelineDataSourceManager());
+        jobItemContext = 
PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
         dataSourceManager = jobItemContext.getDataSourceManager();
         taskConfig = jobItemContext.getTaskConfig();
     }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index bc9bddf2d79..fd0d8e0ee05 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -20,13 +20,11 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -48,8 +46,7 @@ public final class IncrementalTaskTest {
     
     @Before
     public void setUp() {
-        TaskConfiguration taskConfig = new 
MigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration(), 0, 
new InventoryIncrementalJobItemProgress(),
-                new DefaultPipelineDataSourceManager()).getTaskConfig();
+        TaskConfiguration taskConfig = 
PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
         taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
         PipelineTableMetaDataLoader metaDataLoader = new 
PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), 
taskConfig.getImporterConfig(),
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 5809f42a82e..d1a6d3b28e2 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -23,14 +23,12 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -62,8 +60,7 @@ public final class InventoryTaskTest {
     
     @Before
     public void setUp() {
-        taskConfig = new 
MigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration(), 0,
-                new InventoryIncrementalJobItemProgress(), new 
DefaultPipelineDataSourceManager()).getTaskConfig();
+        taskConfig = 
PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
     }
     
     @Test(expected = IngestException.class)
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index 79b8e6c587c..98fe2cabb1d 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -20,18 +20,28 @@ package org.apache.shardingsphere.data.pipeline.core.util;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import 
org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import org.apache.shardingsphere.infra.database.DefaultDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
@@ -138,4 +148,28 @@ public final class PipelineContextUtil {
     public static PipelineChannelCreator getPipelineChannelCreator() {
         return PIPELINE_CHANNEL_CREATOR;
     }
+    
+    /**
+     * Mock migration job item context.
+     *
+     * @param jobConfig job configuration
+     * @return job item context
+     */
+    public static MigrationJobItemContext mockMigrationJobItemContext(final 
MigrationJobConfiguration jobConfig) {
+        PipelineProcessConfiguration processConfig = 
mockPipelineProcessConfiguration();
+        MigrationProcessContext processContext = new 
MigrationProcessContext(jobConfig.getJobId(), processConfig);
+        int jobShardingItem = 0;
+        TaskConfiguration taskConfig = new 
MigrationJobAPIImpl().buildTaskConfiguration(jobConfig, jobShardingItem, 
processConfig);
+        return new MigrationJobItemContext(jobConfig, jobShardingItem, null,
+                processContext, taskConfig, new 
DefaultPipelineDataSourceManager());
+    }
+    
+    private static PipelineProcessConfiguration 
mockPipelineProcessConfiguration() {
+        YamlPipelineReadConfiguration yamlReadConfig = 
YamlPipelineReadConfiguration.buildWithDefaultValue();
+        yamlReadConfig.setShardingSize(10);
+        YamlPipelineProcessConfiguration yamlProcessConfig = new 
YamlPipelineProcessConfiguration();
+        yamlProcessConfig.setRead(yamlReadConfig);
+        
PipelineProcessConfigurationUtils.fillInDefaultValue(yamlProcessConfig);
+        return new 
YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
+    }
 }

Reply via email to