This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f85b14f63b Refactor DumperCommonContext (#28948)
3f85b14f63b is described below

commit 3f85b14f63b5f229c252e3a2bf24b1d00e566410
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Nov 5 22:33:57 2023 +0800

    Refactor DumperCommonContext (#28948)
    
    * Refactor DumperCommonContext
    
    * Refactor IncrementalDumperContext
---
 .../ingest/dumper/context/DumperCommonContext.java | 10 ++++----
 .../dumper/context/IncrementalDumperContext.java   |  6 ++---
 .../dumper/context/InventoryDumperContext.java     |  7 ++----
 .../mysql/ingest/MySQLIncrementalDumperTest.java   | 10 ++++----
 .../postgresql/ingest/PostgreSQLWALDumperTest.java | 12 ++++------
 .../ingest/wal/WALEventConverterTest.java          | 10 ++++----
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      | 16 ++++---------
 .../MigrationIncrementalDumperContextCreator.java  | 28 +++++-----------------
 8 files changed, 36 insertions(+), 63 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
index cdcbf6b0f3f..fc97bb8c813 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -28,18 +29,19 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
 /**
  * Dumper common context.
  */
+@RequiredArgsConstructor
 @Getter
 @Setter
 @ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
 public final class DumperCommonContext {
     
-    private String dataSourceName;
+    private final String dataSourceName;
     
-    private PipelineDataSourceConfiguration dataSourceConfig;
+    private final PipelineDataSourceConfiguration dataSourceConfig;
     
-    private ActualAndLogicTableNameMapper tableNameMapper;
+    private final ActualAndLogicTableNameMapper tableNameMapper;
     
-    private TableAndSchemaNameMapper tableAndSchemaNameMapper;
+    private final TableAndSchemaNameMapper tableAndSchemaNameMapper;
     
     private IngestPosition position;
 }
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java
index 4b7697c7dfa..4bcb2ce0d46 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java
@@ -19,7 +19,6 @@ package 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import lombok.Setter;
 import lombok.ToString;
 
 /**
@@ -27,13 +26,12 @@ import lombok.ToString;
  */
 @RequiredArgsConstructor
 @Getter
-@Setter
 @ToString
 public final class IncrementalDumperContext {
     
     private final DumperCommonContext commonContext;
     
-    private String jobId;
+    private final String jobId;
     
-    private boolean decodeWithTX;
+    private final boolean decodeWithTX;
 }
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
index 2e1d44438d7..b39cffbb8ac 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
@@ -54,11 +54,8 @@ public final class InventoryDumperContext {
     private JobRateLimitAlgorithm rateLimitAlgorithm;
     
     public InventoryDumperContext(final DumperCommonContext commonContext) {
-        this.commonContext = new DumperCommonContext();
-        
this.commonContext.setDataSourceName(commonContext.getDataSourceName());
-        
this.commonContext.setDataSourceConfig(commonContext.getDataSourceConfig());
-        
this.commonContext.setTableNameMapper(commonContext.getTableNameMapper());
-        
this.commonContext.setTableAndSchemaNameMapper(commonContext.getTableAndSchemaNameMapper());
+        this.commonContext = new DumperCommonContext(
+                commonContext.getDataSourceName(), 
commonContext.getDataSourceConfig(), commonContext.getTableNameMapper(), 
commonContext.getTableAndSchemaNameMapper());
     }
     
     /**
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index c83e3ede6d3..bd8ee875b13 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -97,11 +97,11 @@ class MySQLIncrementalDumperTest {
     }
     
     private IncrementalDumperContext createDumperContext() {
-        DumperCommonContext commonContext = new DumperCommonContext();
-        commonContext.setDataSourceConfig(new 
StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", 
"root", "root"));
-        commonContext.setTableNameMapper(new 
ActualAndLogicTableNameMapper(Collections.singletonMap(new 
ActualTableName("t_order"), new LogicTableName("t_order"))));
-        commonContext.setTableAndSchemaNameMapper(new 
TableAndSchemaNameMapper(Collections.emptyMap()));
-        return new IncrementalDumperContext(commonContext);
+        DumperCommonContext commonContext = new DumperCommonContext(null,
+                new 
StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", 
"root", "root"),
+                new ActualAndLogicTableNameMapper(Collections.singletonMap(new 
ActualTableName("t_order"), new LogicTableName("t_order"))),
+                new TableAndSchemaNameMapper(Collections.emptyMap()));
+        return new IncrementalDumperContext(commonContext, null, false);
     }
     
     private void initTableData(final IncrementalDumperContext dumperContext) 
throws SQLException {
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 4a033b76e63..a6a90a79817 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -107,13 +107,11 @@ class PostgreSQLWALDumperTest {
     }
     
     private IncrementalDumperContext createDumperContext(final String jdbcUrl, 
final String username, final String password) {
-        DumperCommonContext commonContext = new DumperCommonContext();
-        commonContext.setDataSourceConfig(new 
StandardPipelineDataSourceConfiguration(jdbcUrl, username, password));
-        commonContext.setTableNameMapper(new 
ActualAndLogicTableNameMapper(Collections.singletonMap(new 
ActualTableName("t_order_0"), new LogicTableName("t_order"))));
-        commonContext.setTableAndSchemaNameMapper(new 
TableAndSchemaNameMapper(Collections.emptyMap()));
-        IncrementalDumperContext result = new 
IncrementalDumperContext(commonContext);
-        result.setJobId("0101123456");
-        return result;
+        DumperCommonContext commonContext = new DumperCommonContext(null,
+                new StandardPipelineDataSourceConfiguration(jdbcUrl, username, 
password),
+                new ActualAndLogicTableNameMapper(Collections.singletonMap(new 
ActualTableName("t_order_0"), new LogicTableName("t_order"))),
+                new TableAndSchemaNameMapper(Collections.emptyMap()));
+        return new IncrementalDumperContext(commonContext, "0101123456", 
false);
     }
     
     @AfterEach
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 29c625de0a7..9306159d72b 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -86,11 +86,11 @@ class WALEventConverterTest {
     }
     
     private IncrementalDumperContext mockDumperContext() {
-        DumperCommonContext commonContext = new DumperCommonContext();
-        commonContext.setDataSourceConfig(new 
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL",
 "root", "root"));
-        commonContext.setTableNameMapper(new 
ActualAndLogicTableNameMapper(Collections.singletonMap(new 
ActualTableName("t_order"), new LogicTableName("t_order"))));
-        commonContext.setTableAndSchemaNameMapper(new 
TableAndSchemaNameMapper(Collections.emptyMap()));
-        return new IncrementalDumperContext(commonContext);
+        DumperCommonContext commonContext = new DumperCommonContext(null,
+                new 
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL",
 "root", "root"),
+                new ActualAndLogicTableNameMapper(Collections.singletonMap(new 
ActualTableName("t_order"), new LogicTableName("t_order"))),
+                new TableAndSchemaNameMapper(Collections.emptyMap()));
+        return new IncrementalDumperContext(commonContext, null, false);
     }
     
     private void initTableData(final IncrementalDumperContext dumperContext) 
throws SQLException {
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index d25feef3a3b..3b96545fde3 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -279,19 +279,13 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
     
     private IncrementalDumperContext buildDumperContext(final 
CDCJobConfiguration jobConfig, final int jobShardingItem, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         JobDataNodeLine dataNodeLine = 
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
-        Map<ActualTableName, LogicTableName> tableNameMap = new 
LinkedHashMap<>();
-        dataNodeLine.getEntries().forEach(each -> 
each.getDataNodes().forEach(node -> tableNameMap.put(new 
ActualTableName(node.getTableName()), new 
LogicTableName(each.getLogicTableName()))));
         String dataSourceName = 
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
         StandardPipelineDataSourceConfiguration actualDataSourceConfig = 
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
-        DumperCommonContext commonContext = new DumperCommonContext();
-        commonContext.setDataSourceName(dataSourceName);
-        commonContext.setDataSourceConfig(actualDataSourceConfig);
-        commonContext.setTableNameMapper(new 
ActualAndLogicTableNameMapper(tableNameMap));
-        commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
-        IncrementalDumperContext result = new 
IncrementalDumperContext(commonContext);
-        result.setJobId(jobConfig.getJobId());
-        result.setDecodeWithTX(jobConfig.isDecodeWithTX());
-        return result;
+        Map<ActualTableName, LogicTableName> tableNameMap = new 
LinkedHashMap<>();
+        dataNodeLine.getEntries().forEach(each -> 
each.getDataNodes().forEach(node -> tableNameMap.put(new 
ActualTableName(node.getTableName()), new 
LogicTableName(each.getLogicTableName()))));
+        return new IncrementalDumperContext(
+                new DumperCommonContext(dataSourceName, 
actualDataSourceConfig, new ActualAndLogicTableNameMapper(tableNameMap), 
tableAndSchemaNameMapper),
+                jobConfig.getJobId(), jobConfig.isDecodeWithTX());
     }
     
     private ImporterConfiguration buildImporterConfiguration(final 
CDCJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig, final Collection<String> schemaTableNames,
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
index 764b22dec92..a5ee5745334 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
@@ -19,21 +19,16 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest
 
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.common.config.ingest.IncrementalDumperContextCreator;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 
-import java.util.Map;
-
 /**
- * Migration incremental dumper configuration creator.
+ * Migration incremental dumper context creator.
  */
 @RequiredArgsConstructor
 public final class MigrationIncrementalDumperContextCreator implements 
IncrementalDumperContextCreator {
@@ -42,21 +37,10 @@ public final class MigrationIncrementalDumperContextCreator 
implements Increment
     
     @Override
     public IncrementalDumperContext createDumperContext(final JobDataNodeLine 
jobDataNodeLine) {
-        Map<ActualTableName, LogicTableName> tableNameMap = 
JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine);
-        TableAndSchemaNameMapper tableAndSchemaNameMapper = new 
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         String dataSourceName = 
jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
-        return buildDumperContext(jobConfig.getJobId(), dataSourceName, 
jobConfig.getSources().get(dataSourceName), tableNameMap, 
tableAndSchemaNameMapper);
-    }
-    
-    private IncrementalDumperContext buildDumperContext(final String jobId, 
final String dataSourceName, final PipelineDataSourceConfiguration 
sourceDataSource,
-                                                        final 
Map<ActualTableName, LogicTableName> tableNameMap, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        DumperCommonContext commonContext = new DumperCommonContext();
-        commonContext.setDataSourceName(dataSourceName);
-        commonContext.setDataSourceConfig(sourceDataSource);
-        commonContext.setTableNameMapper(new 
ActualAndLogicTableNameMapper(tableNameMap));
-        commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
-        IncrementalDumperContext result = new 
IncrementalDumperContext(commonContext);
-        result.setJobId(jobId);
-        return result;
+        ActualAndLogicTableNameMapper tableNameMapper = new 
ActualAndLogicTableNameMapper(JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine));
+        TableAndSchemaNameMapper tableAndSchemaNameMapper = new 
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
+        return new IncrementalDumperContext(
+                new DumperCommonContext(dataSourceName, 
jobConfig.getSources().get(dataSourceName), tableNameMapper, 
tableAndSchemaNameMapper), jobConfig.getJobId(), false);
     }
 }

Reply via email to