This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4cf4ed22d1e Add IncrementalDumperCreator (#32508)
4cf4ed22d1e is described below
commit 4cf4ed22d1e6e325bf2d47ed03d6a7583431b8bc
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Aug 15 00:21:42 2024 +0800
Add IncrementalDumperCreator (#32508)
* Refactor MySQLIncrementalDumper
* Add IncrementalDumperCreator
---
.../incremental/IncrementalDumperCreator.java | 49 ++++++++++++++++++++++
.../mysql/ingest/MySQLColumnValueReader.java | 5 +--
.../mysql/ingest/MySQLIncrementalDumper.java | 16 +++----
.../pipeline/mysql/ingest/client/ConnectInfo.java | 3 ++
.../dumper/MySQLIncrementalDumperCreator.java | 4 +-
.../opengauss/ingest/OpenGaussWALDumper.java | 8 +---
.../dumper/OpenGaussIncrementalDumperCreator.java | 4 +-
.../postgresql/ingest/PostgreSQLWALDumper.java | 14 +++----
.../dumper/PostgreSQLIncrementalDumperCreator.java | 4 +-
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 6 +--
.../migration/preparer/MigrationJobPreparer.java | 6 +--
.../h2/dumper/H2IncrementalDumperCreator.java | 4 +-
12 files changed, 78 insertions(+), 45 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/IncrementalDumperCreator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/IncrementalDumperCreator.java
new file mode 100644
index 00000000000..48cd2a5d92e
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/incremental/IncrementalDumperCreator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
+import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
+
+/**
+ * Incremental dumper creator.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class IncrementalDumperCreator {
+
+ /**
+ * Create incremental dumper.
+ *
+ * @param dumperContext incremental dumper context
+ * @param channel channel
+ * @param metaDataLoader meta data loader
+ * @return incremental dumper
+ */
+ public static IncrementalDumper create(final IncrementalDumperContext
dumperContext, final PipelineChannel channel, final PipelineTableMetaDataLoader
metaDataLoader) {
+
ShardingSpherePreconditions.checkState(dumperContext.getCommonContext().getDataSourceConfig()
instanceof StandardPipelineDataSourceConfiguration,
+ () -> new UnsupportedSQLOperationException("Incremental dumper
only support StandardPipelineDataSourceConfiguration"));
+ return
DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class,
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
+ .createIncrementalDumper(dumperContext,
dumperContext.getCommonContext().getPosition(), channel, metaDataLoader);
+ }
+}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLColumnValueReader.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLColumnValueReader.java
index 32acba6c788..47d6ee5f2b5 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLColumnValueReader.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLColumnValueReader.java
@@ -33,10 +33,7 @@ public final class MySQLColumnValueReader implements
DialectColumnValueReader {
@Override
public Optional<Object> read(final ResultSet resultSet, final
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
- if (isYearDataType(metaData.getColumnTypeName(columnIndex))) {
- return Optional.of(resultSet.getShort(columnIndex));
- }
- return Optional.empty();
+ return isYearDataType(metaData.getColumnTypeName(columnIndex)) ?
Optional.of(resultSet.getShort(columnIndex)) : Optional.empty();
}
private boolean isYearDataType(final String columnDataTypeName) {
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 43074284e25..83b6c4aa716 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -17,12 +17,11 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest;
-import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
@@ -30,8 +29,6 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
@@ -46,10 +43,12 @@ import
org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ConnectInfo;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.MySQLClient;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.MySQLDataTypeHandler;
import
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.string.MySQLBinaryString;
+import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import
org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
import
org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import java.io.Serializable;
@@ -79,17 +78,14 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
private final String catalog;
- public MySQLIncrementalDumper(final IncrementalDumperContext
dumperContext, final IngestPosition binlogPosition,
- final PipelineChannel channel, final
PipelineTableMetaDataLoader metaDataLoader) {
-
Preconditions.checkArgument(dumperContext.getCommonContext().getDataSourceConfig()
instanceof StandardPipelineDataSourceConfiguration,
- "MySQLBinlogDumper only support
StandardPipelineDataSourceConfiguration");
+ public MySQLIncrementalDumper(final IncrementalDumperContext
dumperContext, final IngestPosition binlogPosition, final PipelineChannel
channel, final PipelineTableMetaDataLoader metaDataLoader) {
this.dumperContext = dumperContext;
this.binlogPosition = (BinlogPosition) binlogPosition;
this.channel = channel;
this.metaDataLoader = metaDataLoader;
StandardPipelineDataSourceConfiguration pipelineDataSourceConfig =
(StandardPipelineDataSourceConfiguration)
dumperContext.getCommonContext().getDataSourceConfig();
- ConnectionPropertiesParser parser =
DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class,
TypedSPILoader.getService(DatabaseType.class, "MySQL"));
- ConnectionProperties connectionProps =
parser.parse(pipelineDataSourceConfig.getUrl(), null, null);
+ DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "MySQL");
+ ConnectionProperties connectionProps =
DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class,
databaseType).parse(pipelineDataSourceConfig.getUrl(), null, null);
ConnectInfo connectInfo = new ConnectInfo(
generateServerId(), connectionProps.getHostname(),
connectionProps.getPort(), pipelineDataSourceConfig.getUsername(),
pipelineDataSourceConfig.getPassword());
log.info("incremental dump, jdbcUrl={}, serverId={}, hostname={},
port={}", pipelineDataSourceConfig.getUrl(), connectInfo.getServerId(),
connectInfo.getHost(), connectInfo.getPort());
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ConnectInfo.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ConnectInfo.java
index 6e275029ffe..159c67475cc 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ConnectInfo.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ConnectInfo.java
@@ -20,6 +20,9 @@ package
org.apache.shardingsphere.data.pipeline.mysql.ingest.client;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+/**
+ * Connect info.
+ */
@RequiredArgsConstructor
@Getter
public final class ConnectInfo {
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
index 3ca55e71bd5..23275e1744d 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
@@ -31,8 +31,8 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.Di
public final class MySQLIncrementalDumperCreator implements
DialectIncrementalDumperCreator {
@Override
- public IncrementalDumper createIncrementalDumper(final
IncrementalDumperContext context, final IngestPosition position,
- final PipelineChannel
channel, final PipelineTableMetaDataLoader metaDataLoader) {
+ public IncrementalDumper createIncrementalDumper(final
IncrementalDumperContext context,
+ final IngestPosition
position, final PipelineChannel channel, final PipelineTableMetaDataLoader
metaDataLoader) {
return new MySQLIncrementalDumper(context, position, channel,
metaDataLoader);
}
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index 54918dc2c7b..088c1686185 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -41,8 +41,6 @@ import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.Abstr
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
-import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
import org.opengauss.jdbc.PgConnection;
import org.opengauss.replication.PGReplicationStream;
@@ -89,8 +87,6 @@ public final class OpenGaussWALDumper extends
AbstractPipelineLifecycleRunnable
public OpenGaussWALDumper(final IncrementalDumperContext dumperContext,
final IngestPosition position,
final PipelineChannel channel, final
PipelineTableMetaDataLoader metaDataLoader) {
-
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()),
- () -> new
UnsupportedSQLOperationException("PostgreSQLWALDumper only support
PipelineDataSourceConfiguration"));
this.dumperContext = dumperContext;
walPosition = new AtomicReference<>((WALPosition) position);
this.channel = channel;
@@ -125,8 +121,8 @@ public final class OpenGaussWALDumper extends
AbstractPipelineLifecycleRunnable
PGReplicationStream stream = null;
int majorVersion = getMajorVersion();
try (PgConnection connection = getReplicationConnectionUnwrap()) {
- stream = logicalReplication.createReplicationStream(connection,
walPosition.get().getLogSequenceNumber(),
- PostgreSQLSlotNameGenerator.getUniqueSlotName(connection,
dumperContext.getJobId()), majorVersion);
+ stream = logicalReplication.createReplicationStream(
+ connection, walPosition.get().getLogSequenceNumber(),
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection,
dumperContext.getJobId()), majorVersion);
DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new
OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX,
majorVersion >= 3);
while (isRunning()) {
ByteBuffer message = stream.readPending();
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
index b4bee325669..7814e44d3fc 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
@@ -31,8 +31,8 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.Di
public final class OpenGaussIncrementalDumperCreator implements
DialectIncrementalDumperCreator {
@Override
- public IncrementalDumper createIncrementalDumper(final
IncrementalDumperContext context, final IngestPosition position,
- final PipelineChannel
channel, final PipelineTableMetaDataLoader metaDataLoader) {
+ public IncrementalDumper createIncrementalDumper(final
IncrementalDumperContext context,
+ final IngestPosition
position, final PipelineChannel channel, final PipelineTableMetaDataLoader
metaDataLoader) {
return new OpenGaussWALDumper(context, position, channel,
metaDataLoader);
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 7efb7e9d27a..7d8563bda56 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -19,15 +19,15 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
+import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot.PostgreSQLSlotNameGenerator;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
@@ -41,8 +41,6 @@ import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.Abstr
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
-import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.PGReplicationStream;
@@ -79,8 +77,6 @@ public final class PostgreSQLWALDumper extends
AbstractPipelineLifecycleRunnable
public PostgreSQLWALDumper(final IncrementalDumperContext dumperContext,
final IngestPosition position,
final PipelineChannel channel, final
PipelineTableMetaDataLoader metaDataLoader) {
-
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()),
- () -> new
UnsupportedSQLOperationException("PostgreSQLWALDumper only support
PipelineDataSourceConfiguration"));
this.dumperContext = dumperContext;
walPosition = new AtomicReference<>((WALPosition) position);
this.channel = channel;
@@ -115,8 +111,8 @@ public final class PostgreSQLWALDumper extends
AbstractPipelineLifecycleRunnable
// TODO use unified PgConnection
try (
Connection connection =
logicalReplication.createConnection((StandardPipelineDataSourceConfiguration)
dumperContext.getCommonContext().getDataSourceConfig());
- PGReplicationStream stream =
logicalReplication.createReplicationStream(connection,
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection,
dumperContext.getJobId()),
- walPosition.get().getLogSequenceNumber())) {
+ PGReplicationStream stream =
logicalReplication.createReplicationStream(
+ connection,
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection,
dumperContext.getJobId()), walPosition.get().getLogSequenceNumber())) {
PostgreSQLTimestampUtils utils = new
PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils());
DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils);
while (isRunning()) {
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
index e8a073e34ec..a7cc59f1411 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
@@ -31,8 +31,8 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.Di
public final class PostgreSQLIncrementalDumperCreator implements
DialectIncrementalDumperCreator {
@Override
- public IncrementalDumper createIncrementalDumper(final
IncrementalDumperContext context, final IngestPosition position,
- final PipelineChannel
channel, final PipelineTableMetaDataLoader metaDataLoader) {
+ public IncrementalDumper createIncrementalDumper(final
IncrementalDumperContext context,
+ final IngestPosition
position, final PipelineChannel channel, final PipelineTableMetaDataLoader
metaDataLoader) {
return new PostgreSQLWALDumper(context, position, channel,
metaDataLoader);
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 70874a8c6d6..4341d35c924 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -31,8 +31,8 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
@@ -46,7 +46,6 @@ import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.Inventory
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
-import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import java.sql.SQLException;
@@ -140,8 +139,7 @@ public final class CDCJobPreparer {
IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
jobItemContext.getInitProgress());
PipelineChannel channel =
PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(),
taskProgress);
channelProgressPairs.add(new CDCChannelProgressPair(channel,
jobItemContext));
- Dumper dumper =
DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class,
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
- .createIncrementalDumper(dumperContext,
dumperContext.getCommonContext().getPosition(), channel,
jobItemContext.getSourceMetaDataLoader());
+ Dumper dumper = IncrementalDumperCreator.create(dumperContext,
channel, jobItemContext.getSourceMetaDataLoader());
boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
Importer importer = importerUsed.get() ? null
: new CDCImporter(channelProgressPairs, 1, 100L,
jobItemContext.getSink(), needSorting,
taskConfig.getImporterConfig().getRateLimitAlgorithm());
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 0f4ad4a36b4..a05eb2e58d0 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -33,6 +33,7 @@ import
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsum
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
@@ -42,7 +43,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncremen
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
@@ -191,13 +191,11 @@ public final class MigrationJobPreparer {
private void initIncrementalTasks(final MigrationJobItemContext
jobItemContext) {
MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
- PipelineTableMetaDataLoader sourceMetaDataLoader =
jobItemContext.getSourceMetaDataLoader();
IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
ExecuteEngine incrementalExecuteEngine =
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
jobItemContext.getInitProgress());
PipelineChannel channel =
PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(),
taskProgress);
- Dumper dumper =
DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class,
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
- .createIncrementalDumper(dumperContext,
dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader);
+ Dumper dumper = IncrementalDumperCreator.create(dumperContext,
channel, jobItemContext.getSourceMetaDataLoader());
Collection<Importer> importers = Collections.singletonList(new
SingleChannelConsumerImporter(channel, 1, 5L, jobItemContext.getSink(),
jobItemContext));
PipelineTask incrementalTask = new
IncrementalTask(dumperContext.getCommonContext().getDataSourceName(),
incrementalExecuteEngine, dumper, importers, taskProgress);
jobItemContext.getIncrementalTasks().add(incrementalTask);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java
index f0b8e831303..23c432cd61e 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/dumper/H2IncrementalDumperCreator.java
@@ -32,8 +32,8 @@ import static org.mockito.Mockito.mock;
public final class H2IncrementalDumperCreator implements
DialectIncrementalDumperCreator {
@Override
- public IncrementalDumper createIncrementalDumper(final
IncrementalDumperContext context, final IngestPosition position,
- final PipelineChannel
channel, final PipelineTableMetaDataLoader metaDataLoader) {
+ public IncrementalDumper createIncrementalDumper(final
IncrementalDumperContext context,
+ final IngestPosition
position, final PipelineChannel channel, final PipelineTableMetaDataLoader
metaDataLoader) {
return mock(IncrementalDumper.class);
}