This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bf2439b5c25 Add IncrementalDumperCreator SPI to decouple 
IncrementalDumper and ScalingEntry (#20465)
bf2439b5c25 is described below

commit bf2439b5c25c51932760f80eaf87e89efc0941df
Author: Da Xiang Huang <[email protected]>
AuthorDate: Wed Aug 24 08:26:40 2022 +0800

    Add IncrementalDumperCreator SPI to decouple IncrementalDumper and 
ScalingEntry (#20465)
---
 .../ingest/dumper/IncrementalDumperCreator.java}   |  21 +++--
 .../dumper/IncrementalDumperCreatorFactory.java}   |  28 ++++--
 .../data/pipeline/core/task/IncrementalTask.java   |   5 +-
 .../scaling/core/job/dumper/DumperFactory.java     |  28 ------
 .../scaling/core/spi/ScalingEntry.java             |   7 --
 .../core/spi/fixture/ScalingEntryFixture.java      |   6 --
 .../mysql/MySQLIncrementalDumperCreator.java       |  44 +++++++++
 .../data/pipeline/mysql/MySQLScalingEntry.java     |   6 --
 ...ine.core.ingest.dumper.IncrementalDumperCreator |  19 ++++
 .../data/pipeline/mysql/MySQLScalingEntryTest.java |   3 -
 .../OpenGaussIncrementalDumperCreator.java         |  44 +++++++++
 .../pipeline/opengauss/OpenGaussScalingEntry.java  |   6 --
 ...ine.core.ingest.dumper.IncrementalDumperCreator |  19 ++++
 .../opengauss/OpenGaussScalingEntryTest.java       |   3 -
 .../PostgreSQLIncrementalDumperCreator.java        |  44 +++++++++
 .../postgresql/PostgreSQLScalingEntry.java         |   6 --
 ...ine.core.ingest.dumper.IncrementalDumperCreator |  18 ++++
 .../postgresql/PostgreSQLScalingEntryTest.java     |   3 -
 .../IncrementalDumperCreatorFactoryTest.java       | 101 +++++++++++++++++++++
 .../fixture/FixtureIncrementalDumperCreator.java   |  53 +++++++++++
 .../core/fixture/H2ScalingEntryFixture.java        |   6 --
 ...ine.core.ingest.dumper.IncrementalDumperCreator |  18 ++++
 22 files changed, 396 insertions(+), 92 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreator.java
similarity index 53%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreator.java
index 16a446b0563..5583e056e1e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreator.java
@@ -15,22 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.spi;
+package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
 
+import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
 /**
- * Scaling entry.
+ * Incremental dumper creator.
  */
 @SingletonSPI
