This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 915351f63af Refactor MigrationJobItemContext for better unit test
(#20321)
915351f63af is described below
commit 915351f63afc8b01710a8e3b78f9f068009e110c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Aug 20 20:30:27 2022 +0800
Refactor MigrationJobItemContext for better unit test (#20321)
* Refactor MigrationJobItemContext for better unit test
* Read process config from registry center on buildPipelineProcessContext
* Improve code style
---
.../util/PipelineProcessConfigurationUtils.java | 11 ++++++-
.../pipeline/scenario/migration/MigrationJob.java | 15 ++++++----
.../scenario/migration/MigrationJobAPIImpl.java | 12 ++------
.../migration/MigrationJobItemContext.java | 12 ++++----
.../api/impl/GovernanceRepositoryAPIImplTest.java | 4 +--
.../core/api/impl/MigrationJobAPIImplTest.java | 7 ++---
.../consistency/DataConsistencyCheckerTest.java | 4 +--
.../core/prepare/InventoryTaskSplitterTest.java | 6 +---
.../pipeline/core/task/IncrementalTaskTest.java | 5 +---
.../data/pipeline/core/task/InventoryTaskTest.java | 5 +---
.../pipeline/core/util/PipelineContextUtil.java | 34 ++++++++++++++++++++++
11 files changed, 69 insertions(+), 46 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
index 0f8f0e1f7a2..373260d923c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
@@ -45,6 +45,16 @@ public final class PipelineProcessConfigurationUtils {
return originalConfig;
}
YamlPipelineProcessConfiguration yamlConfig = null != originalConfig ?
SWAPPER.swapToYamlConfiguration(originalConfig) : new
YamlPipelineProcessConfiguration();
+ fillInDefaultValue(yamlConfig);
+ return SWAPPER.swapToObject(yamlConfig);
+ }
+
+ /**
+ * Fill in default value.
+ *
+ * @param yamlConfig YAML configuration, non-null
+ */
+ public static void fillInDefaultValue(final
YamlPipelineProcessConfiguration yamlConfig) {
if (null == yamlConfig.getRead()) {
yamlConfig.setRead(YamlPipelineReadConfiguration.buildWithDefaultValue());
} else {
@@ -60,6 +70,5 @@ public final class PipelineProcessConfigurationUtils {
props.put(MemoryPipelineChannelCreator.BLOCK_QUEUE_SIZE_KEY,
MemoryPipelineChannelCreator.BLOCK_QUEUE_SIZE_DEFAULT_VALUE);
yamlConfig.setStreamChannel(new
YamlAlgorithmConfiguration(MemoryPipelineChannelCreator.TYPE, props));
}
- return SWAPPER.swapToObject(yamlConfig);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index bdb54293c0a..8b32537e2a9 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -46,6 +47,8 @@ import java.sql.SQLException;
@RequiredArgsConstructor
public final class MigrationJob extends AbstractPipelineJob implements
SimpleJob, PipelineJob {
+ private final MigrationJobAPI jobAPI =
MigrationJobAPIFactory.getInstance();
+
private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
private final PipelineDistributedBarrier pipelineDistributedBarrier =
PipelineDistributedBarrier.getInstance();
@@ -55,16 +58,18 @@ public final class MigrationJob extends AbstractPipelineJob
implements SimpleJob
@Override
public void execute(final ShardingContext shardingContext) {
- log.info("Execute job {}-{}", shardingContext.getJobName(),
shardingContext.getShardingItem());
+ int shardingItem = shardingContext.getShardingItem();
+ log.info("Execute job {}-{}", shardingContext.getJobName(),
shardingItem);
if (isStopping()) {
log.info("stopping true, ignore");
return;
}
setJobId(shardingContext.getJobName());
MigrationJobConfiguration jobConfig =
YamlMigrationJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
- InventoryIncrementalJobItemProgress initProgress =
MigrationJobAPIFactory.getInstance().getJobItemProgress(shardingContext.getJobName(),
shardingContext.getShardingItem());
- MigrationJobItemContext jobItemContext = new
MigrationJobItemContext(jobConfig, shardingContext.getShardingItem(),
initProgress, dataSourceManager);
- int shardingItem = jobItemContext.getShardingItem();
+ InventoryIncrementalJobItemProgress initProgress =
jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
+ MigrationProcessContext jobProcessContext =
jobAPI.buildPipelineProcessContext(jobConfig);
+ TaskConfiguration taskConfig =
jobAPI.buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
+ MigrationJobItemContext jobItemContext = new
MigrationJobItemContext(jobConfig, shardingItem, initProgress,
jobProcessContext, taskConfig, dataSourceManager);
if (getTasksRunnerMap().containsKey(shardingItem)) {
log.warn("tasksRunnerMap contains shardingItem {}, ignore",
shardingItem);
return;
@@ -93,7 +98,7 @@ public final class MigrationJob extends AbstractPipelineJob
implements SimpleJob
log.error("job prepare failed, {}-{}", getJobId(),
jobItemContext.getShardingItem(), ex);
PipelineJobCenter.stop(getJobId());
jobItemContext.setStatus(JobStatus.PREPARING_FAILURE);
-
MigrationJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext);
+ jobAPI.persistJobItemProgress(jobItemContext);
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index aef8bb6501a..fb01beb0a6f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -57,12 +57,9 @@ import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCon
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineReadConfigurationSwapper;
import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
@@ -152,13 +149,8 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
@Override
public MigrationProcessContext buildPipelineProcessContext(final
PipelineJobConfiguration pipelineJobConfig) {
- // TODO add jobType
- // TODO read process config from registry center
- YamlPipelineReadConfiguration yamlReadConfig =
YamlPipelineReadConfiguration.buildWithDefaultValue();
- yamlReadConfig.fillInNullFieldsWithDefaultValue();
- yamlReadConfig.setShardingSize(10);
- PipelineReadConfiguration readConfig = new
YamlPipelineReadConfigurationSwapper().swapToObject(yamlReadConfig);
- PipelineProcessConfiguration processConfig = new
PipelineProcessConfiguration(readConfig, null, null);
+ // TODO cache process config on local
+ PipelineProcessConfiguration processConfig =
showProcessConfiguration();
return new MigrationProcessContext(pipelineJobConfig.getJobId(),
processConfig);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index 2f954f91f04..bfd3e25ca6b 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -83,17 +83,15 @@ public final class MigrationJobItemContext implements
InventoryIncrementalJobIte
}
};
- public MigrationJobItemContext(final MigrationJobConfiguration jobConfig,
final int jobShardingItem, final InventoryIncrementalJobItemProgress
initProgress,
- final PipelineDataSourceManager
dataSourceManager) {
- // TODO refactor, transfer in parameter
- MigrationJobAPI jobAPI = MigrationJobAPIFactory.getInstance();
- jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
+ public MigrationJobItemContext(final MigrationJobConfiguration jobConfig,
final int shardingItem, final InventoryIncrementalJobItemProgress initProgress,
+ final MigrationProcessContext
jobProcessContext, final TaskConfiguration taskConfig, final
PipelineDataSourceManager dataSourceManager) {
this.jobConfig = jobConfig;
jobId = jobConfig.getJobId();
- this.shardingItem = jobShardingItem;
+ this.shardingItem = shardingItem;
this.initProgress = initProgress;
+ this.jobProcessContext = jobProcessContext;
+ this.taskConfig = taskConfig;
this.dataSourceManager = dataSourceManager;
- taskConfig =
MigrationJobAPIFactory.getInstance().buildTaskConfiguration(jobConfig,
jobShardingItem, jobProcessContext.getPipelineProcessConfig());
}
/**
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 3117e8a66c1..f340101601c 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -22,7 +22,6 @@ import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
@@ -126,8 +125,7 @@ public final class GovernanceRepositoryAPIImplTest {
}
private MigrationJobItemContext mockJobItemContext() {
- MigrationJobItemContext result = new
MigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration(),
- 0, new InventoryIncrementalJobItemProgress(), new
DefaultPipelineDataSourceManager());
+ MigrationJobItemContext result =
PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
TaskConfiguration taskConfig = result.getTaskConfig();
result.getInventoryTasks().add(mockInventoryTask(taskConfig));
result.getIncrementalTasks().add(mockIncrementalTask(taskConfig));
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 2b292de129b..2797c0d23b1 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -31,7 +31,6 @@ import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -203,7 +202,7 @@ public final class MigrationJobAPIImplTest {
Optional<String> jobId = jobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
final GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
- MigrationJobItemContext jobItemContext = new
MigrationJobItemContext(jobConfig, 0, new
InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
+ MigrationJobItemContext jobItemContext =
PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
jobAPI.persistJobItemProgress(jobItemContext);
repositoryAPI.persistJobCheckResult(jobId.get(), true);
jobAPI.updateJobItemStatus(jobId.get(), 0, JobStatus.FINISHED);
@@ -216,7 +215,7 @@ public final class MigrationJobAPIImplTest {
Optional<String> jobId = jobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
- MigrationJobItemContext jobItemContext = new
MigrationJobItemContext(jobConfig, 0, new
InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
+ MigrationJobItemContext jobItemContext =
PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
jobAPI.persistJobItemProgress(jobItemContext);
repositoryAPI.persistJobCheckResult(jobId.get(), true);
jobAPI.updateJobItemStatus(jobId.get(),
jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
@@ -260,7 +259,7 @@ public final class MigrationJobAPIImplTest {
@Test
public void assertRenewJobStatus() {
final MigrationJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
- MigrationJobItemContext jobItemContext = new
MigrationJobItemContext(jobConfig, 0, new
InventoryIncrementalJobItemProgress(), new DefaultPipelineDataSourceManager());
+ MigrationJobItemContext jobItemContext =
PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
jobAPI.persistJobItemProgress(jobItemContext);
jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0,
JobStatus.FINISHED);
InventoryIncrementalJobItemProgress actual =
jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index 8145b7062f3..4adbd5ddff9 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -20,7 +20,6 @@ package
org.apache.shardingsphere.data.pipeline.core.check.consistency;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.fixture.DataConsistencyCalculateAlgorithmFixture;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -54,8 +53,7 @@ public final class DataConsistencyCheckerTest {
}
private MigrationJobConfiguration createJobConfiguration() throws
SQLException {
- MigrationJobItemContext jobItemContext = new
MigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration(), 0,
- new InventoryIncrementalJobItemProgress(), new
DefaultPipelineDataSourceManager());
+ MigrationJobItemContext jobItemContext =
PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
initTableData(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
initTableData(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
return jobItemContext.getJobConfig();
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index a2ab03f5744..27f35e6442b 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -22,13 +22,10 @@ import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
import org.junit.After;
import org.junit.Before;
@@ -68,8 +65,7 @@ public final class InventoryTaskSplitterTest {
private void initJobItemContext() {
MigrationJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
- InventoryIncrementalJobItemProgress initProgress =
MigrationJobAPIFactory.getInstance().getJobItemProgress(jobConfig.getJobId(),
0);
- jobItemContext = new MigrationJobItemContext(jobConfig, 0,
initProgress, new DefaultPipelineDataSourceManager());
+ jobItemContext =
PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
dataSourceManager = jobItemContext.getDataSourceManager();
taskConfig = jobItemContext.getTaskConfig();
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index bc9bddf2d79..fd0d8e0ee05 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -20,13 +20,11 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -48,8 +46,7 @@ public final class IncrementalTaskTest {
@Before
public void setUp() {
- TaskConfiguration taskConfig = new
MigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration(), 0,
new InventoryIncrementalJobItemProgress(),
- new DefaultPipelineDataSourceManager()).getTaskConfig();
+ TaskConfiguration taskConfig =
PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
PipelineTableMetaDataLoader metaDataLoader = new
PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(),
taskConfig.getImporterConfig(),
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 5809f42a82e..d1a6d3b28e2 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -23,14 +23,12 @@ import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -62,8 +60,7 @@ public final class InventoryTaskTest {
@Before
public void setUp() {
- taskConfig = new
MigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration(), 0,
- new InventoryIncrementalJobItemProgress(), new
DefaultPipelineDataSourceManager()).getTaskConfig();
+ taskConfig =
PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
}
@Test(expected = IngestException.class)
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index 79b8e6c587c..98fe2cabb1d 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -20,18 +20,28 @@ package org.apache.shardingsphere.data.pipeline.core.util;
import lombok.SneakyThrows;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import
org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.infra.database.DefaultDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
@@ -138,4 +148,28 @@ public final class PipelineContextUtil {
public static PipelineChannelCreator getPipelineChannelCreator() {
return PIPELINE_CHANNEL_CREATOR;
}
+
+ /**
+ * Mock migration job item context.
+ *
+ * @param jobConfig job configuration
+ * @return job item context
+ */
+ public static MigrationJobItemContext mockMigrationJobItemContext(final
MigrationJobConfiguration jobConfig) {
+ PipelineProcessConfiguration processConfig =
mockPipelineProcessConfiguration();
+ MigrationProcessContext processContext = new
MigrationProcessContext(jobConfig.getJobId(), processConfig);
+ int jobShardingItem = 0;
+ TaskConfiguration taskConfig = new
MigrationJobAPIImpl().buildTaskConfiguration(jobConfig, jobShardingItem,
processConfig);
+ return new MigrationJobItemContext(jobConfig, jobShardingItem, null,
+ processContext, taskConfig, new
DefaultPipelineDataSourceManager());
+ }
+
+ private static PipelineProcessConfiguration
mockPipelineProcessConfiguration() {
+ YamlPipelineReadConfiguration yamlReadConfig =
YamlPipelineReadConfiguration.buildWithDefaultValue();
+ yamlReadConfig.setShardingSize(10);
+ YamlPipelineProcessConfiguration yamlProcessConfig = new
YamlPipelineProcessConfiguration();
+ yamlProcessConfig.setRead(yamlReadConfig);
+
PipelineProcessConfigurationUtils.fillInDefaultValue(yamlProcessConfig);
+ return new
YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
+ }
}