This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b261b603523 Code format for MigrationJobPreparer (#32518)
b261b603523 is described below
commit b261b603523d10f27edf37472bc4d5d59297e657
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Aug 15 11:47:01 2024 +0800
Code format for MigrationJobPreparer (#32518)
* Code format for MigrationJobPreparer
* Move GlobalTableMapEventMapping
---
.../mysql/ingest/{ => client}/GlobalTableMapEventMapping.java | 2 +-
.../data/pipeline/mysql/ingest/client/MySQLClient.java | 1 -
.../data/pipeline/cdc/core/prepare/CDCJobPreparer.java | 5 +++--
.../pipeline/scenario/migration/preparer/MigrationJobPreparer.java | 5 +++--
4 files changed, 7 insertions(+), 6 deletions(-)
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/GlobalTableMapEventMapping.java
similarity index 96%
rename from
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
rename to
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/GlobalTableMapEventMapping.java
index 78a215e80da..25b26769620 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/GlobalTableMapEventMapping.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.mysql.ingest;
+package org.apache.shardingsphere.data.pipeline.mysql.ingest.client;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index df60c00de2f..c1b4e10d3e5 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -33,7 +33,6 @@ import io.netty.util.concurrent.Promise;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.GlobalTableMapEventMapping;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLBinlogEventPacketDecoder;
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 8cd25dc4c20..dde3d6a7176 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -140,8 +140,9 @@ public final class CDCJobPreparer {
IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
jobItemContext.getInitProgress());
PipelineChannel channel =
PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(),
taskProgress);
channelProgressPairs.add(new CDCChannelProgressPair(channel,
jobItemContext));
- Dumper dumper = IncrementalDumperCreator.create(new
CreateIncrementalDumperParameter(
- dumperContext, dumperContext.getCommonContext().getPosition(),
channel, jobItemContext.getSourceMetaDataLoader(),
jobItemContext.getDataSourceManager()));
+ CreateIncrementalDumperParameter param = new
CreateIncrementalDumperParameter(
+ dumperContext, dumperContext.getCommonContext().getPosition(),
channel, jobItemContext.getSourceMetaDataLoader(),
jobItemContext.getDataSourceManager());
+ Dumper dumper = IncrementalDumperCreator.create(param);
boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
Importer importer = importerUsed.get() ? null
: new CDCImporter(channelProgressPairs, 1, 100L,
jobItemContext.getSink(), needSorting,
taskConfig.getImporterConfig().getRateLimitAlgorithm());
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 67c5ef189b7..106d6daf76f 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -196,8 +196,9 @@ public final class MigrationJobPreparer {
ExecuteEngine incrementalExecuteEngine =
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
jobItemContext.getInitProgress());
PipelineChannel channel =
PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(),
taskProgress);
- Dumper dumper = IncrementalDumperCreator.create(new
CreateIncrementalDumperParameter(
- dumperContext, dumperContext.getCommonContext().getPosition(),
channel, jobItemContext.getSourceMetaDataLoader(),
jobItemContext.getDataSourceManager()));
+ CreateIncrementalDumperParameter param = new
CreateIncrementalDumperParameter(
+ dumperContext, dumperContext.getCommonContext().getPosition(),
channel, jobItemContext.getSourceMetaDataLoader(),
jobItemContext.getDataSourceManager());
+ Dumper dumper = IncrementalDumperCreator.create(param);
Collection<Importer> importers = Collections.singletonList(new
SingleChannelConsumerImporter(channel, 1, 5L, jobItemContext.getSink(),
jobItemContext));
PipelineTask incrementalTask = new
IncrementalTask(dumperContext.getCommonContext().getDataSourceName(),
incrementalExecuteEngine, dumper, importers, taskProgress);
jobItemContext.getIncrementalTasks().add(incrementalTask);