This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 704277558e5 Refactor JobDataNodeLineConvertUtils (#28983)
704277558e5 is described below

commit 704277558e56e835268c5bbd0d87f31ad900bf19
Author: Liang Zhang <zhangli...@apache.org>
AuthorDate: Wed Nov 8 10:48:11 2023 +0800

    Refactor JobDataNodeLineConvertUtils (#28983)
    
    * Refactor JobDataNodeLineConvertUtils
    
    * Refactor JobDataNodeLineConvertUtils
---
 .../common/datanode/JobDataNodeLineConvertUtils.java        | 13 +++++++------
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java               |  7 +------
 .../ingest/MigrationIncrementalDumperContextCreator.java    |  2 +-
 3 files changed, 9 insertions(+), 13 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
index 26e3ef3df33..40f077f5d13 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
@@ -21,6 +21,7 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 
 import java.util.LinkedHashMap;
@@ -68,18 +69,18 @@ public final class JobDataNodeLineConvertUtils {
     }
     
     /**
-     * Build table name map.
+     * Build actual and logic table name mapper.
      *
      * @param dataNodeLine data node line
-     * @return actual table and logic table map
+     * @return actual and logic table name mapper
      */
-    public static Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> buildTableNameMap(final JobDataNodeLine dataNodeLine) {
-        Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> result = new LinkedHashMap<>();
+    public static ActualAndLogicTableNameMapper buildTableNameMapper(final JobDataNodeLine dataNodeLine) {
+        Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> map = new LinkedHashMap<>();
         for (JobDataNodeEntry each : dataNodeLine.getEntries()) {
             for (DataNode dataNode : each.getDataNodes()) {
-                result.put(new CaseInsensitiveIdentifier(dataNode.getTableName()), new CaseInsensitiveIdentifier(each.getLogicTableName()));
+                map.put(new CaseInsensitiveIdentifier(dataNode.getTableName()), new CaseInsensitiveIdentifier(each.getLogicTableName()));
             }
         }
-        return result;
+        return new ActualAndLogicTableNameMapper(map);
     }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index c81675d26da..ec1bd493e28 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -68,7 +68,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -89,7 +88,6 @@ import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -280,11 +278,8 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
         String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
         StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
-        Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> tableNameMap = new LinkedHashMap<>();
-        dataNodeLine.getEntries()
-                .forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new CaseInsensitiveIdentifier(node.getTableName()), new CaseInsensitiveIdentifier(each.getLogicTableName()))));
         return new IncrementalDumperContext(
-                new DumperCommonContext(dataSourceName, actualDataSourceConfig, new ActualAndLogicTableNameMapper(tableNameMap), tableAndSchemaNameMapper),
+                new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), tableAndSchemaNameMapper),
                 jobConfig.getJobId(), jobConfig.isDecodeWithTX());
     }
     
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
index 1c4bc71d2e0..e24a578120a 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
@@ -38,7 +38,7 @@ public final class MigrationIncrementalDumperContextCreator implements Increment
     @Override
     public IncrementalDumperContext createDumperContext(final JobDataNodeLine jobDataNodeLine) {
         String dataSourceName = jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
-        ActualAndLogicTableNameMapper tableNameMapper = new ActualAndLogicTableNameMapper(JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine));
+        ActualAndLogicTableNameMapper tableNameMapper = JobDataNodeLineConvertUtils.buildTableNameMapper(jobDataNodeLine);
         TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         return new IncrementalDumperContext(
                 new DumperCommonContext(dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMapper, tableAndSchemaNameMapper), jobConfig.getJobId(), false);

Reply via email to