This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new feb9ae48679 Refactor DumperCommonContext (#28946)
feb9ae48679 is described below
commit feb9ae4867964187b1f49e5812437f6ff329e0bc
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Nov 5 15:55:13 2023 +0800
Refactor DumperCommonContext (#28946)
---
.../ingest/dumper/context/DumperCommonContext.java | 2 +-
.../dumper/context/IncrementalDumperContext.java | 8 ++++++--
.../dumper/context/InventoryDumperContext.java | 17 ++++++++++-------
.../data/pipeline/core/dumper/InventoryDumper.java | 17 +++++++++--------
.../preparer/InventoryRecordsCountCalculator.java | 2 +-
.../core/preparer/InventoryTaskSplitter.java | 16 ++++++++--------
.../core/preparer/PipelineJobPreparerUtils.java | 4 ++--
.../data/pipeline/core/task/PipelineTaskUtils.java | 2 +-
.../mysql/ingest/MySQLIncrementalDumper.java | 13 +++++++------
.../mysql/ingest/MySQLIncrementalDumperTest.java | 15 ++++++++-------
.../opengauss/ingest/OpenGaussWALDumper.java | 4 ++--
.../postgresql/ingest/PostgreSQLWALDumper.java | 4 ++--
.../postgresql/ingest/wal/WALEventConverter.java | 8 ++++----
.../postgresql/ingest/PostgreSQLWALDumperTest.java | 15 +++++++++------
.../postgresql/ingest/wal/WALEventConverterTest.java | 15 ++++++++-------
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 14 ++++++++------
.../data/pipeline/cdc/context/CDCJobItemContext.java | 4 ++--
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 20 +++++++++++---------
.../scenario/migration/api/impl/MigrationJobAPI.java | 8 +++++---
.../MigrationIncrementalDumperContextCreator.java | 12 +++++++-----
.../migration/context/MigrationJobItemContext.java | 2 +-
.../migration/prepare/MigrationJobPreparer.java | 16 +++++++++-------
.../job/service/GovernanceRepositoryAPIImplTest.java | 4 ++--
.../core/prepare/InventoryTaskSplitterTest.java | 18 +++++++++---------
.../data/pipeline/core/task/IncrementalTaskTest.java | 2 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 9 +++++----
.../MigrationDataConsistencyCheckerTest.java | 2 +-
27 files changed, 139 insertions(+), 114 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
index 9166f609df9..cdcbf6b0f3f 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
@@ -31,7 +31,7 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
@Getter
@Setter
@ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
-public abstract class DumperCommonContext {
+public final class DumperCommonContext {
private String dataSourceName;
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java
index 4ea3cb2c5da..4b7697c7dfa 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java
@@ -18,16 +18,20 @@
package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
/**
* Incremental dumper context.
*/
+@RequiredArgsConstructor
@Getter
@Setter
-@ToString(callSuper = true)
-public final class IncrementalDumperContext extends DumperCommonContext {
+@ToString
+public final class IncrementalDumperContext {
+
+ private final DumperCommonContext commonContext;
private String jobId;
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
index e96375d8c6e..2e1d44438d7 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
@@ -30,8 +30,10 @@ import java.util.List;
*/
@Getter
@Setter
-@ToString(callSuper = true)
-public final class InventoryDumperContext extends DumperCommonContext {
+@ToString
+public final class InventoryDumperContext {
+
+ private final DumperCommonContext commonContext;
private String actualTableName;
@@ -51,11 +53,12 @@ public final class InventoryDumperContext extends
DumperCommonContext {
private JobRateLimitAlgorithm rateLimitAlgorithm;
- public InventoryDumperContext(final DumperCommonContext dumperContext) {
- setDataSourceName(dumperContext.getDataSourceName());
- setDataSourceConfig(dumperContext.getDataSourceConfig());
- setTableNameMapper(dumperContext.getTableNameMapper());
- setTableAndSchemaNameMapper(dumperContext.getTableAndSchemaNameMapper());
+ public InventoryDumperContext(final DumperCommonContext commonContext) {
+ this.commonContext = new DumperCommonContext();
+ this.commonContext.setDataSourceName(commonContext.getDataSourceName());
+ this.commonContext.setDataSourceConfig(commonContext.getDataSourceConfig());
+ this.commonContext.setTableNameMapper(commonContext.getTableNameMapper());
+ this.commonContext.setTableAndSchemaNameMapper(commonContext.getTableAndSchemaNameMapper());
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index 36943d542bd..ebd0f8183ee 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -88,7 +88,7 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
this.dumperContext = dumperContext;
this.channel = channel;
this.dataSource = dataSource;
- DatabaseType databaseType =
dumperContext.getDataSourceConfig().getDatabaseType();
+ DatabaseType databaseType =
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
inventoryDumpSQLBuilder = new
PipelineInventoryDumpSQLBuilder(databaseType);
columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
this.metaDataLoader = metaDataLoader;
@@ -96,13 +96,13 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
@Override
protected void runBlocking() {
- IngestPosition position = dumperContext.getPosition();
+ IngestPosition position =
dumperContext.getCommonContext().getPosition();
if (position instanceof FinishedPosition) {
log.info("Ignored because of already finished.");
return;
}
PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(
-
dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()),
dumperContext.getActualTableName());
+
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()),
dumperContext.getActualTableName());
try (Connection connection = dataSource.getConnection()) {
dump(tableMetaData, connection);
} catch (final SQLException ex) {
@@ -114,7 +114,7 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
@SuppressWarnings("MagicConstant")
private void dump(final PipelineTableMetaData tableMetaData, final
Connection connection) throws SQLException {
int batchSize = dumperContext.getBatchSize();
- DatabaseType databaseType =
dumperContext.getDataSourceConfig().getDatabaseType();
+ DatabaseType databaseType =
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
if (null != dumperContext.getTransactionIsolation()) {
connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
}
@@ -156,11 +156,11 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
return dumperContext.getQuerySQL();
}
- String schemaName =
dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+ String schemaName =
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
if (!dumperContext.hasUniqueKey()) {
return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName,
dumperContext.getActualTableName());
}
- PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>)
dumperContext.getPosition();
+ PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>)
dumperContext.getCommonContext().getPosition();
PipelineColumnMetaData firstColumn =
dumperContext.getUniqueKeyColumns().get(0);
Collection<String> columnNames = Collections.singleton("*");
if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) ||
PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
@@ -179,7 +179,7 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
return;
}
PipelineColumnMetaData firstColumn =
dumperContext.getUniqueKeyColumns().get(0);
- PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>)
dumperContext.getPosition();
+ PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>)
dumperContext.getCommonContext().getPosition();
if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) &&
null != position.getBeginValue() && null != position.getEndValue()) {
preparedStatement.setObject(1, position.getBeginValue());
preparedStatement.setObject(2, position.getEndValue());
@@ -212,7 +212,8 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
private IngestPosition newPosition(final ResultSet resultSet) throws
SQLException {
return dumperContext.hasUniqueKey()
- ? PrimaryKeyPositionFactory.newInstance(resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition<?>) dumperContext.getPosition()).getEndValue())
+ ? PrimaryKeyPositionFactory.newInstance(
+ resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition()).getEndValue())
: new PlaceholderPosition();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
index f1e46de7bee..5610c5bf0d7 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
@@ -51,7 +51,7 @@ public final class InventoryRecordsCountCalculator {
* @throws SplitPipelineJobByUniqueKeyException if there's exception from
database
*/
public static long getTableRecordsCount(final InventoryDumperContext
dumperContext, final PipelineDataSourceWrapper dataSource) {
- String schemaName =
dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+ String schemaName =
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
String actualTableName = dumperContext.getActualTableName();
PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(dataSource.getDatabaseType());
Optional<String> sql =
pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
index 97159392918..f497583c9f8 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
@@ -83,7 +83,7 @@ public final class InventoryTaskSplitter {
long startTimeMillis = System.currentTimeMillis();
InventoryIncrementalProcessContext processContext =
jobItemContext.getJobProcessContext();
for (InventoryDumperContext each :
splitInventoryDumperContext(jobItemContext)) {
- AtomicReference<IngestPosition> position = new
AtomicReference<>(each.getPosition());
+ AtomicReference<IngestPosition> position = new
AtomicReference<>(each.getCommonContext().getPosition());
PipelineChannel channel =
PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(),
importerConfig.getBatchSize(), position);
Dumper dumper = new InventoryDumper(each, channel,
sourceDataSource, jobItemContext.getSourceMetaDataLoader());
Importer importer = new SingleChannelConsumerImporter(channel,
importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(),
jobItemContext);
@@ -110,12 +110,12 @@ public final class InventoryTaskSplitter {
private Collection<InventoryDumperContext> splitByTable(final
InventoryDumperContext dumperContext) {
Collection<InventoryDumperContext> result = new LinkedList<>();
- dumperContext.getTableNameMapper().getTableNameMap().forEach((key,
value) -> {
- InventoryDumperContext inventoryDumperContext = new
InventoryDumperContext(dumperContext);
+
dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().forEach((key,
value) -> {
+ InventoryDumperContext inventoryDumperContext = new
InventoryDumperContext(dumperContext.getCommonContext());
// use original table name, for metadata loader, since some
database table name case-sensitive
inventoryDumperContext.setActualTableName(key.getOriginal());
inventoryDumperContext.setLogicTableName(value.getOriginal());
- inventoryDumperContext.setPosition(new PlaceholderPosition());
+ inventoryDumperContext.getCommonContext().setPosition(new
PlaceholderPosition());
inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
result.add(inventoryDumperContext);
@@ -126,7 +126,7 @@ public final class InventoryTaskSplitter {
private Collection<InventoryDumperContext> splitByPrimaryKey(final
InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext
jobItemContext,
final
PipelineDataSourceWrapper dataSource) {
if (null == dumperContext.getUniqueKeyColumns()) {
- String schemaName =
dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+ String schemaName =
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
String actualTableName = dumperContext.getActualTableName();
List<PipelineColumnMetaData> uniqueKeyColumns =
PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName,
jobItemContext.getSourceMetaDataLoader());
dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
@@ -139,8 +139,8 @@ public final class InventoryTaskSplitter {
Collection<IngestPosition> inventoryPositions =
getInventoryPositions(dumperContext, jobItemContext, dataSource);
int i = 0;
for (IngestPosition each : inventoryPositions) {
- InventoryDumperContext splitDumperContext = new
InventoryDumperContext(dumperContext);
- splitDumperContext.setPosition(each);
+ InventoryDumperContext splitDumperContext = new
InventoryDumperContext(dumperContext.getCommonContext());
+ splitDumperContext.getCommonContext().setPosition(each);
splitDumperContext.setShardingItem(i++);
splitDumperContext.setActualTableName(dumperContext.getActualTableName());
splitDumperContext.setLogicTableName(dumperContext.getLogicTableName());
@@ -204,7 +204,7 @@ public final class InventoryTaskSplitter {
String uniqueKey =
dumperContext.getUniqueKeyColumns().get(0).getName();
PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
-
dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()),
dumperContext.getActualTableName(), uniqueKey);
+
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()),
dumperContext.getActualTableName(), uniqueKey);
try (
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
index 2f38ea4c72c..e40fa297fe0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
@@ -131,8 +131,8 @@ public final class PipelineJobPreparerUtils {
return position.get();
}
}
- DatabaseType databaseType =
dumperContext.getDataSourceConfig().getDatabaseType();
- DataSource dataSource =
dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
+ DatabaseType databaseType =
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
+ DataSource dataSource =
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
return DatabaseTypedSPILoader.getService(PositionInitializer.class,
databaseType).init(dataSource, dumperContext.getJobId());
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
index fedb6911f05..3522cc82bba 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -43,7 +43,7 @@ public final class PipelineTaskUtils {
* @return inventory task id
*/
public static String generateInventoryTaskId(final InventoryDumperContext
inventoryDumperContext) {
- String result = String.format("%s.%s",
inventoryDumperContext.getDataSourceName(),
inventoryDumperContext.getActualTableName());
+ String result = String.format("%s.%s",
inventoryDumperContext.getCommonContext().getDataSourceName(),
inventoryDumperContext.getActualTableName());
return result + "#" + inventoryDumperContext.getShardingItem();
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index ed64a865cf7..705a02200a8 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -80,12 +80,13 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
public MySQLIncrementalDumper(final IncrementalDumperContext
dumperContext, final IngestPosition binlogPosition,
final PipelineChannel channel, final
PipelineTableMetaDataLoader metaDataLoader) {
- Preconditions.checkArgument(dumperContext.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
+ Preconditions.checkArgument(dumperContext.getCommonContext().getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration,
+ "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
this.dumperContext = dumperContext;
this.binlogPosition = (BinlogPosition) binlogPosition;
this.channel = channel;
this.metaDataLoader = metaDataLoader;
- YamlJdbcConfiguration jdbcConfig =
((StandardPipelineDataSourceConfiguration)
dumperContext.getDataSourceConfig()).getJdbcConfig();
+ YamlJdbcConfiguration jdbcConfig =
((StandardPipelineDataSourceConfiguration)
dumperContext.getCommonContext().getDataSourceConfig()).getJdbcConfig();
ConnectionPropertiesParser parser =
DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class,
TypedSPILoader.getService(DatabaseType.class, "MySQL"));
ConnectionProperties connectionProps =
parser.parse(jdbcConfig.getUrl(), null, null);
ConnectInfo connectInfo = new ConnectInfo(generateServerId(),
connectionProps.getHostname(), connectionProps.getPort(),
jdbcConfig.getUsername(), jdbcConfig.getPassword());
@@ -132,7 +133,7 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
return Collections.singletonList(createPlaceholderRecord(event));
}
AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event;
- if (!rowsEvent.getDatabaseName().equals(catalog) ||
!dumperContext.getTableNameMapper().containsTable(rowsEvent.getTableName())) {
+ if (!rowsEvent.getDatabaseName().equals(catalog) ||
!dumperContext.getCommonContext().getTableNameMapper().containsTable(rowsEvent.getTableName()))
{
return Collections.singletonList(createPlaceholderRecord(event));
}
PipelineTableMetaData tableMetaData =
getPipelineTableMetaData(rowsEvent.getTableName());
@@ -155,8 +156,8 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
}
private PipelineTableMetaData getPipelineTableMetaData(final String
actualTableName) {
- LogicTableName logicTableName =
dumperContext.getTableNameMapper().getLogicTableName(actualTableName);
- return
metaDataLoader.getTableMetaData(dumperContext.getTableAndSchemaNameMapper().getSchemaName(logicTableName),
actualTableName);
+ LogicTableName logicTableName =
dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
+ return
metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName),
actualTableName);
}
private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event,
final PipelineTableMetaData tableMetaData) {
@@ -217,7 +218,7 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
}
private DataRecord createDataRecord(final String type, final
AbstractRowsEvent rowsEvent, final int columnCount) {
- String tableName =
dumperContext.getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
+ String tableName =
dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
IngestPosition position = new BinlogPosition(rowsEvent.getFileName(),
rowsEvent.getPosition(), rowsEvent.getServerId());
DataRecord result = new DataRecord(type, tableName, position,
columnCount);
result.setActualTableName(rowsEvent.getTableName());
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 7d6ee11ad2f..5ed2365d713 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
@@ -81,7 +82,7 @@ class MySQLIncrementalDumperTest {
void setUp() throws SQLException {
IncrementalDumperContext dumperContext = createDumperContext();
initTableData(dumperContext);
- dumperContext.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test",
"root", "root"));
+ dumperContext.getCommonContext().setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test",
"root", "root"));
PipelineTableMetaDataLoader metaDataLoader =
mock(PipelineTableMetaDataLoader.class);
SimpleMemoryPipelineChannel channel = new
SimpleMemoryPipelineChannel(10000, new EmptyAckCallback());
incrementalDumper = new MySQLIncrementalDumper(dumperContext, new
BinlogPosition("binlog-000001", 4L, 0L), channel, metaDataLoader);
@@ -90,17 +91,17 @@ class MySQLIncrementalDumperTest {
}
private IncrementalDumperContext createDumperContext() {
- IncrementalDumperContext result = new IncrementalDumperContext();
- result.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL",
"root", "root"));
- result.setTableNameMapper(new
ActualAndLogicTableNameMapper(Collections.singletonMap(new
ActualTableName("t_order"), new LogicTableName("t_order"))));
- result.setTableAndSchemaNameMapper(new
TableAndSchemaNameMapper(Collections.emptyMap()));
- return result;
+ DumperCommonContext commonContext = new DumperCommonContext();
+ commonContext.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL",
"root", "root"));
+ commonContext.setTableNameMapper(new
ActualAndLogicTableNameMapper(Collections.singletonMap(new
ActualTableName("t_order"), new LogicTableName("t_order"))));
+ commonContext.setTableAndSchemaNameMapper(new
TableAndSchemaNameMapper(Collections.emptyMap()));
+ return new IncrementalDumperContext(commonContext);
}
private void initTableData(final IncrementalDumperContext dumperContext)
throws SQLException {
try (
PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
- PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
+ PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index dbc2a573ba6..dfa912c4a0e 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -74,7 +74,7 @@ public final class OpenGaussWALDumper extends
AbstractLifecycleExecutor implemen
public OpenGaussWALDumper(final IncrementalDumperContext dumperContext,
final IngestPosition position,
final PipelineChannel channel, final
PipelineTableMetaDataLoader metaDataLoader) {
- ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getDataSourceConfig().getClass()),
+ ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
this.dumperContext = dumperContext;
walPosition = new AtomicReference<>((WALPosition) position);
@@ -137,7 +137,7 @@ public final class OpenGaussWALDumper extends
AbstractLifecycleExecutor implemen
}
private PgConnection getReplicationConnectionUnwrap() throws SQLException {
- return
logicalReplication.createConnection((StandardPipelineDataSourceConfiguration)
dumperContext.getDataSourceConfig()).unwrap(PgConnection.class);
+ return
logicalReplication.createConnection((StandardPipelineDataSourceConfiguration)
dumperContext.getCommonContext().getDataSourceConfig()).unwrap(PgConnection.class);
}
private void processEventWithTX(final AbstractWALEvent event) {
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 23a0d808b08..b23686cbe4e 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -76,7 +76,7 @@ public final class PostgreSQLWALDumper extends
AbstractLifecycleExecutor impleme
public PostgreSQLWALDumper(final IncrementalDumperContext dumperContext,
final IngestPosition position,
final PipelineChannel channel, final
PipelineTableMetaDataLoader metaDataLoader) {
- ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getDataSourceConfig().getClass()),
+ ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
this.dumperContext = dumperContext;
walPosition = new AtomicReference<>((WALPosition) position);
@@ -111,7 +111,7 @@ public final class PostgreSQLWALDumper extends
AbstractLifecycleExecutor impleme
private void dump() throws SQLException {
// TODO use unified PgConnection
try (
- Connection connection =
logicalReplication.createConnection((StandardPipelineDataSourceConfiguration)
dumperContext.getDataSourceConfig());
+ Connection connection =
logicalReplication.createConnection((StandardPipelineDataSourceConfiguration)
dumperContext.getCommonContext().getDataSourceConfig());
PGReplicationStream stream =
logicalReplication.createReplicationStream(connection,
PostgreSQLPositionInitializer.getUniqueSlotName(connection,
dumperContext.getJobId()),
walPosition.get().getLogSequenceNumber())) {
PostgreSQLTimestampUtils utils = new
PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils());
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index cc2dca00d86..389fb0e4593 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -80,7 +80,7 @@ public final class WALEventConverter {
private boolean filter(final AbstractWALEvent event) {
if (event instanceof AbstractRowEvent) {
AbstractRowEvent rowEvent = (AbstractRowEvent) event;
- return
!dumperContext.getTableNameMapper().containsTable(rowEvent.getTableName());
+ return
!dumperContext.getCommonContext().getTableNameMapper().containsTable(rowEvent.getTableName());
}
return false;
}
@@ -90,8 +90,8 @@ public final class WALEventConverter {
}
private PipelineTableMetaData getPipelineTableMetaData(final String
actualTableName) {
- LogicTableName logicTableName =
dumperContext.getTableNameMapper().getLogicTableName(actualTableName);
- return
metaDataLoader.getTableMetaData(dumperContext.getTableAndSchemaNameMapper().getSchemaName(logicTableName),
actualTableName);
+ LogicTableName logicTableName =
dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
+ return
metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName),
actualTableName);
}
private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent,
final PipelineTableMetaData tableMetaData) {
@@ -118,7 +118,7 @@ public final class WALEventConverter {
}
private DataRecord createDataRecord(final String type, final
AbstractRowEvent rowsEvent, final int columnCount) {
- String tableName =
dumperContext.getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
+ String tableName =
dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(),
tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
result.setActualTableName(rowsEvent.getTableName());
result.setCsn(rowsEvent.getCsn());
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 6cbb3233bac..4a033b76e63 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
@@ -90,7 +91,8 @@ class PostgreSQLWALDumperTest {
String password = "root";
createTable(jdbcUrl, username, password);
dumperContext = createDumperContext(jdbcUrl, username, password);
- walDumper = new PostgreSQLWALDumper(dumperContext, position, channel,
new
StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())));
+ walDumper = new PostgreSQLWALDumper(dumperContext, position, channel,
+ new
StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())));
}
private void createTable(final String jdbcUrl, final String username,
final String password) {
@@ -105,11 +107,12 @@ class PostgreSQLWALDumperTest {
}
private IncrementalDumperContext createDumperContext(final String jdbcUrl,
final String username, final String password) {
- IncrementalDumperContext result = new IncrementalDumperContext();
+ DumperCommonContext commonContext = new DumperCommonContext();
+ commonContext.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration(jdbcUrl, username, password));
+ commonContext.setTableNameMapper(new
ActualAndLogicTableNameMapper(Collections.singletonMap(new
ActualTableName("t_order_0"), new LogicTableName("t_order"))));
+ commonContext.setTableAndSchemaNameMapper(new
TableAndSchemaNameMapper(Collections.emptyMap()));
+ IncrementalDumperContext result = new
IncrementalDumperContext(commonContext);
result.setJobId("0101123456");
- result.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration(jdbcUrl, username, password));
- result.setTableNameMapper(new
ActualAndLogicTableNameMapper(Collections.singletonMap(new
ActualTableName("t_order_0"), new LogicTableName("t_order"))));
- result.setTableAndSchemaNameMapper(new
TableAndSchemaNameMapper(Collections.emptyMap()));
return result;
}
@@ -120,7 +123,7 @@ class PostgreSQLWALDumperTest {
@Test
void assertStart() throws SQLException, ReflectiveOperationException {
- StandardPipelineDataSourceConfiguration dataSourceConfig =
(StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig();
+ StandardPipelineDataSourceConfiguration dataSourceConfig =
(StandardPipelineDataSourceConfiguration)
dumperContext.getCommonContext().getDataSourceConfig();
try {
Plugins.getMemberAccessor().set(PostgreSQLWALDumper.class.getDeclaredField("logicalReplication"),
walDumper, logicalReplication);
when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection);
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 15895cd370b..29c625de0a7 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
@@ -79,23 +80,23 @@ class WALEventConverterTest {
void setUp() throws SQLException {
IncrementalDumperContext dumperContext = mockDumperContext();
PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
- walEventConverter = new WALEventConverter(dumperContext, new
StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())));
+ walEventConverter = new WALEventConverter(dumperContext, new
StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())));
initTableData(dumperContext);
pipelineTableMetaData = new PipelineTableMetaData("t_order",
mockOrderColumnsMetaDataMap(), Collections.emptyList());
}
private IncrementalDumperContext mockDumperContext() {
- IncrementalDumperContext result = new IncrementalDumperContext();
- result.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL",
"root", "root"));
- result.setTableNameMapper(new
ActualAndLogicTableNameMapper(Collections.singletonMap(new
ActualTableName("t_order"), new LogicTableName("t_order"))));
- result.setTableAndSchemaNameMapper(new
TableAndSchemaNameMapper(Collections.emptyMap()));
- return result;
+ DumperCommonContext commonContext = new DumperCommonContext();
+ commonContext.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL",
"root", "root"));
+ commonContext.setTableNameMapper(new
ActualAndLogicTableNameMapper(Collections.singletonMap(new
ActualTableName("t_order"), new LogicTableName("t_order"))));
+ commonContext.setTableAndSchemaNameMapper(new
TableAndSchemaNameMapper(Collections.emptyMap()));
+ return new IncrementalDumperContext(commonContext);
}
private void initTableData(final IncrementalDumperContext dumperContext)
throws SQLException {
try (
PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
- PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
+ PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 85f2aea4206..d25feef3a3b 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -26,6 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
@@ -193,7 +194,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
final IncrementalDumperContext incrementalDumperContext) throws
SQLException {
InventoryIncrementalJobItemProgress result = new
InventoryIncrementalJobItemProgress();
result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
- result.setDataSourceName(incrementalDumperContext.getDataSourceName());
+
result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null,
incrementalDumperContext, dataSourceManager));
result.setIncremental(new
JobItemIncrementalTasksProgress(incrementalTaskProgress));
return result;
@@ -282,12 +283,13 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
dataNodeLine.getEntries().forEach(each ->
each.getDataNodes().forEach(node -> tableNameMap.put(new
ActualTableName(node.getTableName()), new
LogicTableName(each.getLogicTableName()))));
String dataSourceName =
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
StandardPipelineDataSourceConfiguration actualDataSourceConfig =
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
- IncrementalDumperContext result = new IncrementalDumperContext();
+ DumperCommonContext commonContext = new DumperCommonContext();
+ commonContext.setDataSourceName(dataSourceName);
+ commonContext.setDataSourceConfig(actualDataSourceConfig);
+ commonContext.setTableNameMapper(new
ActualAndLogicTableNameMapper(tableNameMap));
+ commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
+ IncrementalDumperContext result = new
IncrementalDumperContext(commonContext);
result.setJobId(jobConfig.getJobId());
- result.setDataSourceName(dataSourceName);
- result.setDataSourceConfig(actualDataSourceConfig);
- result.setTableNameMapper(new
ActualAndLogicTableNameMapper(tableNameMap));
- result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
result.setDecodeWithTX(jobConfig.isDecodeWithTX());
return result;
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
index 23bc51e1f51..47e6acba40c 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
@@ -80,7 +80,7 @@ public final class CDCJobItemContext implements
InventoryIncrementalJobItemConte
@Override
protected PipelineDataSourceWrapper initialize() {
- return
dataSourceManager.getDataSource(taskConfig.getDumperContext().getDataSourceConfig());
+ return
dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig());
}
};
@@ -99,7 +99,7 @@ public final class CDCJobItemContext implements
InventoryIncrementalJobItemConte
@Override
public String getDataSourceName() {
- return taskConfig.getDumperContext().getDataSourceName();
+ return
taskConfig.getDumperContext().getCommonContext().getDataSourceName();
}
@Override
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 0cf3ea7d135..3d0f982de59 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -104,7 +104,8 @@ public final class CDCJobPreparer {
CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
JobItemIncrementalTasksProgress initIncremental = null ==
jobItemContext.getInitProgress() ? null :
jobItemContext.getInitProgress().getIncremental();
try {
-
taskConfig.getDumperContext().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental,
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
+ taskConfig.getDumperContext().getCommonContext().setPosition(
+
PipelineJobPreparerUtils.getIncrementalPosition(initIncremental,
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
} catch (final SQLException ex) {
throw new
PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
}
@@ -115,19 +116,19 @@ public final class CDCJobPreparer {
CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
CDCProcessContext processContext =
jobItemContext.getJobProcessContext();
- for (InventoryDumperContext each : new
InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new
InventoryDumperContext(taskConfig.getDumperContext()), importerConfig)
+ for (InventoryDumperContext each : new
InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new
InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()),
importerConfig)
.splitInventoryDumperContext(jobItemContext)) {
- AtomicReference<IngestPosition> position = new
AtomicReference<>(each.getPosition());
+ AtomicReference<IngestPosition> position = new
AtomicReference<>(each.getCommonContext().getPosition());
PipelineChannel channel =
PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(),
importerConfig.getBatchSize(), position);
channelProgressPairs.add(new CDCChannelProgressPair(channel,
jobItemContext));
Dumper dumper = new InventoryDumper(each, channel,
jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
Importer importer = importerUsed.get() ? null
: new CDCImporter(channelProgressPairs,
importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(),
- needSorting(ImporterType.INVENTORY,
hasGlobalCSN(taskConfig.getDumperContext().getDataSourceConfig().getDatabaseType())),
+ needSorting(ImporterType.INVENTORY,
hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())),
importerConfig.getRateLimitAlgorithm());
jobItemContext.getInventoryTasks().add(new
CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each),
processContext.getInventoryDumperExecuteEngine(),
processContext.getInventoryImporterExecuteEngine(),
dumper, importer, position));
- if (!(each.getPosition() instanceof FinishedPosition)) {
+ if (!(each.getCommonContext().getPosition() instanceof
FinishedPosition)) {
importerUsed.set(true);
}
}
@@ -146,16 +147,17 @@ public final class CDCJobPreparer {
CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
- IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getPosition(),
jobItemContext.getInitProgress());
+ IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
jobItemContext.getInitProgress());
PipelineChannel channel =
PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(),
jobItemContext.getJobProcessContext().getPipelineChannelCreator(),
taskProgress);
channelProgressPairs.add(new CDCChannelProgressPair(channel,
jobItemContext));
- Dumper dumper =
DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class,
dumperContext.getDataSourceConfig().getDatabaseType())
- .createIncrementalDumper(dumperContext,
dumperContext.getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
+ Dumper dumper =
DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class,
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
+ .createIncrementalDumper(dumperContext,
dumperContext.getCommonContext().getPosition(), channel,
jobItemContext.getSourceMetaDataLoader());
boolean needSorting = needSorting(ImporterType.INCREMENTAL,
hasGlobalCSN(importerConfig.getDataSourceConfig().getDatabaseType()));
Importer importer = importerUsed.get() ? null
: new CDCImporter(channelProgressPairs,
importerConfig.getBatchSize(), 300, TimeUnit.MILLISECONDS,
jobItemContext.getSink(), needSorting,
importerConfig.getRateLimitAlgorithm());
- PipelineTask incrementalTask = new
CDCIncrementalTask(dumperContext.getDataSourceName(),
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper,
importer, taskProgress);
+ PipelineTask incrementalTask = new CDCIncrementalTask(
+ dumperContext.getCommonContext().getDataSourceName(),
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper,
importer, taskProgress);
jobItemContext.getIncrementalTasks().add(incrementalTask);
importerUsed.set(true);
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 0be975696ed..3a6447fa4fa 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -264,12 +264,14 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration)
pipelineJobConfig;
IncrementalDumperContext incrementalDumperContext = new
MigrationIncrementalDumperContextCreator(
jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
- CreateTableConfiguration createTableConfig =
buildCreateTableConfiguration(jobConfig,
incrementalDumperContext.getTableAndSchemaNameMapper());
+ CreateTableConfiguration createTableConfig =
buildCreateTableConfiguration(jobConfig,
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
Set<LogicTableName> targetTableNames =
jobConfig.getTargetTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
Map<LogicTableName, Set<String>> shardingColumnsMap = new
ShardingColumnsExtractor().getShardingColumnsMap(
((ShardingSpherePipelineDataSourceConfiguration)
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
- ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
shardingColumnsMap, incrementalDumperContext.getTableAndSchemaNameMapper());
- MigrationTaskConfiguration result = new
MigrationTaskConfiguration(incrementalDumperContext.getDataSourceName(),
createTableConfig, incrementalDumperContext, importerConfig);
+ ImporterConfiguration importerConfig = buildImporterConfiguration(
+ jobConfig, pipelineProcessConfig, shardingColumnsMap,
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+ MigrationTaskConfiguration result = new MigrationTaskConfiguration(
+
incrementalDumperContext.getCommonContext().getDataSourceName(),
createTableConfig, incrementalDumperContext, importerConfig);
log.info("buildTaskConfiguration, result={}", result);
return result;
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
index 659b15995eb..764b22dec92 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
@@ -18,6 +18,7 @@
package
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest;
import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
@@ -49,12 +50,13 @@ public final class MigrationIncrementalDumperContextCreator
implements Increment
private IncrementalDumperContext buildDumperContext(final String jobId,
final String dataSourceName, final PipelineDataSourceConfiguration
sourceDataSource,
final
Map<ActualTableName, LogicTableName> tableNameMap, final
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
- IncrementalDumperContext result = new IncrementalDumperContext();
+ DumperCommonContext commonContext = new DumperCommonContext();
+ commonContext.setDataSourceName(dataSourceName);
+ commonContext.setDataSourceConfig(sourceDataSource);
+ commonContext.setTableNameMapper(new
ActualAndLogicTableNameMapper(tableNameMap));
+ commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
+ IncrementalDumperContext result = new
IncrementalDumperContext(commonContext);
result.setJobId(jobId);
- result.setDataSourceName(dataSourceName);
- result.setDataSourceConfig(sourceDataSource);
- result.setTableNameMapper(new
ActualAndLogicTableNameMapper(tableNameMap));
- result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
return result;
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index 1fe9a79b3d9..a4eaa8330c7 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -80,7 +80,7 @@ public final class MigrationJobItemContext implements
InventoryIncrementalJobIte
@Override
protected PipelineDataSourceWrapper initialize() {
- return
dataSourceManager.getDataSource(taskConfig.getDumperContext().getDataSourceConfig());
+ return
dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig());
}
};
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index c882dd924a1..a017a3559d2 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -88,7 +88,8 @@ public final class MigrationJobPreparer {
* @throws SQLException SQL exception
*/
public void prepare(final MigrationJobItemContext jobItemContext) throws
SQLException {
-
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(jobItemContext.getTaskConfig().getDumperContext().getDataSourceConfig().getClass()),
+
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(
+
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("Migration
inventory dumper only support StandardPipelineDataSourceConfiguration"));
PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(),
Collections.singleton(jobItemContext.getSourceDataSource()));
if (jobItemContext.isStopping()) {
@@ -169,14 +170,15 @@ public final class MigrationJobPreparer {
MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
JobItemIncrementalTasksProgress initIncremental = null ==
jobItemContext.getInitProgress() ? null :
jobItemContext.getInitProgress().getIncremental();
try {
-
taskConfig.getDumperContext().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental,
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
+ taskConfig.getDumperContext().getCommonContext().setPosition(
+
PipelineJobPreparerUtils.getIncrementalPosition(initIncremental,
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
} catch (final SQLException ex) {
throw new
PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
}
}
private void initInventoryTasks(final MigrationJobItemContext
jobItemContext) {
- InventoryDumperContext inventoryDumperContext = new
InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext());
+ InventoryDumperContext inventoryDumperContext = new
InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext().getCommonContext());
InventoryTaskSplitter inventoryTaskSplitter = new
InventoryTaskSplitter(jobItemContext.getSourceDataSource(),
inventoryDumperContext, jobItemContext.getTaskConfig().getImporterConfig());
jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
}
@@ -188,12 +190,12 @@ public final class MigrationJobPreparer {
IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
ExecuteEngine incrementalExecuteEngine =
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
- IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getPosition(),
jobItemContext.getInitProgress());
+ IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(),
jobItemContext.getInitProgress());
PipelineChannel channel =
PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(),
pipelineChannelCreator, taskProgress);
- Dumper dumper =
DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class,
dumperContext.getDataSourceConfig().getDatabaseType())
- .createIncrementalDumper(dumperContext,
dumperContext.getPosition(), channel, sourceMetaDataLoader);
+ Dumper dumper =
DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class,
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
+ .createIncrementalDumper(dumperContext,
dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader);
Collection<Importer> importers = createImporters(importerConfig,
jobItemContext.getSink(), channel, jobItemContext);
- PipelineTask incrementalTask = new
IncrementalTask(dumperContext.getDataSourceName(), incrementalExecuteEngine,
dumper, importers, taskProgress);
+ PipelineTask incrementalTask = new
IncrementalTask(dumperContext.getCommonContext().getDataSourceName(),
incrementalExecuteEngine, dumper, importers, taskProgress);
jobItemContext.getIncrementalTasks().add(incrementalTask);
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
index 74ddfef8e14..7229363eb7e 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
@@ -177,8 +177,8 @@ class GovernanceRepositoryAPIImplTest {
}
private InventoryTask mockInventoryTask(final MigrationTaskConfiguration
taskConfig) {
- InventoryDumperContext dumperContext = new
InventoryDumperContext(taskConfig.getDumperContext());
- dumperContext.setPosition(new PlaceholderPosition());
+ InventoryDumperContext dumperContext = new
InventoryDumperContext(taskConfig.getDumperContext().getCommonContext());
+ dumperContext.getCommonContext().setPosition(new
PlaceholderPosition());
dumperContext.setActualTableName("t_order");
dumperContext.setLogicTableName("t_order");
dumperContext.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData()));
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 6a0105b71f3..9c1f051695a 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -66,7 +66,7 @@ class InventoryTaskSplitterTest {
@BeforeEach
void setUp() {
initJobItemContext();
- dumperContext = new
InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext());
+ dumperContext = new
InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext().getCommonContext());
PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(1,
"order_id", Types.INTEGER, "int", false, true, true);
dumperContext.setUniqueKeyColumns(Collections.singletonList(columnMetaData));
inventoryTaskSplitter = new
InventoryTaskSplitter(jobItemContext.getSourceDataSource(), dumperContext,
jobItemContext.getTaskConfig().getImporterConfig());
@@ -85,7 +85,7 @@ class InventoryTaskSplitterTest {
@Test
void assertSplitInventoryDataWithEmptyTable() throws SQLException {
- initEmptyTablePrimaryEnvironment(dumperContext);
+ initEmptyTablePrimaryEnvironment(dumperContext.getCommonContext());
List<InventoryTask> actual =
inventoryTaskSplitter.splitInventoryData(jobItemContext);
assertThat(actual.size(), is(1));
InventoryTask task = actual.get(0);
@@ -95,7 +95,7 @@ class InventoryTaskSplitterTest {
@Test
void assertSplitInventoryDataWithIntPrimary() throws SQLException {
- initIntPrimaryEnvironment(dumperContext);
+ initIntPrimaryEnvironment(dumperContext.getCommonContext());
List<InventoryTask> actual =
inventoryTaskSplitter.splitInventoryData(jobItemContext);
assertThat(actual.size(), is(10));
InventoryTask task = actual.get(9);
@@ -105,7 +105,7 @@ class InventoryTaskSplitterTest {
@Test
void assertSplitInventoryDataWithCharPrimary() throws SQLException {
- initCharPrimaryEnvironment(dumperContext);
+ initCharPrimaryEnvironment(dumperContext.getCommonContext());
List<InventoryTask> actual =
inventoryTaskSplitter.splitInventoryData(jobItemContext);
assertThat(actual.size(), is(1));
assertThat(actual.get(0).getTaskId(), is("ds_0.t_order#0"));
@@ -116,15 +116,15 @@ class InventoryTaskSplitterTest {
@Test
void assertSplitInventoryDataWithoutPrimaryButWithUniqueIndex() throws
SQLException {
- initUniqueIndexOnNotNullColumnEnvironment(dumperContext);
+
initUniqueIndexOnNotNullColumnEnvironment(dumperContext.getCommonContext());
List<InventoryTask> actual =
inventoryTaskSplitter.splitInventoryData(jobItemContext);
assertThat(actual.size(), is(1));
}
@Test
void assertSplitInventoryDataWithMultipleColumnsKey() throws SQLException {
- initUnionPrimaryEnvironment(dumperContext);
- try (PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())) {
+ initUnionPrimaryEnvironment(dumperContext.getCommonContext());
+ try (PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()))
{
List<PipelineColumnMetaData> uniqueKeyColumns =
PipelineTableMetaDataUtils.getUniqueKeyColumns(null, "t_order", new
StandardPipelineTableMetaDataLoader(dataSource));
dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
List<InventoryTask> actual =
inventoryTaskSplitter.splitInventoryData(jobItemContext);
@@ -134,8 +134,8 @@ class InventoryTaskSplitterTest {
@Test
void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws
SQLException {
- initNoPrimaryEnvironment(dumperContext);
- try (PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())) {
+ initNoPrimaryEnvironment(dumperContext.getCommonContext());
+ try (PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()))
{
List<PipelineColumnMetaData> uniqueKeyColumns =
PipelineTableMetaDataUtils.getUniqueKeyColumns(null, "t_order", new
StandardPipelineTableMetaDataLoader(dataSource));
assertTrue(uniqueKeyColumns.isEmpty());
List<InventoryTask> inventoryTasks =
inventoryTaskSplitter.splitInventoryData(jobItemContext);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
index 0e70960365a..eb42b7d5b8e 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
@@ -54,7 +54,7 @@ class IncrementalTaskTest {
@BeforeEach
void setUp() {
MigrationTaskConfiguration taskConfig =
PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
- taskConfig.getDumperContext().setPosition(new PlaceholderPosition());
+ taskConfig.getDumperContext().getCommonContext().setPosition(new
PlaceholderPosition());
incrementalTask = new IncrementalTask("ds_0",
PipelineContextUtils.getExecuteEngine(), mock(Dumper.class),
Collections.singletonList(mock(Importer.class)), new
IncrementalTaskProgress(new PlaceholderPosition()));
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
index e075cd57c08..7291ad65752 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
@@ -76,7 +76,7 @@ class InventoryTaskTest {
initTableData(taskConfig.getDumperContext());
// TODO use t_order_0, and also others
InventoryDumperContext inventoryDumperContext =
createInventoryDumperContext("t_order", "t_order");
- AtomicReference<IngestPosition> position = new
AtomicReference<>(inventoryDumperContext.getPosition());
+ AtomicReference<IngestPosition> position = new
AtomicReference<>(inventoryDumperContext.getCommonContext().getPosition());
InventoryTask inventoryTask = new
InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperContext),
PipelineContextUtils.getExecuteEngine(),
PipelineContextUtils.getExecuteEngine(), mock(Dumper.class),
mock(Importer.class), position);
CompletableFuture.allOf(inventoryTask.start().toArray(new
CompletableFuture[0])).get(10L, TimeUnit.SECONDS);
@@ -87,7 +87,7 @@ class InventoryTaskTest {
private void initTableData(final IncrementalDumperContext dumperContext)
throws SQLException {
PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
try (
- PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
+ PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
@@ -98,11 +98,12 @@ class InventoryTaskTest {
}
private InventoryDumperContext createInventoryDumperContext(final String
logicTableName, final String actualTableName) {
- InventoryDumperContext result = new
InventoryDumperContext(taskConfig.getDumperContext());
+ InventoryDumperContext result = new
InventoryDumperContext(taskConfig.getDumperContext().getCommonContext());
result.setLogicTableName(logicTableName);
result.setActualTableName(actualTableName);
result.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData()));
- result.setPosition(null == taskConfig.getDumperContext().getPosition()
? new IntegerPrimaryKeyPosition(0, 1000) :
taskConfig.getDumperContext().getPosition());
+ result.getCommonContext().setPosition(
+ null ==
taskConfig.getDumperContext().getCommonContext().getPosition() ? new
IntegerPrimaryKeyPosition(0, 1000) :
taskConfig.getDumperContext().getCommonContext().getPosition());
return result;
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index c500b0e370e..23f8210a468 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -74,7 +74,7 @@ class MigrationDataConsistencyCheckerTest {
private MigrationJobConfiguration createJobConfiguration() throws
SQLException {
MigrationJobItemContext jobItemContext =
PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
-
initTableData(jobItemContext.getTaskConfig().getDumperContext().getDataSourceConfig());
+
initTableData(jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig());
initTableData(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
return jobItemContext.getJobConfig();
}