This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c2025c7d832 Add CreateIncrementalDumperParameter (#32509)
c2025c7d832 is described below
commit c2025c7d83293076de63d90ee478b82e4a0351a7
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Aug 15 00:49:13 2024 +0800
Add CreateIncrementalDumperParameter (#32509)
---
....java => CreateIncrementalDumperParameter.java} | 31 +++++++++++-----------
.../DialectIncrementalDumperCreator.java | 10 ++-----
.../incremental/IncrementalDumperCreator.java | 13 +++------
.../dumper/MySQLIncrementalDumperCreator.java | 12 +++------
.../dumper/OpenGaussIncrementalDumperCreator.java | 12 +++------
.../dumper/PostgreSQLIncrementalDumperCreator.java | 12 +++------
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 4 ++-
.../migration/preparer/MigrationJobPreparer.java | 6 ++++-
.../h2/dumper/H2IncrementalDumperCreator.java | 10 +++----
9 files changed, 45 insertions(+), 65 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/DialectIncrementalDumperCreator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/CreateIncrementalDumperParameter.java
similarity index 62%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/DialectIncrementalDumperCreator.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/CreateIncrementalDumperParameter.java
index e3b9d7a829f..8e0eb3f2f57 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/DialectIncrementalDumperCreator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/CreateIncrementalDumperParameter.java
@@ -17,26 +17,27 @@
package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
-import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
/**
- * Dialect incremental dumper creator.
+ * Create incremental dumper parameter.
*/
-@SingletonSPI
-public interface DialectIncrementalDumperCreator extends DatabaseTypedSPI {
+@RequiredArgsConstructor
+@Getter
+public final class CreateIncrementalDumperParameter {
- /**
- * Create incremental dumper.
- *
- * @param context incremental dumper context
- * @param position position
- * @param channel channel
- * @param metaDataLoader meta data loader
- * @return incremental dumper
- */
- IncrementalDumper createIncrementalDumper(IncrementalDumperContext
context, IngestPosition position, PipelineChannel channel,
PipelineTableMetaDataLoader metaDataLoader);
+ private final IncrementalDumperContext context;
+
+ private final IngestPosition position;
+
+ private final PipelineChannel channel;
+
+ private final PipelineTableMetaDataLoader metaDataLoader;
+
+ private final PipelineDataSourceManager dataSourceManager;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/DialectIncrementalDumperCreator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/DialectIncrementalDumperCreator.java
index e3b9d7a829f..7a98bed7d7f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/DialectIncrementalDumperCreator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/DialectIncrementalDumperCreator.java
@@ -17,9 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
@@ -32,11 +29,8 @@ public interface DialectIncrementalDumperCreator extends
DatabaseTypedSPI {
/**
* Create incremental dumper.
*
- * @param context incremental dumper context
- * @param position position
- * @param channel channel
- * @param metaDataLoader meta data loader
+ * @param param create incremental dumper parameter
* @return incremental dumper
*/
- IncrementalDumper createIncrementalDumper(IncrementalDumperContext
context, IngestPosition position, PipelineChannel channel,
PipelineTableMetaDataLoader metaDataLoader);
+ IncrementalDumper createIncrementalDumper(CreateIncrementalDumperParameter
param);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/IncrementalDumperCreator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/IncrementalDumperCreator.java
index 48cd2a5d92e..b573c4c1039 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/IncrementalDumperCreator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/IncrementalDumperCreator.java
@@ -20,8 +20,6 @@ package
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
@@ -35,15 +33,12 @@ public final class IncrementalDumperCreator {
/**
* Create incremental dumper.
*
- * @param dumperContext incremental dumper context
- * @param channel channel
- * @param metaDataLoader meta data loader
+ * @param param create incremental dumper parameter
* @return incremental dumper
*/
- public static IncrementalDumper create(final IncrementalDumperContext
dumperContext, final PipelineChannel channel, final PipelineTableMetaDataLoader
metaDataLoader) {
-
ShardingSpherePreconditions.checkState(dumperContext.getCommonContext().getDataSourceConfig()
instanceof StandardPipelineDataSourceConfiguration,
+ public static IncrementalDumper create(final
CreateIncrementalDumperParameter param) {
+
ShardingSpherePreconditions.checkState(param.getContext().getCommonContext().getDataSourceConfig()
instanceof StandardPipelineDataSourceConfiguration,
() -> new UnsupportedSQLOperationException("Incremental dumper
only support StandardPipelineDataSourceConfiguration"));
- return
DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class,
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
- .createIncrementalDumper(dumperContext,
dumperContext.getCommonContext().getPosition(), channel, metaDataLoader);
+ return
DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class,
param.getContext().getCommonContext().getDataSourceConfig().getDatabaseType()).createIncrementalDumper(param);
}
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
index 23275e1744d..6dd40c95fa1 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
@@ -17,13 +17,10 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.dumper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
/**
* MySQL incremental dumper creator.
@@ -31,9 +28,8 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.Di
public final class MySQLIncrementalDumperCreator implements
DialectIncrementalDumperCreator {
@Override
- public IncrementalDumper createIncrementalDumper(final
IncrementalDumperContext context,
- final IngestPosition
position, final PipelineChannel channel, final PipelineTableMetaDataLoader
metaDataLoader) {
- return new MySQLIncrementalDumper(context, position, channel,
metaDataLoader);
+ public IncrementalDumper createIncrementalDumper(final
CreateIncrementalDumperParameter param) {
+ return new MySQLIncrementalDumper(param.getContext(),
param.getPosition(), param.getChannel(), param.getMetaDataLoader());
}
@Override
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
index 7814e44d3fc..e977e22aa57 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
@@ -17,13 +17,10 @@
package org.apache.shardingsphere.data.pipeline.opengauss.ingest.dumper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWALDumper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
/**
* OpenGauss incremental dumper creator.
@@ -31,9 +28,8 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.Di
public final class OpenGaussIncrementalDumperCreator implements
DialectIncrementalDumperCreator {
@Override
- public IncrementalDumper createIncrementalDumper(final
IncrementalDumperContext context,
- final IngestPosition
position, final PipelineChannel channel, final PipelineTableMetaDataLoader
metaDataLoader) {
- return new OpenGaussWALDumper(context, position, channel,
metaDataLoader);
+ public IncrementalDumper createIncrementalDumper(final
CreateIncrementalDumperParameter param) {
+ return new OpenGaussWALDumper(param.getContext(), param.getPosition(),
param.getChannel(), param.getMetaDataLoader());
}
@Override
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
index a7cc59f1411..d54e3eeb942 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
@@ -17,13 +17,10 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.dumper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWALDumper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
/**
* PostgreSQL incremental dumper creator.
@@ -31,9 +28,8 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.Di
public final class PostgreSQLIncrementalDumperCreator implements
DialectIncrementalDumperCreator {
@Override
- public IncrementalDumper createIncrementalDumper(final
IncrementalDumperContext context,
- final IngestPosition
position, final PipelineChannel channel, final PipelineTableMetaDataLoader
metaDataLoader) {
- return new PostgreSQLWALDumper(context, position, channel,
metaDataLoader);
+ public IncrementalDumper createIncrementalDumper(final
CreateIncrementalDumperParameter param) {
+ return new PostgreSQLWALDumper(param.getContext(),
param.getPosition(), param.getChannel(), param.getMetaDataLoader());
}
@Override
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 4341d35c924..8cd25dc4c20 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -31,6 +31,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
@@ -139,7 +140,8 @@ public final class CDCJobPreparer {
IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
jobItemContext.getInitProgress());
PipelineChannel channel =
PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(),
taskProgress);
channelProgressPairs.add(new CDCChannelProgressPair(channel,
jobItemContext));
- Dumper dumper = IncrementalDumperCreator.create(dumperContext,
channel, jobItemContext.getSourceMetaDataLoader());
+ Dumper dumper = IncrementalDumperCreator.create(new
CreateIncrementalDumperParameter(
+ dumperContext, dumperContext.getCommonContext().getPosition(),
channel, jobItemContext.getSourceMetaDataLoader(),
jobItemContext.getDataSourceManager()));
boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
Importer importer = importerUsed.get() ? null
: new CDCImporter(channelProgressPairs, 1, 100L,
jobItemContext.getSink(), needSorting,
taskConfig.getImporterConfig().getRateLimitAlgorithm());
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index a05eb2e58d0..b69a38cd42e 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -31,6 +31,7 @@ import
org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
@@ -43,6 +44,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncremen
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
@@ -191,11 +193,13 @@ public final class MigrationJobPreparer {
private void initIncrementalTasks(final MigrationJobItemContext
jobItemContext) {
MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
+ PipelineTableMetaDataLoader sourceMetaDataLoader =
jobItemContext.getSourceMetaDataLoader();
IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
ExecuteEngine incrementalExecuteEngine =
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
jobItemContext.getInitProgress());
PipelineChannel channel =
PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(),
taskProgress);
- Dumper dumper = IncrementalDumperCreator.create(dumperContext,
channel, jobItemContext.getSourceMetaDataLoader());
+ Dumper dumper = IncrementalDumperCreator.create(
+ new CreateIncrementalDumperParameter(dumperContext,
dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader,
jobItemContext.getDataSourceManager()));
Collection<Importer> importers = Collections.singletonList(new
SingleChannelConsumerImporter(channel, 1, 5L, jobItemContext.getSink(),
jobItemContext));
PipelineTask incrementalTask = new
IncrementalTask(dumperContext.getCommonContext().getDataSourceName(),
incrementalExecuteEngine, dumper, importers, taskProgress);
jobItemContext.getIncrementalTasks().add(incrementalTask);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java
index 23c432cd61e..bd7fdab3922 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java
@@ -17,12 +17,9 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.fixture.h2.dumper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
import static org.mockito.Mockito.mock;
@@ -32,8 +29,7 @@ import static org.mockito.Mockito.mock;
public final class H2IncrementalDumperCreator implements
DialectIncrementalDumperCreator {
@Override
- public IncrementalDumper createIncrementalDumper(final
IncrementalDumperContext context,
- final IngestPosition
position, final PipelineChannel channel, final PipelineTableMetaDataLoader
metaDataLoader) {
+ public IncrementalDumper createIncrementalDumper(final
CreateIncrementalDumperParameter param) {
return mock(IncrementalDumper.class);
}