This is an automated email from the ASF dual-hosted git repository.
chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 36b622bc6fa Refactor MigrationJobPreparer (#32517)
36b622bc6fa is described below
commit 36b622bc6fa08dbb7831753d7ef4f4c2c6824136
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Aug 15 11:30:19 2024 +0800
Refactor MigrationJobPreparer (#32517)
---
.../pipeline/scenario/migration/preparer/MigrationJobPreparer.java | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index b69a38cd42e..67c5ef189b7 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -44,7 +44,6 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncremen
 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
@@ -193,13 +192,12 @@ public final class MigrationJobPreparer {
     private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) {
         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
-        PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader();
         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
         ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
         IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
         PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(), taskProgress);
-        Dumper dumper = IncrementalDumperCreator.create(
-                new CreateIncrementalDumperParameter(dumperContext, dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader, jobItemContext.getDataSourceManager()));
+        Dumper dumper = IncrementalDumperCreator.create(new CreateIncrementalDumperParameter(
+                dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager()));
         Collection<Importer> importers = Collections.singletonList(new SingleChannelConsumerImporter(channel, 1, 5L, jobItemContext.getSink(), jobItemContext));
         PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);