This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c7df22d030a Refactor DumperConfiguration (#28923)
c7df22d030a is described below
commit c7df22d030abe18e2a58e45b3f2ab119eb00e268
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Nov 3 10:44:44 2023 +0800
Refactor DumperConfiguration (#28923)
---
...iguration.java => BaseDumperConfiguration.java} | 16 ++++-----------
.../ingest/IncrementalDumperConfiguration.java} | 24 +++++++++++-----------
.../ingest/InventoryDumperConfiguration.java | 5 ++---
.../ingest/dumper/IncrementalDumperCreator.java | 6 +++---
.../IncrementalDumperConfigurationCreator.java | 4 ++--
.../core/preparer/PipelineJobPreparerUtils.java | 4 ++--
.../ingest/dumper/H2IncrementalDumperCreator.java | 4 ++--
.../mysql/ingest/MySQLIncrementalDumper.java | 6 +++---
.../dumper/MySQLIncrementalDumperCreator.java | 6 +++---
.../mysql/ingest/MySQLIncrementalDumperTest.java | 10 ++++-----
.../opengauss/ingest/OpenGaussWALDumper.java | 6 +++---
.../dumper/OpenGaussIncrementalDumperCreator.java | 6 +++---
.../postgresql/ingest/PostgreSQLWALDumper.java | 6 +++---
.../dumper/PostgreSQLIncrementalDumperCreator.java | 6 +++---
.../postgresql/ingest/wal/WALEventConverter.java | 6 +++---
.../postgresql/ingest/PostgreSQLWALDumperTest.java | 8 ++++----
.../ingest/wal/WALEventConverterTest.java | 10 ++++-----
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 16 +++++++--------
.../cdc/config/task/CDCTaskConfiguration.java | 4 ++--
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 4 ++--
.../migration/api/impl/MigrationJobAPI.java | 11 +++++-----
.../config/MigrationTaskConfiguration.java | 4 ++--
...ationIncrementalDumperConfigurationCreator.java | 10 ++++-----
.../migration/prepare/MigrationJobPreparer.java | 4 ++--
.../fixture/FixtureIncrementalDumperCreator.java | 4 ++--
.../core/prepare/InventoryTaskSplitterTest.java | 14 ++++++-------
.../data/pipeline/core/task/InventoryTaskTest.java | 4 ++--
27 files changed, 100 insertions(+), 108 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/BaseDumperConfiguration.java
similarity index 93%
rename from
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
rename to
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/BaseDumperConfiguration.java
index 4d8c81f83a8..59b9ebd41fa 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/BaseDumperConfiguration.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.api.config.ingest;
-import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@@ -35,32 +34,25 @@ import java.util.Map;
import java.util.stream.Collectors;
/**
- * Dumper configuration.
+ * Base dumper configuration.
*/
@Getter
@Setter
@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
-// TODO it should be final and not extends by sub-class
-// TODO fields final
-public class DumperConfiguration {
-
- private String jobId;
+public abstract class BaseDumperConfiguration {
private String dataSourceName;
private PipelineDataSourceConfiguration dataSourceConfig;
- private IngestPosition position;
-
private Map<ActualTableName, LogicTableName> tableNameMap;
private TableNameSchemaNameMapping tableNameSchemaNameMapping;
// LinkedHashSet is required
- @Getter(AccessLevel.PROTECTED)
private Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap
= new HashMap<>();
- private boolean decodeWithTX;
+ private IngestPosition position;
/**
* Get logic table name.
@@ -100,7 +92,7 @@ public class DumperConfiguration {
* Get schema name.
*
* @param actualTableName actual table name
- * @return schema name. nullable
+ * @return schema name, can be nullable
*/
public String getSchemaName(final ActualTableName actualTableName) {
return
tableNameSchemaNameMapping.getSchemaName(getLogicTableName(actualTableName));
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/IncrementalDumperConfiguration.java
similarity index 59%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java
copy to
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/IncrementalDumperConfiguration.java
index 2934a8f142f..326e25c9262 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/IncrementalDumperConfiguration.java
@@ -15,21 +15,21 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.common.config.ingest;
+package org.apache.shardingsphere.data.pipeline.api.config.ingest;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
/**
- * Incremental dumper configuration creator.
+ * Incremental dumper configuration.
*/
-public interface IncrementalDumperConfigurationCreator {
+@Getter
+@Setter
+@ToString(callSuper = true)
+public class IncrementalDumperConfiguration extends BaseDumperConfiguration {
- /**
- * Create dumper configuration.
- *
- * @param jobDataNodeLine job data node line
- * @return dumper configuration
- */
- DumperConfiguration createDumperConfiguration(JobDataNodeLine
jobDataNodeLine);
+ private String jobId;
+
+ private boolean decodeWithTX;
}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index 6663e20af4a..792f60c8f37 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -31,8 +31,7 @@ import java.util.List;
@Getter
@Setter
@ToString(callSuper = true)
-// TODO fields final
-public final class InventoryDumperConfiguration extends DumperConfiguration {
+public final class InventoryDumperConfiguration extends
BaseDumperConfiguration {
private String actualTableName;
@@ -52,7 +51,7 @@ public final class InventoryDumperConfiguration extends
DumperConfiguration {
private JobRateLimitAlgorithm rateLimitAlgorithm;
- public InventoryDumperConfiguration(final DumperConfiguration
dumperConfig) {
+ public InventoryDumperConfiguration(final BaseDumperConfiguration
dumperConfig) {
setDataSourceName(dumperConfig.getDataSourceName());
setDataSourceConfig(dumperConfig.getDataSourceConfig());
setTableNameMap(dumperConfig.getTableNameMap());
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
index 615a006ef92..30705ecf7cb 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -34,13 +34,13 @@ public interface IncrementalDumperCreator extends
DatabaseTypedSPI {
/**
* Create incremental dumper.
*
- * @param dumperConfig dumper configuration
+ * @param config incremental dumper configuration
* @param position position
* @param channel channel
* @param metaDataLoader meta data loader
* @return incremental dumper
*/
- IncrementalDumper createIncrementalDumper(DumperConfiguration
dumperConfig, IngestPosition position, PipelineChannel channel,
PipelineTableMetaDataLoader metaDataLoader);
+ IncrementalDumper createIncrementalDumper(IncrementalDumperConfiguration
config, IngestPosition position, PipelineChannel channel,
PipelineTableMetaDataLoader metaDataLoader);
/**
* Whether support incremental dump.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java
index 2934a8f142f..f95815578dc 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.common.config.ingest;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
/**
@@ -31,5 +31,5 @@ public interface IncrementalDumperConfigurationCreator {
* @param jobDataNodeLine job data node line
* @return dumper configuration
*/
- DumperConfiguration createDumperConfiguration(JobDataNodeLine
jobDataNodeLine);
+ IncrementalDumperConfiguration createDumperConfiguration(JobDataNodeLine
jobDataNodeLine);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
index 02f46a39928..0cd8d13e3af 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.preparer;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -123,7 +123,7 @@ public final class PipelineJobPreparerUtils {
* @return ingest position
* @throws SQLException sql exception
*/
- public static IngestPosition getIncrementalPosition(final
JobItemIncrementalTasksProgress initIncremental, final DumperConfiguration
dumperConfig,
+ public static IngestPosition getIncrementalPosition(final
JobItemIncrementalTasksProgress initIncremental, final
IncrementalDumperConfiguration dumperConfig,
final
PipelineDataSourceManager dataSourceManager) throws SQLException {
if (null != initIncremental) {
Optional<IngestPosition> position =
initIncremental.getIncrementalPosition();
diff --git
a/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/ingest/dumper/H2IncrementalDumperCreator.java
b/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/ingest/dumper/H2IncrementalDumperCreator.java
index 879f0d7732e..f0fe498d04b 100644
---
a/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/ingest/dumper/H2IncrementalDumperCreator.java
+++
b/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/ingest/dumper/H2IncrementalDumperCreator.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.h2.ingest.dumper;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -30,7 +30,7 @@ import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDump
public final class H2IncrementalDumperCreator implements
IncrementalDumperCreator {
@Override
- public IncrementalDumper createIncrementalDumper(final DumperConfiguration
dumperConfig, final IngestPosition position,
+ public IncrementalDumper createIncrementalDumper(final
IncrementalDumperConfiguration config, final IngestPosition position,
final PipelineChannel
channel, final PipelineTableMetaDataLoader metaDataLoader) {
throw new UnsupportedOperationException("H2 database can not support
incremental dump.");
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index c41f2ae9e33..8b65fa22529 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest;
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
@@ -68,7 +68,7 @@ import java.util.Optional;
@Slf4j
public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor
implements IncrementalDumper {
- private final DumperConfiguration dumperConfig;
+ private final IncrementalDumperConfiguration dumperConfig;
private final BinlogPosition binlogPosition;
@@ -80,7 +80,7 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
private final String catalog;
- public MySQLIncrementalDumper(final DumperConfiguration dumperConfig,
final IngestPosition binlogPosition,
+ public MySQLIncrementalDumper(final IncrementalDumperConfiguration
dumperConfig, final IngestPosition binlogPosition,
final PipelineChannel channel, final
PipelineTableMetaDataLoader metaDataLoader) {
Preconditions.checkArgument(dumperConfig.getDataSourceConfig()
instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only
support StandardPipelineDataSourceConfiguration");
this.dumperConfig = dumperConfig;
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
index 1e0f4a294b3..e359cd5b8ba 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.dumper;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -31,9 +31,9 @@ import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDump
public final class MySQLIncrementalDumperCreator implements
IncrementalDumperCreator {
@Override
- public IncrementalDumper createIncrementalDumper(final DumperConfiguration
dumperConfig, final IngestPosition position,
+ public IncrementalDumper createIncrementalDumper(final
IncrementalDumperConfiguration config, final IngestPosition position,
final PipelineChannel
channel, final PipelineTableMetaDataLoader metaDataLoader) {
- return new MySQLIncrementalDumper(dumperConfig, position, channel,
metaDataLoader);
+ return new MySQLIncrementalDumper(config, position, channel,
metaDataLoader);
}
@Override
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index dd956b53316..5cc6e395f37 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest;
import lombok.SneakyThrows;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
@@ -75,7 +75,7 @@ import static org.mockito.Mockito.when;
@SuppressWarnings("unchecked")
class MySQLIncrementalDumperTest {
- private DumperConfiguration dumperConfig;
+ private IncrementalDumperConfiguration dumperConfig;
private MySQLIncrementalDumper incrementalDumper;
@@ -93,8 +93,8 @@ class MySQLIncrementalDumperTest {
when(metaDataLoader.getTableMetaData(any(),
any())).thenReturn(pipelineTableMetaData);
}
- private DumperConfiguration mockDumperConfiguration() {
- DumperConfiguration result = new DumperConfiguration();
+ private IncrementalDumperConfiguration mockDumperConfiguration() {
+ IncrementalDumperConfiguration result = new
IncrementalDumperConfiguration();
result.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL",
"root", "root"));
result.setTableNameMap(Collections.singletonMap(new
ActualTableName("t_order"), new LogicTableName("t_order")));
result.setTableNameSchemaNameMapping(new
TableNameSchemaNameMapping(Collections.emptyMap()));
@@ -103,7 +103,7 @@ class MySQLIncrementalDumperTest {
}
@SneakyThrows(SQLException.class)
- private void initTableData(final DumperConfiguration dumperConfig) {
+ private void initTableData(final IncrementalDumperConfiguration
dumperConfig) {
try (
PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index 4fa6c8539f7..c4b4defb390 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.opengauss.ingest;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
@@ -58,7 +58,7 @@ import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public final class OpenGaussWALDumper extends AbstractLifecycleExecutor
implements IncrementalDumper {
- private final DumperConfiguration dumperConfig;
+ private final IncrementalDumperConfiguration dumperConfig;
private final AtomicReference<WALPosition> walPosition;
@@ -72,7 +72,7 @@ public final class OpenGaussWALDumper extends
AbstractLifecycleExecutor implemen
private List<AbstractRowEvent> rowEvents = new LinkedList<>();
- public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final
IngestPosition position,
+ public OpenGaussWALDumper(final IncrementalDumperConfiguration
dumperConfig, final IngestPosition position,
final PipelineChannel channel, final
PipelineTableMetaDataLoader metaDataLoader) {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
() -> new
UnsupportedSQLOperationException("PostgreSQLWALDumper only support
PipelineDataSourceConfiguration"));
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
index 0d11f2a6dc6..b4c9bde4d41 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.opengauss.ingest.dumper;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -31,9 +31,9 @@ import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDump
public final class OpenGaussIncrementalDumperCreator implements
IncrementalDumperCreator {
@Override
- public IncrementalDumper createIncrementalDumper(final DumperConfiguration
dumperConfig, final IngestPosition position,
+ public IncrementalDumper createIncrementalDumper(final
IncrementalDumperConfiguration config, final IngestPosition position,
final PipelineChannel
channel, final PipelineTableMetaDataLoader metaDataLoader) {
- return new OpenGaussWALDumper(dumperConfig, position, channel,
metaDataLoader);
+ return new OpenGaussWALDumper(config, position, channel,
metaDataLoader);
}
@Override
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index c281fdc8a71..f377684cd2e 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
@@ -60,7 +60,7 @@ import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor
implements IncrementalDumper {
- private final DumperConfiguration dumperConfig;
+ private final IncrementalDumperConfiguration dumperConfig;
private final AtomicReference<WALPosition> walPosition;
@@ -74,7 +74,7 @@ public final class PostgreSQLWALDumper extends
AbstractLifecycleExecutor impleme
private List<AbstractRowEvent> rowEvents = new LinkedList<>();
- public PostgreSQLWALDumper(final DumperConfiguration dumperConfig, final
IngestPosition position,
+ public PostgreSQLWALDumper(final IncrementalDumperConfiguration
dumperConfig, final IngestPosition position,
final PipelineChannel channel, final
PipelineTableMetaDataLoader metaDataLoader) {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
() -> new
UnsupportedSQLOperationException("PostgreSQLWALDumper only support
PipelineDataSourceConfiguration"));
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
index 6ef5ec6250e..50852c68caa 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.dumper;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -31,9 +31,9 @@ import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDump
public final class PostgreSQLIncrementalDumperCreator implements
IncrementalDumperCreator {
@Override
- public IncrementalDumper createIncrementalDumper(final DumperConfiguration
dumperConfig, final IngestPosition position,
+ public IncrementalDumper createIncrementalDumper(final
IncrementalDumperConfiguration config, final IngestPosition position,
final PipelineChannel
channel, final PipelineTableMetaDataLoader metaDataLoader) {
- return new PostgreSQLWALDumper(dumperConfig, position, channel,
metaDataLoader);
+ return new PostgreSQLWALDumper(config, position, channel,
metaDataLoader);
}
@Override
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index 5703ed68ce0..388c664780a 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
@@ -43,11 +43,11 @@ import java.util.List;
*/
public final class WALEventConverter {
- private final DumperConfiguration dumperConfig;
+ private final IncrementalDumperConfiguration dumperConfig;
private final PipelineTableMetaDataLoader metaDataLoader;
- public WALEventConverter(final DumperConfiguration dumperConfig, final
PipelineTableMetaDataLoader metaDataLoader) {
+ public WALEventConverter(final IncrementalDumperConfiguration
dumperConfig, final PipelineTableMetaDataLoader metaDataLoader) {
this.dumperConfig = dumperConfig;
this.metaDataLoader = metaDataLoader;
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index b01ef0ad622..5167148ddc2 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
@@ -72,7 +72,7 @@ class PostgreSQLWALDumperTest {
private WALPosition position;
- private DumperConfiguration dumperConfig;
+ private IncrementalDumperConfiguration dumperConfig;
private PostgreSQLWALDumper walDumper;
@@ -103,8 +103,8 @@ class PostgreSQLWALDumperTest {
}
}
- private DumperConfiguration createDumperConfiguration(final String
jdbcUrl, final String username, final String password) {
- DumperConfiguration result = new DumperConfiguration();
+ private IncrementalDumperConfiguration createDumperConfiguration(final
String jdbcUrl, final String username, final String password) {
+ IncrementalDumperConfiguration result = new
IncrementalDumperConfiguration();
result.setJobId("0101123456");
result.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration(jdbcUrl, username, password));
result.setTableNameMap(Collections.singletonMap(new
ActualTableName("t_order_0"), new LogicTableName("t_order")));
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 85850fa1dae..29479237999 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
import lombok.SneakyThrows;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
@@ -71,7 +71,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class WALEventConverterTest {
- private DumperConfiguration dumperConfig;
+ private IncrementalDumperConfiguration dumperConfig;
private WALEventConverter walEventConverter;
@@ -88,8 +88,8 @@ class WALEventConverterTest {
pipelineTableMetaData = new PipelineTableMetaData("t_order",
mockOrderColumnsMetaDataMap(), Collections.emptyList());
}
- private DumperConfiguration mockDumperConfiguration() {
- DumperConfiguration result = new DumperConfiguration();
+ private IncrementalDumperConfiguration mockDumperConfiguration() {
+ IncrementalDumperConfiguration result = new
IncrementalDumperConfiguration();
result.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL",
"root", "root"));
result.setTableNameMap(Collections.singletonMap(new
ActualTableName("t_order"), new LogicTableName("t_order")));
result.setTableNameSchemaNameMapping(new
TableNameSchemaNameMapping(Collections.emptyMap()));
@@ -97,7 +97,7 @@ class WALEventConverterTest {
}
@SneakyThrows(SQLException.class)
- private void initTableData(final DumperConfiguration dumperConfig) {
+ private void initTableData(final IncrementalDumperConfiguration
dumperConfig) {
try (
PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index ac884312417..f88832aea96 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -22,7 +22,7 @@ import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
@@ -177,7 +177,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
if (getJobItemProgress(jobId, i).isPresent()) {
continue;
}
- DumperConfiguration dumperConfig =
buildDumperConfiguration(jobConfig, i,
getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames()));
+ IncrementalDumperConfiguration dumperConfig =
buildDumperConfiguration(jobConfig, i,
getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames()));
InventoryIncrementalJobItemProgress jobItemProgress =
getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager,
dumperConfig);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(
jobId, i,
YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
@@ -189,11 +189,11 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
private static InventoryIncrementalJobItemProgress
getInventoryIncrementalJobItemProgress(final CDCJobConfiguration jobConfig,
final PipelineDataSourceManager dataSourceManager,
-
final DumperConfiguration dumperConfig) throws SQLException {
+
final IncrementalDumperConfiguration incrementalDumperConfig)
throws SQLException {
InventoryIncrementalJobItemProgress result = new
InventoryIncrementalJobItemProgress();
result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
- result.setDataSourceName(dumperConfig.getDataSourceName());
- IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null,
dumperConfig, dataSourceManager));
+ result.setDataSourceName(incrementalDumperConfig.getDataSourceName());
+ IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null,
incrementalDumperConfig, dataSourceManager));
result.setIncremental(new
JobItemIncrementalTasksProgress(incrementalTaskProgress));
return result;
}
@@ -268,7 +268,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
public CDCTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
CDCJobConfiguration jobConfig = (CDCJobConfiguration)
pipelineJobConfig;
TableNameSchemaNameMapping tableNameSchemaNameMapping =
getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames());
- DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig,
jobShardingItem, tableNameSchemaNameMapping);
+ IncrementalDumperConfiguration dumperConfig =
buildDumperConfiguration(jobConfig, jobShardingItem,
tableNameSchemaNameMapping);
ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
jobConfig.getSchemaTableNames(), tableNameSchemaNameMapping);
CDCTaskConfiguration result = new CDCTaskConfiguration(dumperConfig,
importerConfig);
log.debug("buildTaskConfiguration, result={}", result);
@@ -286,13 +286,13 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
return new TableNameSchemaNameMapping(tableNameSchemaMap);
}
- private DumperConfiguration buildDumperConfiguration(final
CDCJobConfiguration jobConfig, final int jobShardingItem, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+ private IncrementalDumperConfiguration buildDumperConfiguration(final
CDCJobConfiguration jobConfig, final int jobShardingItem, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
JobDataNodeLine dataNodeLine =
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
Map<ActualTableName, LogicTableName> tableNameMap = new
LinkedHashMap<>();
dataNodeLine.getEntries().forEach(each ->
each.getDataNodes().forEach(node -> tableNameMap.put(new
ActualTableName(node.getTableName()), new
LogicTableName(each.getLogicTableName()))));
String dataSourceName =
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
StandardPipelineDataSourceConfiguration actualDataSourceConfig =
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
- DumperConfiguration result = new DumperConfiguration();
+ IncrementalDumperConfiguration result = new
IncrementalDumperConfiguration();
result.setJobId(jobConfig.getJobId());
result.setDataSourceName(dataSourceName);
result.setDataSourceConfig(actualDataSourceConfig);
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/task/CDCTaskConfiguration.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/task/CDCTaskConfiguration.java
index ae4cac23dea..af862bc1a4b 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/task/CDCTaskConfiguration.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/task/CDCTaskConfiguration.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.cdc.config.task;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.PipelineTaskConfiguration;
@@ -30,7 +30,7 @@ import
org.apache.shardingsphere.data.pipeline.common.config.PipelineTaskConfigu
@Getter
public final class CDCTaskConfiguration implements PipelineTaskConfiguration {
- private final DumperConfiguration dumperConfig;
+ private final IncrementalDumperConfiguration dumperConfig;
private final ImporterConfiguration importerConfig;
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index f91a67dc1df..099c71139cc 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
@@ -144,7 +144,7 @@ public final class CDCJobPreparer {
private void initIncrementalTask(final CDCJobItemContext jobItemContext,
final AtomicBoolean importerUsed, final List<CDCChannelProgressPair>
channelProgressPairs) {
CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
- DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
+ IncrementalDumperConfiguration dumperConfig =
taskConfig.getDumperConfig();
ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperConfig.getPosition(),
jobItemContext.getInitProgress());
PipelineChannel channel =
PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(),
jobItemContext.getJobProcessContext().getPipelineChannelCreator(),
taskProgress);
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 1d4a13fc9f3..b7a11dae976 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -262,13 +262,14 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
@Override
public MigrationTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration)
pipelineJobConfig;
- DumperConfiguration dumperConfig = new
MigrationIncrementalDumperConfigurationCreator(jobConfig).createDumperConfiguration(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
- CreateTableConfiguration createTableConfig =
buildCreateTableConfiguration(jobConfig,
dumperConfig.getTableNameSchemaNameMapping());
+ IncrementalDumperConfiguration incrementalDumperConfig = new
MigrationIncrementalDumperConfigurationCreator(
+
jobConfig).createDumperConfiguration(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
+ CreateTableConfiguration createTableConfig =
buildCreateTableConfiguration(jobConfig,
incrementalDumperConfig.getTableNameSchemaNameMapping());
Set<LogicTableName> targetTableNames =
jobConfig.getTargetTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
Map<LogicTableName, Set<String>> shardingColumnsMap = new
ShardingColumnsExtractor().getShardingColumnsMap(
((ShardingSpherePipelineDataSourceConfiguration)
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
- ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
shardingColumnsMap, dumperConfig.getTableNameSchemaNameMapping());
- MigrationTaskConfiguration result = new
MigrationTaskConfiguration(dumperConfig.getDataSourceName(), createTableConfig,
dumperConfig, importerConfig);
+ ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
shardingColumnsMap, incrementalDumperConfig.getTableNameSchemaNameMapping());
+ MigrationTaskConfiguration result = new
MigrationTaskConfiguration(incrementalDumperConfig.getDataSourceName(),
createTableConfig, incrementalDumperConfig, importerConfig);
log.info("buildTaskConfiguration, result={}", result);
return result;
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java
index edc9dbf2e85..f250fafe4a2 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration.config;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.PipelineTaskConfiguration;
@@ -37,7 +37,7 @@ public final class MigrationTaskConfiguration implements
PipelineTaskConfigurati
private final CreateTableConfiguration createTableConfig;
- private final DumperConfiguration dumperConfig;
+ private final IncrementalDumperConfiguration dumperConfig;
private final ImporterConfiguration importerConfig;
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java
index 4763f25aa04..07cb0dfd60d 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
@@ -39,16 +39,16 @@ public final class
MigrationIncrementalDumperConfigurationCreator implements Inc
private final MigrationJobConfiguration jobConfig;
@Override
- public DumperConfiguration createDumperConfiguration(final JobDataNodeLine
jobDataNodeLine) {
+ public IncrementalDumperConfiguration createDumperConfiguration(final
JobDataNodeLine jobDataNodeLine) {
Map<ActualTableName, LogicTableName> tableNameMap =
JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine);
TableNameSchemaNameMapping tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
String dataSourceName =
jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
return buildDumperConfiguration(jobConfig.getJobId(), dataSourceName,
jobConfig.getSources().get(dataSourceName), tableNameMap,
tableNameSchemaNameMapping);
}
- private DumperConfiguration buildDumperConfiguration(final String jobId,
final String dataSourceName, final PipelineDataSourceConfiguration
sourceDataSource,
- final
Map<ActualTableName, LogicTableName> tableNameMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
- DumperConfiguration result = new DumperConfiguration();
+ private IncrementalDumperConfiguration buildDumperConfiguration(final
String jobId, final String dataSourceName, final
PipelineDataSourceConfiguration sourceDataSource,
+ final
Map<ActualTableName, LogicTableName> tableNameMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+ IncrementalDumperConfiguration result = new
IncrementalDumperConfiguration();
result.setJobId(jobId);
result.setDataSourceName(dataSourceName);
result.setDataSourceConfig(sourceDataSource);
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 63c3858908c..ca5ab63968a 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration.prepare;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -185,7 +185,7 @@ public final class MigrationJobPreparer {
MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
PipelineChannelCreator pipelineChannelCreator =
jobItemContext.getJobProcessContext().getPipelineChannelCreator();
PipelineTableMetaDataLoader sourceMetaDataLoader =
jobItemContext.getSourceMetaDataLoader();
- DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
+ IncrementalDumperConfiguration dumperConfig =
taskConfig.getDumperConfig();
ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
ExecuteEngine incrementalExecuteEngine =
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperConfig.getPosition(),
jobItemContext.getInitProgress());
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
index 8abb8b49287..297b49a4881 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -30,7 +30,7 @@ import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDump
public final class FixtureIncrementalDumperCreator implements
IncrementalDumperCreator {
@Override
- public IncrementalDumper createIncrementalDumper(final DumperConfiguration
dumperConfig, final IngestPosition position,
+ public IncrementalDumper createIncrementalDumper(final
IncrementalDumperConfiguration config, final IngestPosition position,
final PipelineChannel
channel, final PipelineTableMetaDataLoader metaDataLoader) {
return new FixtureIncrementalDumper();
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 5212fcfa6d2..d029ac73f49 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.prepare;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.BaseDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
@@ -143,7 +143,7 @@ class InventoryTaskSplitterTest {
}
}
- private void initEmptyTablePrimaryEnvironment(final DumperConfiguration
dumperConfig) throws SQLException {
+ private void initEmptyTablePrimaryEnvironment(final
BaseDumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (
Connection connection = dataSource.getConnection();
@@ -153,7 +153,7 @@ class InventoryTaskSplitterTest {
}
}
- private void initIntPrimaryEnvironment(final DumperConfiguration
dumperConfig) throws SQLException {
+ private void initIntPrimaryEnvironment(final BaseDumperConfiguration
dumperConfig) throws SQLException {
DataSource dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (
Connection connection = dataSource.getConnection();
@@ -166,7 +166,7 @@ class InventoryTaskSplitterTest {
}
}
- private void initCharPrimaryEnvironment(final DumperConfiguration
dumperConfig) throws SQLException {
+ private void initCharPrimaryEnvironment(final BaseDumperConfiguration
dumperConfig) throws SQLException {
DataSource dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (
Connection connection = dataSource.getConnection();
@@ -177,7 +177,7 @@ class InventoryTaskSplitterTest {
}
}
- private void initUnionPrimaryEnvironment(final DumperConfiguration
dumperConfig) throws SQLException {
+ private void initUnionPrimaryEnvironment(final BaseDumperConfiguration
dumperConfig) throws SQLException {
DataSource dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (
Connection connection = dataSource.getConnection();
@@ -188,7 +188,7 @@ class InventoryTaskSplitterTest {
}
}
- private void initNoPrimaryEnvironment(final DumperConfiguration
dumperConfig) throws SQLException {
+ private void initNoPrimaryEnvironment(final BaseDumperConfiguration
dumperConfig) throws SQLException {
DataSource dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (
Connection connection = dataSource.getConnection();
@@ -199,7 +199,7 @@ class InventoryTaskSplitterTest {
}
}
- private void initUniqueIndexOnNotNullColumnEnvironment(final
DumperConfiguration dumperConfig) throws SQLException {
+ private void initUniqueIndexOnNotNullColumnEnvironment(final
BaseDumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (
Connection connection = dataSource.getConnection();
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
index f56ea61452b..82f09ad5cf8 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.task;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -84,7 +84,7 @@ class InventoryTaskTest {
inventoryTask.close();
}
- private void initTableData(final DumperConfiguration dumperConfig) throws
SQLException {
+ private void initTableData(final IncrementalDumperConfiguration
dumperConfig) throws SQLException {
PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
try (
PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());