This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 986e1247b6b Cache schemaTablesMap in jobConfig (#18924)
986e1247b6b is described below

commit 986e1247b6bd624d378c2a02aaadb0dd8ca59bd2
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Jul 7 15:10:06 2022 +0800

    Cache schemaTablesMap in jobConfig (#18924)
    
    * Cache schemaTablesMap in jobConfig
    
    * Unit test
---
 ...hardingRuleAlteredJobConfigurationPreparer.java | 30 +++++++++++++++++-----
 .../api/config/TableNameSchemaNameMapping.java     | 19 +++++++-------
 .../rulealtered/RuleAlteredJobConfiguration.java   |  5 ++++
 .../yaml/RuleAlteredJobConfigurationSwapper.java   |  3 ++-
 .../yaml/YamlRuleAlteredJobConfiguration.java      |  5 ++++
 .../check/consistency/DataConsistencyChecker.java  |  2 +-
 .../job/environment/ScalingEnvironmentManager.java |  5 +---
 .../datasource/PipelineDataSourceManagerTest.java  |  7 +++++
 8 files changed, 53 insertions(+), 23 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index 0f230a4d29f..884c1c44181 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -79,6 +79,7 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
         yamlJobConfig.setJobShardingDataNodes(getJobShardingDataNodes(actualDataNodes));
         yamlJobConfig.setLogicTables(getLogicTables(actualDataNodes.keySet()));
         yamlJobConfig.setTablesFirstDataNodes(getTablesFirstDataNodes(actualDataNodes));
+        yamlJobConfig.setSchemaTablesMap(getSchemaTablesMap(yamlJobConfig.getDatabaseName(), actualDataNodes.keySet()));
     }
     
     private static Map<String, List<DataNode>> getActualDataNodes(final RuleAlteredJobConfiguration jobConfig) {
@@ -134,6 +135,22 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
         return new JobDataNodeLine(dataNodeEntries).marshal();
     }
     
+    private static Map<String, List<String>> getSchemaTablesMap(final String databaseName, final Set<String> logicTables) {
+        // TODO get by search_path
+        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabases().get(databaseName);
+        Map<String, List<String>> result = new LinkedHashMap<>();
+        database.getSchemas().forEach((schemaName, schema) -> {
+            for (String each : schema.getAllTableNames()) {
+                if (!logicTables.contains(each)) {
+                    continue;
+                }
+                result.computeIfAbsent(schemaName, unused -> new LinkedList<>()).add(each);
+            }
+        });
+        log.info("getSchemaTablesMap, result={}", result);
+        return result;
+    }
+    
     // TODO use jobConfig as parameter, jobShardingItem
     @Override
     public TaskConfiguration createTaskConfiguration(final RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
@@ -148,13 +165,13 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
                 tableNameMap.put(new ActualTableName(dataNode.getTableName()), new LogicTableName(each.getLogicTableName()));
             }
         }
-        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabases().get(jobConfig.getDatabaseName());
+        TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap()));
         DumperConfiguration dumperConfig = createDumperConfiguration(jobConfig.getDatabaseName(), dataSourceName,
-                dataSourcePropsMap.get(dataSourceName).getAllLocalProperties(), tableNameMap, database);
+                dataSourcePropsMap.get(dataSourceName).getAllLocalProperties(), tableNameMap, tableNameSchemaNameMapping);
         Optional<ShardingRuleConfiguration> targetRuleConfig = getTargetRuleConfiguration(jobConfig);
         Set<LogicTableName> reShardNeededTables = jobConfig.splitLogicTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
         Map<LogicTableName, Set<String>> shardingColumnsMap = getShardingColumnsMap(targetRuleConfig.orElse(sourceRuleConfig), reShardNeededTables);
-        ImporterConfiguration importerConfig = createImporterConfiguration(jobConfig, onRuleAlteredActionConfig, shardingColumnsMap, database);
+        ImporterConfiguration importerConfig = createImporterConfiguration(jobConfig, onRuleAlteredActionConfig, shardingColumnsMap, tableNameSchemaNameMapping);
         TaskConfiguration result = new TaskConfiguration(jobConfig, dumperConfig, importerConfig);
         log.info("createTaskConfiguration, dataSourceName={}, result={}", dataSourceName, result);
         return result;
@@ -210,20 +227,19 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
     }
     
     private static DumperConfiguration createDumperConfiguration(final String databaseName, final String dataSourceName, final Map<String, Object> props,
-                                                                 final Map<ActualTableName, LogicTableName> tableNameMap, final ShardingSphereDatabase database) {
+                                                                 final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
         DumperConfiguration result = new DumperConfiguration();
         result.setDatabaseName(databaseName);
         result.setDataSourceName(dataSourceName);
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(YamlEngine.marshal(props)));
         result.setTableNameMap(tableNameMap);
-        result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(database.getSchemas())));
+        result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
         return result;
     }
     
     private static ImporterConfiguration createImporterConfiguration(final RuleAlteredJobConfiguration jobConfig, final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig,
-                                                                     final Map<LogicTableName, Set<String>> shardingColumnsMap, final ShardingSphereDatabase database) {
+                                                                     final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
         PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
-        TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(database.getSchemas()));
         int batchSize = onRuleAlteredActionConfig.getOutput().getBatchSize();
         int retryTimes = jobConfig.getRetryTimes();
         return new ImporterConfiguration(dataSourceConfig, unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, retryTimes);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TableNameSchemaNameMapping.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TableNameSchemaNameMapping.java