-public interface ScalingEntry extends TypedSPI {
+public interface IncrementalDumperCreator<P> extends TypedSPI {
     
     /**
-     * Get incremental dumper type.
+     * Create incremental dumper.
      *
-     * @return incremental dumper type
+     * @param dumperConfig dumper configuration
+     * @param position position
+     * @param channel channel
+     * @param metaDataLoader metadata loader
+     * @return incremental dumper
      */
-    Class<? extends IncrementalDumper> getIncrementalDumperClass();
+    IncrementalDumper createIncrementalDumper(DumperConfiguration 
dumperConfig, IngestPosition<P> position,
+                                              PipelineChannel channel, 
PipelineTableMetaDataLoader metaDataLoader);
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreatorFactory.java
similarity index 54%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreatorFactory.java
index 16a446b0563..4db34d487ef 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreatorFactory.java
@@ -15,22 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.spi;
+package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
 
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
-import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 
 /**
- * Scaling entry.
+ * Incremental dumper creator factory.
  */
-@SingletonSPI
-public interface ScalingEntry extends TypedSPI {
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class IncrementalDumperCreatorFactory {
+    
+    static {
+        ShardingSphereServiceLoader.register(IncrementalDumperCreator.class);
+    }
     
     /**
-     * Get incremental dumper type.
+     * Get incremental dumper creator instance.
      *
-     * @return incremental dumper type
+     * @param databaseType database type
+     * @return incremental dumper creator
      */
-    Class<? extends IncrementalDumper> getIncrementalDumperClass();
+    public static IncrementalDumperCreator getInstance(final String 
databaseType) {
+        return 
TypedSPIRegistry.getRegisteredService(IncrementalDumperCreator.class, 
databaseType);
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index dd935785e80..df9150dff41 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -34,11 +34,11 @@ import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTask
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreatorFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
-import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -75,7 +75,8 @@ public final class IncrementalTask extends 
AbstractLifecycleExecutor implements
         IngestPosition<?> position = dumperConfig.getPosition();
         taskProgress = createIncrementalTaskProgress(position);
         channel = createChannel(concurrency, pipelineChannelCreator, 
taskProgress);
-        dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, 
channel, sourceMetaDataLoader);
+        dumper = 
IncrementalDumperCreatorFactory.getInstance(dumperConfig.getDataSourceConfig().getDatabaseType().getType()).createIncrementalDumper(dumperConfig,
 position, channel,
+                sourceMetaDataLoader);
         importers = createImporters(concurrency, importerConfig, 
dataSourceManager, channel, jobProgressListener);
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
index 33018249945..dadfcb92fd6 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
@@ -19,16 +19,6 @@ package org.apache.shardingsphere.scaling.core.job.dumper;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import lombok.SneakyThrows;
-import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntryFactory;
-
-import java.lang.reflect.Constructor;
 
 /**
  * Dumper factory.
@@ -36,22 +26,4 @@ import java.lang.reflect.Constructor;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class DumperFactory {
     
-    /**
-     * Create incremental dumper.
-     *
-     * @param dumperConfig dumper configuration
-     * @param position position
-     * @param channel channel
-     * @param sourceMetaDataLoader metadata loader
-     * @return incremental dumper
-     */
-    @SneakyThrows(ReflectiveOperationException.class)
-    public static IncrementalDumper createIncrementalDumper(final 
DumperConfiguration dumperConfig, final IngestPosition<?> position,
-                                                            final 
PipelineChannel channel, final PipelineTableMetaDataLoader 
sourceMetaDataLoader) {
-        String databaseType = 
dumperConfig.getDataSourceConfig().getDatabaseType().getType();
-        ScalingEntry scalingEntry = 
ScalingEntryFactory.getInstance(databaseType);
-        Constructor<? extends IncrementalDumper> constructor = 
scalingEntry.getIncrementalDumperClass()
-                .getConstructor(DumperConfiguration.class, 
IngestPosition.class, PipelineChannel.class, PipelineTableMetaDataLoader.class);
-        return constructor.newInstance(dumperConfig, position, channel, 
sourceMetaDataLoader);
-    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index 16a446b0563..0b48a2c7248 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.scaling.core.spi;
 
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
@@ -27,10 +26,4 @@ import 
org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 @SingletonSPI
 public interface ScalingEntry extends TypedSPI {
     
-    /**
-     * Get incremental dumper type.
-     *
-     * @return incremental dumper type
-     */
-    Class<? extends IncrementalDumper> getIncrementalDumperClass();
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java
index a7977bdd659..5c6498c6a1f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java
@@ -17,16 +17,10 @@
 
 package org.apache.shardingsphere.scaling.core.spi.fixture;
 
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 
 public final class ScalingEntryFixture implements ScalingEntry {
     
-    @Override
-    public Class<? extends IncrementalDumper> getIncrementalDumperClass() {
-        return IncrementalDumper.class;
-    }
-    
     @Override
     public String getType() {
         return "FIXTURE";
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
new file mode 100644
index 00000000000..6fe23ba7a34
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql;
+
+import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+
+/**
+ * MySQL incremental dumper creator.
+ */
+public final class MySQLIncrementalDumperCreator implements 
IncrementalDumperCreator<BinlogPosition> {
+    
+    @Override
+    public IncrementalDumper createIncrementalDumper(final DumperConfiguration 
dumperConfig, final IngestPosition<BinlogPosition> position,
+                                                     final PipelineChannel 
channel, final PipelineTableMetaDataLoader metaDataLoader) {
+        return new MySQLIncrementalDumper(dumperConfig, position, channel, 
metaDataLoader);
+    }
+    
+    @Override
+    public String getType() {
+        return "MySQL";
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
index 3943324a795..b7578b09250 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql;
 
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 
 /**
@@ -25,11 +24,6 @@ import 
org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
  */
 public final class MySQLScalingEntry implements ScalingEntry {
     
-    @Override
-    public Class<MySQLIncrementalDumper> getIncrementalDumperClass() {
-        return MySQLIncrementalDumper.class;
-    }
-    
     @Override
     public String getType() {
         return "MySQL";
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDump
 [...]
new file mode 100644
index 00000000000..187dd647dbd
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.mysql.MySQLIncrementalDumperCreator
+
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java
index e90fa4981d1..6891062a62a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java
@@ -17,12 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql;
 
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntryFactory;
 import org.junit.Test;
 
-import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertThat;
 
@@ -32,6 +30,5 @@ public final class MySQLScalingEntryTest {
     public void assertGetScalingEntryByDatabaseType() {
         ScalingEntry scalingEntry = ScalingEntryFactory.getInstance("MySQL");
         assertThat(scalingEntry, instanceOf(MySQLScalingEntry.class));
-        assertThat(scalingEntry.getIncrementalDumperClass(), 
equalTo(MySQLIncrementalDumper.class));
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
new file mode 100644
index 00000000000..fce078e1591
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.opengauss;
+
+import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+
+/**
+ * OpenGauss incremental dumper creator.
+ */
+public final class OpenGaussIncrementalDumperCreator implements 
IncrementalDumperCreator<WalPosition> {
+    
+    @Override
+    public IncrementalDumper createIncrementalDumper(final DumperConfiguration 
dumperConfig, final IngestPosition<WalPosition> position,
+                                                     final PipelineChannel 
channel, final PipelineTableMetaDataLoader metaDataLoader) {
+        return new OpenGaussWalDumper(dumperConfig, position, channel, 
metaDataLoader);
+    }
+    
+    @Override
+    public String getType() {
+        return "openGauss";
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
index 8cdbadb8660..f7f5badac4a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.opengauss;
 
-import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 
 /**
@@ -25,11 +24,6 @@ import 
org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
  */
 public final class OpenGaussScalingEntry implements ScalingEntry {
     
-    @Override
-    public Class<OpenGaussWalDumper> getIncrementalDumperClass() {
-        return OpenGaussWalDumper.class;
-    }
-    
     @Override
     public String getType() {
         return "openGauss";
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Increme
 [...]
new file mode 100644
index 00000000000..b0a5baf4884
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.opengauss.OpenGaussIncrementalDumperCreator
+
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java
index b3534b91be9..97d642f0bbd 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java
@@ -17,12 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.opengauss;
 
-import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntryFactory;
 import org.junit.Test;
 
-import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertThat;
 
@@ -32,6 +30,5 @@ public final class OpenGaussScalingEntryTest {
     public void assertGetScalingEntryByDatabaseType() {
         ScalingEntry actual = ScalingEntryFactory.getInstance("openGauss");
         assertThat(actual, instanceOf(OpenGaussScalingEntry.class));
-        assertThat(actual.getIncrementalDumperClass(), 
equalTo(OpenGaussWalDumper.class));
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
new file mode 100644
index 00000000000..684f191b380
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.postgresql;
+
+import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+
+/**
+ * Incremental dumper creator for the PostgreSQL dialect.
+ *
+ * <p>Bound to {@link WalPosition} and always produces a {@link PostgreSQLWalDumper}.</p>
+ */
+public final class PostgreSQLIncrementalDumperCreator implements IncrementalDumperCreator<WalPosition> {
+    
+    @Override
+    public String getType() {
+        // Type name under which this creator is looked up, e.g. IncrementalDumperCreatorFactory.getInstance("PostgreSQL").
+        return "PostgreSQL";
+    }
+    
+    @Override
+    public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
+                                                     final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+        // All four arguments are handed to the WAL dumper unchanged.
+        return new PostgreSQLWalDumper(dumperConfig, position, channel, metaDataLoader);
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
index 1d04d0d25b8..202fd60f2b6 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql;
 
-import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 
 /**
@@ -25,11 +24,6 @@ import 
org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
  */
 public final class PostgreSQLScalingEntry implements ScalingEntry {
     
-    @Override
-    public Class<PostgreSQLWalDumper> getIncrementalDumperClass() {
-        return PostgreSQLWalDumper.class;
-    }
-    
     @Override
     public String getType() {
         return "PostgreSQL";
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
new file mode 100644
index 00000000000..901ed132d9d
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.postgresql.PostgreSQLIncrementalDumperCreator
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java
index 067db98f702..d46e3e0836c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java
@@ -17,12 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql;
 
-import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntryFactory;
 import org.junit.Test;
 
-import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertThat;
 
@@ -32,6 +30,5 @@ public final class PostgreSQLScalingEntryTest {
     public void assertGetScalingEntryByDatabaseType() {
         ScalingEntry scalingEntry = 
ScalingEntryFactory.getInstance("PostgreSQL");
         assertThat(scalingEntry, instanceOf(PostgreSQLScalingEntry.class));
-        assertThat(scalingEntry.getIncrementalDumperClass(), 
equalTo(PostgreSQLWalDumper.class));
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
new file mode 100644
index 00000000000..6352be60159
--- /dev/null
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.dumper;
+
+import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureIncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreatorFactory;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
+import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test cases for {@link IncrementalDumperCreatorFactory}.
+ *
+ * <p>Each case looks up a creator by database type name and checks the concrete dumper class it builds.</p>
+ */
+@RunWith(MockitoJUnitRunner.class)
+public final class IncrementalDumperCreatorFactoryTest {
+    
+    private PipelineDataSourceWrapper dataSource;
+    
+    // Instantiated by MockitoJUnitRunner; the @Mock annotation alone leaves the field null.
+    // NOTE(review): if WalPosition is a final class, this relies on Mockito's inline mock maker being available - confirm.
+    @Mock
+    private WalPosition walPosition;
+    
+    @Before
+    public void setUp() {
+        PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
+        DumperConfiguration dumperConfig = mockDumperConfiguration();
+        dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+    }
+    
+    @Test
+    public void assertIncrementalDumperCreatorForMysql() {
+        IncrementalDumper actual = IncrementalDumperCreatorFactory.getInstance("MySQL")
+                .createIncrementalDumper(mockDumperConfiguration(), new BinlogPosition("binlog-000001", 4L), new SimpleMemoryPipelineChannel(100), new PipelineTableMetaDataLoader(dataSource));
+        assertThat(actual, instanceOf(MySQLIncrementalDumper.class));
+    }
+    
+    @Test
+    public void assertIncrementalDumperCreatorForPostgreSQL() {
+        IncrementalDumper actual = IncrementalDumperCreatorFactory.getInstance("PostgreSQL")
+                .createIncrementalDumper(mockDumperConfiguration(), walPosition, new SimpleMemoryPipelineChannel(100), new PipelineTableMetaDataLoader(dataSource));
+        assertThat(actual, instanceOf(PostgreSQLWalDumper.class));
+    }
+    
+    @Test
+    public void assertIncrementalDumperCreatorForOpenGauss() {
+        IncrementalDumper actual = IncrementalDumperCreatorFactory.getInstance("openGauss")
+                .createIncrementalDumper(mockDumperConfiguration(), walPosition, new SimpleMemoryPipelineChannel(100), new PipelineTableMetaDataLoader(dataSource));
+        assertThat(actual, instanceOf(OpenGaussWalDumper.class));
+    }
+    
+    @Test
+    public void assertIncrementalDumperCreatorForFixture() {
+        IncrementalDumper actual = IncrementalDumperCreatorFactory.getInstance("Fixture")
+                .createIncrementalDumper(mockDumperConfiguration(), walPosition, new SimpleMemoryPipelineChannel(100), new PipelineTableMetaDataLoader(dataSource));
+        assertThat(actual, instanceOf(FixtureIncrementalDumper.class));
+    }
+    
+    @Test
+    public void assertIncrementalDumperCreatorForH2() {
+        // "H2" is looked up the same way as "Fixture" and is expected to yield the same fixture dumper.
+        IncrementalDumper actual = IncrementalDumperCreatorFactory.getInstance("H2")
+                .createIncrementalDumper(mockDumperConfiguration(), walPosition, new SimpleMemoryPipelineChannel(100), new PipelineTableMetaDataLoader(dataSource));
+        assertThat(actual, instanceOf(FixtureIncrementalDumper.class));
+    }
+    
+    // Builds a real (not mocked) dumper configuration with a MySQL JDBC URL and a single t_order table mapping.
+    private DumperConfiguration mockDumperConfiguration() {
+        DumperConfiguration result = new DumperConfiguration();
+        result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mysql://127.0.0.1:3306/ds_0", "root", "root"));
+        result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
+        result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
+        return result;
+    }
+}
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
new file mode 100644
index 00000000000..7ca4d4d49eb
--- /dev/null
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.fixture;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+
+/**
+ * Incremental dumper creator used by the pipeline tests.
+ *
+ * <p>Always produces a {@link FixtureIncrementalDumper}; its type is "Fixture" and its aliases are "Fixture" and "H2".</p>
+ */
+public final class FixtureIncrementalDumperCreator implements IncrementalDumperCreator<FinishedPosition> {
+    
+    // NOTE(review): "Fixture" repeats the value returned by getType(); "H2" is the only additional name.
+    private static final Collection<String> TYPE_ALIASES = Collections.unmodifiableList(Arrays.asList("Fixture", "H2"));
+    
+    @Override
+    public String getType() {
+        return "Fixture";
+    }
+    
+    @Override
+    public Collection<String> getTypeAliases() {
+        // Shared read-only list; callers cannot modify it.
+        return TYPE_ALIASES;
+    }
+    
+    @Override
+    public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<FinishedPosition> position,
+                                                     final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+        // All four arguments are handed to the fixture dumper unchanged.
+        return new FixtureIncrementalDumper(dumperConfig, position, channel, metaDataLoader);
+    }
+}
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java
index 6b8f52196c6..9b30c7d3cae 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java
@@ -17,16 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 
 public final class H2ScalingEntryFixture implements ScalingEntry {
     
-    @Override
-    public Class<? extends IncrementalDumper> getIncrementalDumperClass() {
-        return FixtureIncrementalDumper.class;
-    }
-    
     @Override
     public String getType() {
         return "H2";
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
new file mode 100644
index 00000000000..8e5d756607c
--- /dev/null
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.fixture.FixtureIncrementalDumperCreator

Reply via email to