This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3f85b14f63b Refactor DumperCommonContext (#28948)
3f85b14f63b is described below
commit 3f85b14f63b5f229c252e3a2bf24b1d00e566410
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Nov 5 22:33:57 2023 +0800
Refactor DumperCommonContext (#28948)
* Refactor DumperCommonContext
* Refactor IncrementalDumperContext
---
.../ingest/dumper/context/DumperCommonContext.java | 10 ++++----
.../dumper/context/IncrementalDumperContext.java | 6 ++---
.../dumper/context/InventoryDumperContext.java | 7 ++----
.../mysql/ingest/MySQLIncrementalDumperTest.java | 10 ++++----
.../postgresql/ingest/PostgreSQLWALDumperTest.java | 12 ++++------
.../ingest/wal/WALEventConverterTest.java | 10 ++++----
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 16 ++++---------
.../MigrationIncrementalDumperContextCreator.java | 28 +++++-----------------
8 files changed, 36 insertions(+), 63 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
index cdcbf6b0f3f..fc97bb8c813 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -28,18 +29,19 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
/**
* Dumper common context.
*/
+@RequiredArgsConstructor
@Getter
@Setter
@ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
public final class DumperCommonContext {
- private String dataSourceName;
+ private final String dataSourceName;
- private PipelineDataSourceConfiguration dataSourceConfig;
+ private final PipelineDataSourceConfiguration dataSourceConfig;
- private ActualAndLogicTableNameMapper tableNameMapper;
+ private final ActualAndLogicTableNameMapper tableNameMapper;
- private TableAndSchemaNameMapper tableAndSchemaNameMapper;
+ private final TableAndSchemaNameMapper tableAndSchemaNameMapper;
private IngestPosition position;
}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java
index 4b7697c7dfa..4bcb2ce0d46 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java
@@ -19,7 +19,6 @@ package
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import lombok.Setter;
import lombok.ToString;
/**
@@ -27,13 +26,12 @@ import lombok.ToString;
*/
@RequiredArgsConstructor
@Getter
-@Setter
@ToString
public final class IncrementalDumperContext {
private final DumperCommonContext commonContext;
- private String jobId;
+ private final String jobId;
- private boolean decodeWithTX;
+ private final boolean decodeWithTX;
}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
index 2e1d44438d7..b39cffbb8ac 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
@@ -54,11 +54,8 @@ public final class InventoryDumperContext {
private JobRateLimitAlgorithm rateLimitAlgorithm;
public InventoryDumperContext(final DumperCommonContext commonContext) {
- this.commonContext = new DumperCommonContext();
-
this.commonContext.setDataSourceName(commonContext.getDataSourceName());
-
this.commonContext.setDataSourceConfig(commonContext.getDataSourceConfig());
-
this.commonContext.setTableNameMapper(commonContext.getTableNameMapper());
-
this.commonContext.setTableAndSchemaNameMapper(commonContext.getTableAndSchemaNameMapper());
+ this.commonContext = new DumperCommonContext(
+ commonContext.getDataSourceName(),
commonContext.getDataSourceConfig(), commonContext.getTableNameMapper(),
commonContext.getTableAndSchemaNameMapper());
}
/**
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index c83e3ede6d3..bd8ee875b13 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -97,11 +97,11 @@ class MySQLIncrementalDumperTest {
}
private IncrementalDumperContext createDumperContext() {
- DumperCommonContext commonContext = new DumperCommonContext();
- commonContext.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test",
"root", "root"));
- commonContext.setTableNameMapper(new
ActualAndLogicTableNameMapper(Collections.singletonMap(new
ActualTableName("t_order"), new LogicTableName("t_order"))));
- commonContext.setTableAndSchemaNameMapper(new
TableAndSchemaNameMapper(Collections.emptyMap()));
- return new IncrementalDumperContext(commonContext);
+ DumperCommonContext commonContext = new DumperCommonContext(null,
+ new
StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test",
"root", "root"),
+ new ActualAndLogicTableNameMapper(Collections.singletonMap(new
ActualTableName("t_order"), new LogicTableName("t_order"))),
+ new TableAndSchemaNameMapper(Collections.emptyMap()));
+ return new IncrementalDumperContext(commonContext, null, false);
}
private void initTableData(final IncrementalDumperContext dumperContext)
throws SQLException {
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 4a033b76e63..a6a90a79817 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -107,13 +107,11 @@ class PostgreSQLWALDumperTest {
}
private IncrementalDumperContext createDumperContext(final String jdbcUrl,
final String username, final String password) {
- DumperCommonContext commonContext = new DumperCommonContext();
- commonContext.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration(jdbcUrl, username, password));
- commonContext.setTableNameMapper(new
ActualAndLogicTableNameMapper(Collections.singletonMap(new
ActualTableName("t_order_0"), new LogicTableName("t_order"))));
- commonContext.setTableAndSchemaNameMapper(new
TableAndSchemaNameMapper(Collections.emptyMap()));
- IncrementalDumperContext result = new
IncrementalDumperContext(commonContext);
- result.setJobId("0101123456");
- return result;
+ DumperCommonContext commonContext = new DumperCommonContext(null,
+ new StandardPipelineDataSourceConfiguration(jdbcUrl, username,
password),
+ new ActualAndLogicTableNameMapper(Collections.singletonMap(new
ActualTableName("t_order_0"), new LogicTableName("t_order"))),
+ new TableAndSchemaNameMapper(Collections.emptyMap()));
+ return new IncrementalDumperContext(commonContext, "0101123456",
false);
}
@AfterEach
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 29c625de0a7..9306159d72b 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -86,11 +86,11 @@ class WALEventConverterTest {
}
private IncrementalDumperContext mockDumperContext() {
- DumperCommonContext commonContext = new DumperCommonContext();
- commonContext.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL",
"root", "root"));
- commonContext.setTableNameMapper(new
ActualAndLogicTableNameMapper(Collections.singletonMap(new
ActualTableName("t_order"), new LogicTableName("t_order"))));
- commonContext.setTableAndSchemaNameMapper(new
TableAndSchemaNameMapper(Collections.emptyMap()));
- return new IncrementalDumperContext(commonContext);
+ DumperCommonContext commonContext = new DumperCommonContext(null,
+ new
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL",
"root", "root"),
+ new ActualAndLogicTableNameMapper(Collections.singletonMap(new
ActualTableName("t_order"), new LogicTableName("t_order"))),
+ new TableAndSchemaNameMapper(Collections.emptyMap()));
+ return new IncrementalDumperContext(commonContext, null, false);
}
private void initTableData(final IncrementalDumperContext dumperContext)
throws SQLException {
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index d25feef3a3b..3b96545fde3 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -279,19 +279,13 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
private IncrementalDumperContext buildDumperContext(final
CDCJobConfiguration jobConfig, final int jobShardingItem, final
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
JobDataNodeLine dataNodeLine =
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
- Map<ActualTableName, LogicTableName> tableNameMap = new
LinkedHashMap<>();
- dataNodeLine.getEntries().forEach(each ->
each.getDataNodes().forEach(node -> tableNameMap.put(new
ActualTableName(node.getTableName()), new
LogicTableName(each.getLogicTableName()))));
String dataSourceName =
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
StandardPipelineDataSourceConfiguration actualDataSourceConfig =
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
- DumperCommonContext commonContext = new DumperCommonContext();
- commonContext.setDataSourceName(dataSourceName);
- commonContext.setDataSourceConfig(actualDataSourceConfig);
- commonContext.setTableNameMapper(new
ActualAndLogicTableNameMapper(tableNameMap));
- commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
- IncrementalDumperContext result = new
IncrementalDumperContext(commonContext);
- result.setJobId(jobConfig.getJobId());
- result.setDecodeWithTX(jobConfig.isDecodeWithTX());
- return result;
+ Map<ActualTableName, LogicTableName> tableNameMap = new
LinkedHashMap<>();
+ dataNodeLine.getEntries().forEach(each ->
each.getDataNodes().forEach(node -> tableNameMap.put(new
ActualTableName(node.getTableName()), new
LogicTableName(each.getLogicTableName()))));
+ return new IncrementalDumperContext(
+ new DumperCommonContext(dataSourceName,
actualDataSourceConfig, new ActualAndLogicTableNameMapper(tableNameMap),
tableAndSchemaNameMapper),
+ jobConfig.getJobId(), jobConfig.isDecodeWithTX());
}
private ImporterConfiguration buildImporterConfiguration(final
CDCJobConfiguration jobConfig, final PipelineProcessConfiguration
pipelineProcessConfig, final Collection<String> schemaTableNames,
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
index 764b22dec92..a5ee5745334 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
@@ -19,21 +19,16 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import
org.apache.shardingsphere.data.pipeline.common.config.ingest.IncrementalDumperContextCreator;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
-import java.util.Map;
-
/**
- * Migration incremental dumper configuration creator.
+ * Migration incremental dumper context creator.
*/
@RequiredArgsConstructor
public final class MigrationIncrementalDumperContextCreator implements
IncrementalDumperContextCreator {
@@ -42,21 +37,10 @@ public final class MigrationIncrementalDumperContextCreator
implements Increment
@Override
public IncrementalDumperContext createDumperContext(final JobDataNodeLine
jobDataNodeLine) {
- Map<ActualTableName, LogicTableName> tableNameMap =
JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine);
- TableAndSchemaNameMapper tableAndSchemaNameMapper = new
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
String dataSourceName =
jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
- return buildDumperContext(jobConfig.getJobId(), dataSourceName,
jobConfig.getSources().get(dataSourceName), tableNameMap,
tableAndSchemaNameMapper);
- }
-
- private IncrementalDumperContext buildDumperContext(final String jobId,
final String dataSourceName, final PipelineDataSourceConfiguration
sourceDataSource,
- final
Map<ActualTableName, LogicTableName> tableNameMap, final
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
- DumperCommonContext commonContext = new DumperCommonContext();
- commonContext.setDataSourceName(dataSourceName);
- commonContext.setDataSourceConfig(sourceDataSource);
- commonContext.setTableNameMapper(new
ActualAndLogicTableNameMapper(tableNameMap));
- commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
- IncrementalDumperContext result = new
IncrementalDumperContext(commonContext);
- result.setJobId(jobId);
- return result;
+ ActualAndLogicTableNameMapper tableNameMapper = new
ActualAndLogicTableNameMapper(JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine));
+ TableAndSchemaNameMapper tableAndSchemaNameMapper = new
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
+ return new IncrementalDumperContext(
+ new DumperCommonContext(dataSourceName,
jobConfig.getSources().get(dataSourceName), tableNameMapper,
tableAndSchemaNameMapper), jobConfig.getJobId(), false);
}
}