This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c2025c7d832 Add CreateIncrementalDumperParameter (#32509)
c2025c7d832 is described below

commit c2025c7d83293076de63d90ee478b82e4a0351a7
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Aug 15 00:49:13 2024 +0800

    Add CreateIncrementalDumperParameter (#32509)
---
 ....java => CreateIncrementalDumperParameter.java} | 31 +++++++++++-----------
 .../DialectIncrementalDumperCreator.java           | 10 ++-----
 .../incremental/IncrementalDumperCreator.java      | 13 +++------
 .../dumper/MySQLIncrementalDumperCreator.java      | 12 +++------
 .../dumper/OpenGaussIncrementalDumperCreator.java  | 12 +++------
 .../dumper/PostgreSQLIncrementalDumperCreator.java | 12 +++------
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |  4 ++-
 .../migration/preparer/MigrationJobPreparer.java   |  6 ++++-
 .../h2/dumper/H2IncrementalDumperCreator.java      | 10 +++----
 9 files changed, 45 insertions(+), 65 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/DialectIncrementalDumperCreator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/CreateIncrementalDumperParameter.java
similarity index 62%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/DialectIncrementalDumperCreator.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/CreateIncrementalDumperParameter.java
index e3b9d7a829f..8e0eb3f2f57 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/DialectIncrementalDumperCreator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/CreateIncrementalDumperParameter.java
@@ -17,26 +17,27 @@
 
 package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental;
 
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
-import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 
 /**
- * Dialect incremental dumper creator.
+ * Create incremental dumper parameter.
  */
-@SingletonSPI
-public interface DialectIncrementalDumperCreator extends DatabaseTypedSPI {
+@RequiredArgsConstructor
+@Getter
+public final class CreateIncrementalDumperParameter {
     
-    /**
-     * Create incremental dumper.
-     *
-     * @param context incremental dumper context
-     * @param position position
-     * @param channel channel
-     * @param metaDataLoader meta data loader
-     * @return incremental dumper
-     */
-    IncrementalDumper createIncrementalDumper(IncrementalDumperContext 
context, IngestPosition position, PipelineChannel channel, 
PipelineTableMetaDataLoader metaDataLoader);
+    private final IncrementalDumperContext context;
+    
+    private final IngestPosition position;
+    
+    private final PipelineChannel channel;
+    
+    private final PipelineTableMetaDataLoader metaDataLoader;
+    
+    private final PipelineDataSourceManager dataSourceManager;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/DialectIncrementalDumperCreator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/DialectIncrementalDumperCreator.java
index e3b9d7a829f..7a98bed7d7f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/DialectIncrementalDumperCreator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/DialectIncrementalDumperCreator.java
@@ -17,9 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental;
 
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 
@@ -32,11 +29,8 @@ public interface DialectIncrementalDumperCreator extends 
DatabaseTypedSPI {
     /**
      * Create incremental dumper.
      *
-     * @param context incremental dumper context
-     * @param position position
-     * @param channel channel
-     * @param metaDataLoader meta data loader
+     * @param param create incremental dumper parameter
      * @return incremental dumper
      */
-    IncrementalDumper createIncrementalDumper(IncrementalDumperContext 
context, IngestPosition position, PipelineChannel channel, 
PipelineTableMetaDataLoader metaDataLoader);
+    IncrementalDumper createIncrementalDumper(CreateIncrementalDumperParameter 
param);
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/IncrementalDumperCreator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/IncrementalDumperCreator.java
index 48cd2a5d92e..b573c4c1039 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/IncrementalDumperCreator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/IncrementalDumperCreator.java
@@ -20,8 +20,6 @@ package 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
@@ -35,15 +33,12 @@ public final class IncrementalDumperCreator {
     /**
      * Create incremental dumper.
      *
-     * @param dumperContext incremental dumper context
-     * @param channel channel
-     * @param metaDataLoader meta data loader
+     * @param param create incremental dumper parameter
      * @return incremental dumper
      */
-    public static IncrementalDumper create(final IncrementalDumperContext 
dumperContext, final PipelineChannel channel, final PipelineTableMetaDataLoader 
metaDataLoader) {
-        
ShardingSpherePreconditions.checkState(dumperContext.getCommonContext().getDataSourceConfig()
 instanceof StandardPipelineDataSourceConfiguration,
+    public static IncrementalDumper create(final 
CreateIncrementalDumperParameter param) {
+        
ShardingSpherePreconditions.checkState(param.getContext().getCommonContext().getDataSourceConfig()
 instanceof StandardPipelineDataSourceConfiguration,
                 () -> new UnsupportedSQLOperationException("Incremental dumper 
only support StandardPipelineDataSourceConfiguration"));
-        return 
DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, 
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
-                .createIncrementalDumper(dumperContext, 
dumperContext.getCommonContext().getPosition(), channel, metaDataLoader);
+        return 
DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, 
param.getContext().getCommonContext().getDataSourceConfig().getDatabaseType()).createIncrementalDumper(param);
     }
 }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
index 23275e1744d..6dd40c95fa1 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
@@ -17,13 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql.ingest.dumper;
 
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
 
 /**
  * MySQL incremental dumper creator.
@@ -31,9 +28,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.Di
 public final class MySQLIncrementalDumperCreator implements 
DialectIncrementalDumperCreator {
     
     @Override
-    public IncrementalDumper createIncrementalDumper(final 
IncrementalDumperContext context,
-                                                     final IngestPosition 
position, final PipelineChannel channel, final PipelineTableMetaDataLoader 
metaDataLoader) {
-        return new MySQLIncrementalDumper(context, position, channel, 
metaDataLoader);
+    public IncrementalDumper createIncrementalDumper(final 
CreateIncrementalDumperParameter param) {
+        return new MySQLIncrementalDumper(param.getContext(), 
param.getPosition(), param.getChannel(), param.getMetaDataLoader());
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
index 7814e44d3fc..e977e22aa57 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
@@ -17,13 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.opengauss.ingest.dumper;
 
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWALDumper;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
 
 /**
  * OpenGauss incremental dumper creator.
@@ -31,9 +28,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.Di
 public final class OpenGaussIncrementalDumperCreator implements 
DialectIncrementalDumperCreator {
     
     @Override
-    public IncrementalDumper createIncrementalDumper(final 
IncrementalDumperContext context,
-                                                     final IngestPosition 
position, final PipelineChannel channel, final PipelineTableMetaDataLoader 
metaDataLoader) {
-        return new OpenGaussWALDumper(context, position, channel, 
metaDataLoader);
+    public IncrementalDumper createIncrementalDumper(final 
CreateIncrementalDumperParameter param) {
+        return new OpenGaussWALDumper(param.getContext(), param.getPosition(), 
param.getChannel(), param.getMetaDataLoader());
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
index a7cc59f1411..d54e3eeb942 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
@@ -17,13 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.dumper;
 
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWALDumper;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
 
 /**
  * PostgreSQL incremental dumper creator.
@@ -31,9 +28,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.Di
 public final class PostgreSQLIncrementalDumperCreator implements 
DialectIncrementalDumperCreator {
     
     @Override
-    public IncrementalDumper createIncrementalDumper(final 
IncrementalDumperContext context,
-                                                     final IngestPosition 
position, final PipelineChannel channel, final PipelineTableMetaDataLoader 
metaDataLoader) {
-        return new PostgreSQLWALDumper(context, position, channel, 
metaDataLoader);
+    public IncrementalDumper createIncrementalDumper(final 
CreateIncrementalDumperParameter param) {
+        return new PostgreSQLWALDumper(param.getContext(), 
param.getPosition(), param.getChannel(), param.getMetaDataLoader());
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 4341d35c924..8cd25dc4c20 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -31,6 +31,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
@@ -139,7 +140,8 @@ public final class CDCJobPreparer {
         IncrementalTaskProgress taskProgress = 
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
 jobItemContext.getInitProgress());
         PipelineChannel channel = 
PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(),
 taskProgress);
         channelProgressPairs.add(new CDCChannelProgressPair(channel, 
jobItemContext));
-        Dumper dumper = IncrementalDumperCreator.create(dumperContext, 
channel, jobItemContext.getSourceMetaDataLoader());
+        Dumper dumper = IncrementalDumperCreator.create(new 
CreateIncrementalDumperParameter(
+                dumperContext, dumperContext.getCommonContext().getPosition(), 
channel, jobItemContext.getSourceMetaDataLoader(), 
jobItemContext.getDataSourceManager()));
         boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
         Importer importer = importerUsed.get() ? null
                 : new CDCImporter(channelProgressPairs, 1, 100L, 
jobItemContext.getSink(), needSorting, 
taskConfig.getImporterConfig().getRateLimitAlgorithm());
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index a05eb2e58d0..b69a38cd42e 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -31,6 +31,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
@@ -43,6 +44,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncremen
 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
@@ -191,11 +193,13 @@ public final class MigrationJobPreparer {
     
     private void initIncrementalTasks(final MigrationJobItemContext 
jobItemContext) {
         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
+        PipelineTableMetaDataLoader sourceMetaDataLoader = 
jobItemContext.getSourceMetaDataLoader();
         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
         ExecuteEngine incrementalExecuteEngine = 
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
         IncrementalTaskProgress taskProgress = 
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
 jobItemContext.getInitProgress());
         PipelineChannel channel = 
PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(),
 taskProgress);
-        Dumper dumper = IncrementalDumperCreator.create(dumperContext, 
channel, jobItemContext.getSourceMetaDataLoader());
+        Dumper dumper = IncrementalDumperCreator.create(
+                new CreateIncrementalDumperParameter(dumperContext, 
dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader, 
jobItemContext.getDataSourceManager()));
         Collection<Importer> importers = Collections.singletonList(new 
SingleChannelConsumerImporter(channel, 1, 5L, jobItemContext.getSink(), 
jobItemContext));
         PipelineTask incrementalTask = new 
IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), 
incrementalExecuteEngine, dumper, importers, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java
index 23c432cd61e..bd7fdab3922 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java
@@ -17,12 +17,9 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.fixture.h2.dumper;
 
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
 
 import static org.mockito.Mockito.mock;
 
@@ -32,8 +29,7 @@ import static org.mockito.Mockito.mock;
 public final class H2IncrementalDumperCreator implements 
DialectIncrementalDumperCreator {
     
     @Override
-    public IncrementalDumper createIncrementalDumper(final 
IncrementalDumperContext context,
-                                                     final IngestPosition 
position, final PipelineChannel channel, final PipelineTableMetaDataLoader 
metaDataLoader) {
+    public IncrementalDumper createIncrementalDumper(final 
CreateIncrementalDumperParameter param) {
         return mock(IncrementalDumper.class);
     }
     

Reply via email to