index 59637ef06e4..c910ccc60ad 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TableNameSchemaNameMapping.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TableNameSchemaNameMapping.java
@@ -18,18 +18,18 @@
 package org.apache.shardingsphere.data.pipeline.api.config;
 
 import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
+import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 
-import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
  * Table name and schema name mapping.
  */
 @RequiredArgsConstructor
-@Slf4j
+@ToString
 public final class TableNameSchemaNameMapping {
     
     private final Map<LogicTableName, String> mapping;
@@ -37,17 +37,16 @@ public final class TableNameSchemaNameMapping {
     /**
      * Convert table name and schema name mapping from schemas.
      *
-     * @param schemas logic table name and schema map
+     * @param schemaTablesMap schema name and table names map
      * @return logic table name and schema name map
      */
-    public static Map<LogicTableName, String> convert(final Map<String, ShardingSphereSchema> schemas) {
-        Map<LogicTableName, String> result = new HashMap<>();
-        schemas.forEach((schemaName, schema) -> {
-            for (String each : schema.getAllTableNames()) {
+    public static Map<LogicTableName, String> convert(final Map<String, List<String>> schemaTablesMap) {
+        Map<LogicTableName, String> result = new LinkedHashMap<>();
+        schemaTablesMap.forEach((schemaName, tableNames) -> {
+            for (String each : tableNames) {
                 result.put(new LogicTableName(each), schemaName);
             }
         });
-        log.info("mapping={}", result);
         return result;
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
index b32411d9d19..f112b5168fc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
@@ -56,6 +56,11 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati
      */
     private final Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
     
+    /**
+     * Map{schema name, logic table names}.
+     */
+    private final Map<String, List<String>> schemaTablesMap;
+    
     private final String logicTables;
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
index df11b1661e9..b05fa66894c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
@@ -44,6 +44,7 @@ public final class RuleAlteredJobConfigurationSwapper implements YamlConfigurati
         result.setSource(dataSourceConfigSwapper.swapToYamlConfiguration(data.getSource()));
         result.setTarget(dataSourceConfigSwapper.swapToYamlConfiguration(data.getTarget()));
         result.setAlteredRuleYamlClassNameTablesMap(data.getAlteredRuleYamlClassNameTablesMap());
+        result.setSchemaTablesMap(data.getSchemaTablesMap());
         result.setLogicTables(data.getLogicTables());
         result.setTablesFirstDataNodes(data.getTablesFirstDataNodes());
         result.setJobShardingDataNodes(data.getJobShardingDataNodes());
@@ -58,7 +59,7 @@ public final class RuleAlteredJobConfigurationSwapper implements YamlConfigurati
                 yamlConfig.getActiveVersion(), yamlConfig.getNewVersion(),
                 yamlConfig.getSourceDatabaseType(), yamlConfig.getTargetDatabaseType(),
                 dataSourceConfigSwapper.swapToObject(yamlConfig.getSource()), dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()),
-                yamlConfig.getAlteredRuleYamlClassNameTablesMap(), yamlConfig.getLogicTables(),
+                yamlConfig.getAlteredRuleYamlClassNameTablesMap(), yamlConfig.getSchemaTablesMap(), yamlConfig.getLogicTables(),
                 yamlConfig.getTablesFirstDataNodes(), yamlConfig.getJobShardingDataNodes(),
                 yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
index a7b0cd9c7aa..9b3403ce0a3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
@@ -64,6 +64,11 @@ public final class YamlRuleAlteredJobConfiguration implements YamlConfiguration
      */
     private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
     
+    /**
+     * Map{schema name, logic table names}.
+     */
+    private Map<String, List<String>> schemaTablesMap;
+    
     private String logicTables;
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index e58a79a9317..baf4d1d271c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -78,7 +78,7 @@ public final class DataConsistencyChecker {
         this.jobConfig = jobConfig;
         logicTableNames = jobConfig.splitLogicTableNames();
         ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabases().get(jobConfig.getDatabaseName());
-        tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(database.getSchemas()));
+        tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap()));
     }
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
index 81db1ef4bca..ca1f3b6eeb3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
@@ -23,11 +23,9 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAltere
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -52,8 +50,7 @@ public final class ScalingEnvironmentManager {
         log.info("cleanupTargetTables, tables={}", tables);
         PipelineDataSourceConfiguration target = jobConfig.getTarget();
         PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(jobConfig.getTargetDatabaseType());
-        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabases().get(jobConfig.getDatabaseName());
-        TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(database.getSchemas()));
+        TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap()));
         try (
                 PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter()));
                 Connection connection = dataSource.getConnection()) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
index 720de47b899..7cf6418e7df 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
@@ -21,8 +21,10 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAltere
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import javax.sql.DataSource;
@@ -38,6 +40,11 @@ public final class PipelineDataSourceManagerTest {
     
     private RuleAlteredJobConfiguration jobConfig;
     
+    @BeforeClass
+    public static void beforeClass() {
+        PipelineContextUtil.mockModeConfigAndContextManager();
+    }
+    
     @Before
     public void setUp() {
         jobConfig = JobConfigurationBuilder.createJobConfiguration();

Reply via email to