This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b261b603523 Code format for MigrationJobPreparer (#32518)
b261b603523 is described below

commit b261b603523d10f27edf37472bc4d5d59297e657
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Aug 15 11:47:01 2024 +0800

    Code format for MigrationJobPreparer (#32518)
    
    * Code format for MigrationJobPreparer
    
    * Move GlobalTableMapEventMapping
---
 .../mysql/ingest/{ => client}/GlobalTableMapEventMapping.java        | 2 +-
 .../data/pipeline/mysql/ingest/client/MySQLClient.java               | 1 -
 .../data/pipeline/cdc/core/prepare/CDCJobPreparer.java               | 5 +++--
 .../pipeline/scenario/migration/preparer/MigrationJobPreparer.java   | 5 +++--
 4 files changed, 7 insertions(+), 6 deletions(-)

diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/GlobalTableMapEventMapping.java
similarity index 96%
rename from 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
rename to 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/GlobalTableMapEventMapping.java
index 78a215e80da..25b26769620 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/GlobalTableMapEventMapping.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.mysql.ingest;
+package org.apache.shardingsphere.data.pipeline.mysql.ingest.client;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index df60c00de2f..c1b4e10d3e5 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -33,7 +33,6 @@ import io.netty.util.concurrent.Promise;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.GlobalTableMapEventMapping;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLBinlogEventPacketDecoder;
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 8cd25dc4c20..dde3d6a7176 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -140,8 +140,9 @@ public final class CDCJobPreparer {
         IncrementalTaskProgress taskProgress = 
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
 jobItemContext.getInitProgress());
         PipelineChannel channel = 
PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(),
 taskProgress);
         channelProgressPairs.add(new CDCChannelProgressPair(channel, 
jobItemContext));
-        Dumper dumper = IncrementalDumperCreator.create(new 
CreateIncrementalDumperParameter(
-                dumperContext, dumperContext.getCommonContext().getPosition(), 
channel, jobItemContext.getSourceMetaDataLoader(), 
jobItemContext.getDataSourceManager()));
+        CreateIncrementalDumperParameter param = new 
CreateIncrementalDumperParameter(
+                dumperContext, dumperContext.getCommonContext().getPosition(), 
channel, jobItemContext.getSourceMetaDataLoader(), 
jobItemContext.getDataSourceManager());
+        Dumper dumper = IncrementalDumperCreator.create(param);
         boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
         Importer importer = importerUsed.get() ? null
                 : new CDCImporter(channelProgressPairs, 1, 100L, 
jobItemContext.getSink(), needSorting, 
taskConfig.getImporterConfig().getRateLimitAlgorithm());
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 67c5ef189b7..106d6daf76f 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -196,8 +196,9 @@ public final class MigrationJobPreparer {
         ExecuteEngine incrementalExecuteEngine = 
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
         IncrementalTaskProgress taskProgress = 
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
 jobItemContext.getInitProgress());
         PipelineChannel channel = 
PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(),
 taskProgress);
-        Dumper dumper = IncrementalDumperCreator.create(new 
CreateIncrementalDumperParameter(
-                dumperContext, dumperContext.getCommonContext().getPosition(), 
channel, jobItemContext.getSourceMetaDataLoader(), 
jobItemContext.getDataSourceManager()));
+        CreateIncrementalDumperParameter param = new 
CreateIncrementalDumperParameter(
+                dumperContext, dumperContext.getCommonContext().getPosition(), 
channel, jobItemContext.getSourceMetaDataLoader(), 
jobItemContext.getDataSourceManager());
+        Dumper dumper = IncrementalDumperCreator.create(param);
         Collection<Importer> importers = Collections.singletonList(new 
SingleChannelConsumerImporter(channel, 1, 5L, jobItemContext.getSink(), 
jobItemContext));
         PipelineTask incrementalTask = new 
IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), 
incrementalExecuteEngine, dumper, importers, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);

Reply via email to