This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cf4ed22d1e Add IncrementalDumperCreator (#32508)
4cf4ed22d1e is described below

commit 4cf4ed22d1e6e325bf2d47ed03d6a7583431b8bc
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Aug 15 00:21:42 2024 +0800

    Add IncrementalDumperCreator (#32508)
    
    * Refactor MySQLIncrementalDumper
    
    * Add IncrementalDumperCreator
---
 .../incremental/IncrementalDumperCreator.java      | 49 ++++++++++++++++++++++
 .../mysql/ingest/MySQLColumnValueReader.java       |  5 +--
 .../mysql/ingest/MySQLIncrementalDumper.java       | 16 +++----
 .../pipeline/mysql/ingest/client/ConnectInfo.java  |  3 ++
 .../dumper/MySQLIncrementalDumperCreator.java      |  4 +-
 .../opengauss/ingest/OpenGaussWALDumper.java       |  8 +---
 .../dumper/OpenGaussIncrementalDumperCreator.java  |  4 +-
 .../postgresql/ingest/PostgreSQLWALDumper.java     | 14 +++----
 .../dumper/PostgreSQLIncrementalDumperCreator.java |  4 +-
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |  6 +--
 .../migration/preparer/MigrationJobPreparer.java   |  6 +--
 .../h2/dumper/H2IncrementalDumperCreator.java      |  4 +-
 12 files changed, 78 insertions(+), 45 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/IncrementalDumperCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/IncrementalDumperCreator.java
new file mode 100644
index 00000000000..48cd2a5d92e
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/IncrementalDumperCreator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
+
+/**
+ * Incremental dumper creator.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class IncrementalDumperCreator {
+    
+    /**
+     * Create incremental dumper.
+     *
+     * @param dumperContext incremental dumper context
+     * @param channel channel
+     * @param metaDataLoader meta data loader
+     * @return incremental dumper
+     */
+    public static IncrementalDumper create(final IncrementalDumperContext dumperContext, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+        ShardingSpherePreconditions.checkState(dumperContext.getCommonContext().getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration,
+                () -> new UnsupportedSQLOperationException("Incremental dumper only support StandardPipelineDataSourceConfiguration"));
+        return DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
+                .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, metaDataLoader);
+    }
+}
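
For reference, a minimal sketch (not part of the patch) of how a call site changes with the new factory. The enclosing method name createDumperSketch is hypothetical; the two statements are copied from the CDCJobPreparer and MigrationJobPreparer hunks further below.

    // Sketch only: identifiers come from this patch, the wrapper method is made up for illustration.
    private static Dumper createDumperSketch(final IncrementalDumperContext dumperContext,
                                             final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
        // Before this commit, call sites resolved the dialect SPI and passed the position themselves:
        // DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
        //         .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, metaDataLoader);
        // After this commit, the factory performs the data source precondition check, the SPI lookup, and the position extraction:
        return IncrementalDumperCreator.create(dumperContext, channel, metaDataLoader);
    }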
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLColumnValueReader.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLColumnValueReader.java
index 32acba6c788..47d6ee5f2b5 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLColumnValueReader.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLColumnValueReader.java
@@ -33,10 +33,7 @@ public final class MySQLColumnValueReader implements DialectColumnValueReader {
     
     @Override
     public Optional<Object> read(final ResultSet resultSet, final ResultSetMetaData metaData, final int columnIndex) throws SQLException {
-        if (isYearDataType(metaData.getColumnTypeName(columnIndex))) {
-            return Optional.of(resultSet.getShort(columnIndex));
-        }
-        return Optional.empty();
+        return isYearDataType(metaData.getColumnTypeName(columnIndex)) ? Optional.of(resultSet.getShort(columnIndex)) : Optional.empty();
     }
     
     private boolean isYearDataType(final String columnDataTypeName) {
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 43074284e25..83b6c4aa716 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -17,12 +17,11 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql.ingest;
 
-import com.google.common.base.Preconditions;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
@@ -30,8 +29,6 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
-import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
@@ -46,10 +43,12 @@ import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ConnectInfo;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.MySQLClient;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.MySQLDataTypeHandler;
 import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.string.MySQLBinaryString;
+import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
 import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
 import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 import java.io.Serializable;
@@ -79,17 +78,14 @@ public final class MySQLIncrementalDumper extends AbstractPipelineLifecycleRunna
     
     private final String catalog;
     
-    public MySQLIncrementalDumper(final IncrementalDumperContext dumperContext, final IngestPosition binlogPosition,
-                                  final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        Preconditions.checkArgument(dumperContext.getCommonContext().getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration,
-                "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
+    public MySQLIncrementalDumper(final IncrementalDumperContext dumperContext, final IngestPosition binlogPosition, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         this.dumperContext = dumperContext;
         this.binlogPosition = (BinlogPosition) binlogPosition;
         this.channel = channel;
         this.metaDataLoader = metaDataLoader;
         StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig();
-        ConnectionPropertiesParser parser = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, TypedSPILoader.getService(DatabaseType.class, "MySQL"));
-        ConnectionProperties connectionProps = parser.parse(pipelineDataSourceConfig.getUrl(), null, null);
+        DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "MySQL");
+        ConnectionProperties connectionProps = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, databaseType).parse(pipelineDataSourceConfig.getUrl(), null, null);
         ConnectInfo connectInfo = new ConnectInfo(
                 generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), pipelineDataSourceConfig.getUsername(), pipelineDataSourceConfig.getPassword());
         log.info("incremental dump, jdbcUrl={}, serverId={}, hostname={}, port={}", pipelineDataSourceConfig.getUrl(), connectInfo.getServerId(), connectInfo.getHost(), connectInfo.getPort());
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ConnectInfo.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ConnectInfo.java
index 6e275029ffe..159c67475cc 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ConnectInfo.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ConnectInfo.java
@@ -20,6 +20,9 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.client;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
+/**
+ * Connect info.
+ */
 @RequiredArgsConstructor
 @Getter
 public final class ConnectInfo {
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
index 3ca55e71bd5..23275e1744d 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
@@ -31,8 +31,8 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.Di
 public final class MySQLIncrementalDumperCreator implements DialectIncrementalDumperCreator {
     
     @Override
-    public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context, final IngestPosition position,
-                                                     final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+    public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context,
+                                                     final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         return new MySQLIncrementalDumper(context, position, channel, metaDataLoader);
     }
     
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index 54918dc2c7b..088c1686185 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -41,8 +41,6 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.Abstr
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
-import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
 import org.opengauss.jdbc.PgConnection;
 import org.opengauss.replication.PGReplicationStream;
 
@@ -89,8 +87,6 @@ public final class OpenGaussWALDumper extends AbstractPipelineLifecycleRunnable
     
     public OpenGaussWALDumper(final IncrementalDumperContext dumperContext, final IngestPosition position,
                               final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()),
-                () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
         this.dumperContext = dumperContext;
         walPosition = new AtomicReference<>((WALPosition) position);
         this.channel = channel;
@@ -125,8 +121,8 @@ public final class OpenGaussWALDumper extends AbstractPipelineLifecycleRunnable
         PGReplicationStream stream = null;
         int majorVersion = getMajorVersion();
         try (PgConnection connection = getReplicationConnectionUnwrap()) {
-            stream = logicalReplication.createReplicationStream(connection, walPosition.get().getLogSequenceNumber(),
-                    PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, dumperContext.getJobId()), majorVersion);
+            stream = logicalReplication.createReplicationStream(
+                    connection, walPosition.get().getLogSequenceNumber(), PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, dumperContext.getJobId()), majorVersion);
             DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX, majorVersion >= 3);
             while (isRunning()) {
                 ByteBuffer message = stream.readPending();
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
index b4bee325669..7814e44d3fc 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
@@ -31,8 +31,8 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.Di
 public final class OpenGaussIncrementalDumperCreator implements DialectIncrementalDumperCreator {
     
     @Override
-    public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context, final IngestPosition position,
-                                                     final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+    public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context,
+                                                     final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         return new OpenGaussWALDumper(context, position, channel, metaDataLoader);
     }
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 7efb7e9d27a..7d8563bda56 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -19,15 +19,15 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
+import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot.PostgreSQLSlotNameGenerator;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
@@ -41,8 +41,6 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.Abstr
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
-import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
 import org.postgresql.jdbc.PgConnection;
 import org.postgresql.replication.PGReplicationStream;
 
@@ -79,8 +77,6 @@ public final class PostgreSQLWALDumper extends AbstractPipelineLifecycleRunnable
     
     public PostgreSQLWALDumper(final IncrementalDumperContext dumperContext, final IngestPosition position,
                                final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()),
-                () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
         this.dumperContext = dumperContext;
         walPosition = new AtomicReference<>((WALPosition) position);
         this.channel = channel;
@@ -115,8 +111,8 @@ public final class PostgreSQLWALDumper extends AbstractPipelineLifecycleRunnable
         // TODO use unified PgConnection
         try (
                 Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig());
-                PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, dumperContext.getJobId()),
-                        walPosition.get().getLogSequenceNumber())) {
+                PGReplicationStream stream = logicalReplication.createReplicationStream(
+                        connection, PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, dumperContext.getJobId()), walPosition.get().getLogSequenceNumber())) {
             PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils());
             DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils);
             while (isRunning()) {
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
index e8a073e34ec..a7cc59f1411 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
@@ -31,8 +31,8 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.Di
 public final class PostgreSQLIncrementalDumperCreator implements DialectIncrementalDumperCreator {
     
     @Override
-    public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context, final IngestPosition position,
-                                                     final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+    public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context,
+                                                     final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         return new PostgreSQLWALDumper(context, position, channel, metaDataLoader);
     }
     
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 70874a8c6d6..4341d35c924 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -31,8 +31,8 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
@@ -46,7 +46,6 @@ import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.Inventory
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
-import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import java.sql.SQLException;
@@ -140,8 +139,7 @@ public final class CDCJobPreparer {
         IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
         PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(), taskProgress);
         channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
-        Dumper dumper = DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
-                .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
+        Dumper dumper = IncrementalDumperCreator.create(dumperContext, channel, jobItemContext.getSourceMetaDataLoader());
         boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
         Importer importer = importerUsed.get() ? null
                 : new CDCImporter(channelProgressPairs, 1, 100L, jobItemContext.getSink(), needSorting, taskConfig.getImporterConfig().getRateLimitAlgorithm());
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 0f4ad4a36b4..a05eb2e58d0 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsum
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
@@ -42,7 +43,6 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncremen
 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
@@ -191,13 +191,11 @@ public final class MigrationJobPreparer {
     
     private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) {
         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
-        PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader();
         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
         ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
         IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
         PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(), taskProgress);
-        Dumper dumper = DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
-                .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader);
+        Dumper dumper = IncrementalDumperCreator.create(dumperContext, channel, jobItemContext.getSourceMetaDataLoader());
         Collection<Importer> importers = Collections.singletonList(new SingleChannelConsumerImporter(channel, 1, 5L, jobItemContext.getSink(), jobItemContext));
         PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java
index f0b8e831303..23c432cd61e 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java
@@ -32,8 +32,8 @@ import static org.mockito.Mockito.mock;
 public final class H2IncrementalDumperCreator implements DialectIncrementalDumperCreator {
     
     @Override
-    public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context, final IngestPosition position,
-                                                     final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+    public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context,
+                                                     final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         return mock(IncrementalDumper.class);
     }
     
