This is an automated email from the ASF dual-hosted git repository. duanzhengqiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 704277558e5 Refactor JobDataNodeLineConvertUtils (#28983) 704277558e5 is described below commit 704277558e56e835268c5bbd0d87f31ad900bf19 Author: Liang Zhang <zhangli...@apache.org> AuthorDate: Wed Nov 8 10:48:11 2023 +0800 Refactor JobDataNodeLineConvertUtils (#28983) * Refactor JobDataNodeLineConvertUtils * Refactor JobDataNodeLineConvertUtils --- .../common/datanode/JobDataNodeLineConvertUtils.java | 13 +++++++------ .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 7 +------ .../ingest/MigrationIncrementalDumperContextCreator.java | 2 +- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java index 26e3ef3df33..40f077f5d13 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java @@ -21,6 +21,7 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.apache.commons.lang3.tuple.Pair; import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; import org.apache.shardingsphere.infra.datanode.DataNode; import java.util.LinkedHashMap; @@ -68,18 +69,18 @@ public final class JobDataNodeLineConvertUtils { } /** - * Build table name map. + * Build actual and logic table name mapper. * * @param dataNodeLine data node line - * @return actual table and logic table map + * @return actual and logic table name mapper */ - public static Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> buildTableNameMap(final JobDataNodeLine dataNodeLine) { - Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> result = new LinkedHashMap<>(); + public static ActualAndLogicTableNameMapper buildTableNameMapper(final JobDataNodeLine dataNodeLine) { + Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> map = new LinkedHashMap<>(); for (JobDataNodeEntry each : dataNodeLine.getEntries()) { for (DataNode dataNode : each.getDataNodes()) { - result.put(new CaseInsensitiveIdentifier(dataNode.getTableName()), new CaseInsensitiveIdentifier(each.getLogicTableName())); + map.put(new CaseInsensitiveIdentifier(dataNode.getTableName()), new CaseInsensitiveIdentifier(each.getLogicTableName())); } } - return result; + return new ActualAndLogicTableNameMapper(map); } } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index c81675d26da..ec1bd493e28 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -68,7 +68,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; -import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; @@ -89,7 +88,6 @@ import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.util.Collection; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -280,11 +278,8 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl { JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem); String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName(); StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName); - Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> tableNameMap = new LinkedHashMap<>(); - dataNodeLine.getEntries() - .forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new CaseInsensitiveIdentifier(node.getTableName()), new CaseInsensitiveIdentifier(each.getLogicTableName())))); return new IncrementalDumperContext( - new DumperCommonContext(dataSourceName, actualDataSourceConfig, new ActualAndLogicTableNameMapper(tableNameMap), tableAndSchemaNameMapper), + new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), tableAndSchemaNameMapper), jobConfig.getJobId(), jobConfig.isDecodeWithTX()); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java index 1c4bc71d2e0..e24a578120a 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java @@ -38,7 +38,7 @@ public final class MigrationIncrementalDumperContextCreator implements Increment @Override public IncrementalDumperContext createDumperContext(final JobDataNodeLine jobDataNodeLine) { String dataSourceName = jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName(); - ActualAndLogicTableNameMapper tableNameMapper = new ActualAndLogicTableNameMapper(JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine)); + ActualAndLogicTableNameMapper tableNameMapper = JobDataNodeLineConvertUtils.buildTableNameMapper(jobDataNodeLine); TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap()); return new IncrementalDumperContext( new DumperCommonContext(dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMapper, tableAndSchemaNameMapper), jobConfig.getJobId(), false);