This is an automated email from the ASF dual-hosted git repository.

chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 36b622bc6fa Refactor MigrationJobPreparer (#32517)
36b622bc6fa is described below

commit 36b622bc6fa08dbb7831753d7ef4f4c2c6824136
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Aug 15 11:30:19 2024 +0800

    Refactor MigrationJobPreparer (#32517)
---
 .../pipeline/scenario/migration/preparer/MigrationJobPreparer.java  | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index b69a38cd42e..67c5ef189b7 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -44,7 +44,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncremen
 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
@@ -193,13 +192,12 @@ public final class MigrationJobPreparer {
     
     private void initIncrementalTasks(final MigrationJobItemContext 
jobItemContext) {
         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
-        PipelineTableMetaDataLoader sourceMetaDataLoader = 
jobItemContext.getSourceMetaDataLoader();
         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
         ExecuteEngine incrementalExecuteEngine = 
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
         IncrementalTaskProgress taskProgress = 
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
 jobItemContext.getInitProgress());
         PipelineChannel channel = 
PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(),
 taskProgress);
-        Dumper dumper = IncrementalDumperCreator.create(
-                new CreateIncrementalDumperParameter(dumperContext, 
dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader, 
jobItemContext.getDataSourceManager()));
+        Dumper dumper = IncrementalDumperCreator.create(new 
CreateIncrementalDumperParameter(
+                dumperContext, dumperContext.getCommonContext().getPosition(), 
channel, jobItemContext.getSourceMetaDataLoader(), 
jobItemContext.getDataSourceManager()));
         Collection<Importer> importers = Collections.singletonList(new 
SingleChannelConsumerImporter(channel, 1, 5L, jobItemContext.getSink(), 
jobItemContext));
         PipelineTask incrementalTask = new 
IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), 
incrementalExecuteEngine, dumper, importers, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);

Reply via email to