This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 986e1247b6b Cache schemaTablesMap in jobConfig (#18924)
986e1247b6b is described below
commit 986e1247b6bd624d378c2a02aaadb0dd8ca59bd2
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Jul 7 15:10:06 2022 +0800
Cache schemaTablesMap in jobConfig (#18924)
* Cache schemaTablesMap in jobConfig
* Unit test
---
...hardingRuleAlteredJobConfigurationPreparer.java | 30 +++++++++++++++++-----
.../api/config/TableNameSchemaNameMapping.java | 19 +++++++-------
.../rulealtered/RuleAlteredJobConfiguration.java | 5 ++++
.../yaml/RuleAlteredJobConfigurationSwapper.java | 3 ++-
.../yaml/YamlRuleAlteredJobConfiguration.java | 5 ++++
.../check/consistency/DataConsistencyChecker.java | 2 +-
.../job/environment/ScalingEnvironmentManager.java | 5 +---
.../datasource/PipelineDataSourceManagerTest.java | 7 +++++
8 files changed, 53 insertions(+), 23 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index 0f230a4d29f..884c1c44181 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -79,6 +79,7 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
yamlJobConfig.setJobShardingDataNodes(getJobShardingDataNodes(actualDataNodes));
yamlJobConfig.setLogicTables(getLogicTables(actualDataNodes.keySet()));
yamlJobConfig.setTablesFirstDataNodes(getTablesFirstDataNodes(actualDataNodes));
+
yamlJobConfig.setSchemaTablesMap(getSchemaTablesMap(yamlJobConfig.getDatabaseName(),
actualDataNodes.keySet()));
}
private static Map<String, List<DataNode>> getActualDataNodes(final
RuleAlteredJobConfiguration jobConfig) {
@@ -134,6 +135,22 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
return new JobDataNodeLine(dataNodeEntries).marshal();
}
+ private static Map<String, List<String>> getSchemaTablesMap(final String
databaseName, final Set<String> logicTables) {
+ // TODO get by search_path
+ ShardingSphereDatabase database =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabases().get(databaseName);
+ Map<String, List<String>> result = new LinkedHashMap<>();
+ database.getSchemas().forEach((schemaName, schema) -> {
+ for (String each : schema.getAllTableNames()) {
+ if (!logicTables.contains(each)) {
+ continue;
+ }
+ result.computeIfAbsent(schemaName, unused -> new
LinkedList<>()).add(each);
+ }
+ });
+ log.info("getSchemaTablesMap, result={}", result);
+ return result;
+ }
+
// TODO use jobConfig as parameter, jobShardingItem
@Override
public TaskConfiguration createTaskConfiguration(final
RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final
OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
@@ -148,13 +165,13 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
tableNameMap.put(new ActualTableName(dataNode.getTableName()),
new LogicTableName(each.getLogicTableName()));
}
}
- ShardingSphereDatabase database =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabases().get(jobConfig.getDatabaseName());
+ TableNameSchemaNameMapping tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap()));
DumperConfiguration dumperConfig =
createDumperConfiguration(jobConfig.getDatabaseName(), dataSourceName,
-
dataSourcePropsMap.get(dataSourceName).getAllLocalProperties(), tableNameMap,
database);
+
dataSourcePropsMap.get(dataSourceName).getAllLocalProperties(), tableNameMap,
tableNameSchemaNameMapping);
Optional<ShardingRuleConfiguration> targetRuleConfig =
getTargetRuleConfiguration(jobConfig);
Set<LogicTableName> reShardNeededTables =
jobConfig.splitLogicTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
Map<LogicTableName, Set<String>> shardingColumnsMap =
getShardingColumnsMap(targetRuleConfig.orElse(sourceRuleConfig),
reShardNeededTables);
- ImporterConfiguration importerConfig =
createImporterConfiguration(jobConfig, onRuleAlteredActionConfig,
shardingColumnsMap, database);
+ ImporterConfiguration importerConfig =
createImporterConfiguration(jobConfig, onRuleAlteredActionConfig,
shardingColumnsMap, tableNameSchemaNameMapping);
TaskConfiguration result = new TaskConfiguration(jobConfig,
dumperConfig, importerConfig);
log.info("createTaskConfiguration, dataSourceName={}, result={}",
dataSourceName, result);
return result;
@@ -210,20 +227,19 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
}
private static DumperConfiguration createDumperConfiguration(final String
databaseName, final String dataSourceName, final Map<String, Object> props,
- final
Map<ActualTableName, LogicTableName> tableNameMap, final ShardingSphereDatabase
database) {
+ final
Map<ActualTableName, LogicTableName> tableNameMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
DumperConfiguration result = new DumperConfiguration();
result.setDatabaseName(databaseName);
result.setDataSourceName(dataSourceName);
result.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration(YamlEngine.marshal(props)));
result.setTableNameMap(tableNameMap);
- result.setTableNameSchemaNameMapping(new
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(database.getSchemas())));
+ result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
return result;
}
private static ImporterConfiguration createImporterConfiguration(final
RuleAlteredJobConfiguration jobConfig, final OnRuleAlteredActionConfiguration
onRuleAlteredActionConfig,
- final
Map<LogicTableName, Set<String>> shardingColumnsMap, final
ShardingSphereDatabase database) {
+ final
Map<LogicTableName, Set<String>> shardingColumnsMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
PipelineDataSourceConfiguration dataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(),
jobConfig.getTarget().getParameter());
- TableNameSchemaNameMapping tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(database.getSchemas()));
int batchSize = onRuleAlteredActionConfig.getOutput().getBatchSize();
int retryTimes = jobConfig.getRetryTimes();
return new ImporterConfiguration(dataSourceConfig,
unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize,
retryTimes);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TableNameSchemaNameMapping.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TableNameSchemaNameMapping.java
index 59637ef06e4..c910ccc60ad 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TableNameSchemaNameMapping.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TableNameSchemaNameMapping.java
@@ -18,18 +18,18 @@
package org.apache.shardingsphere.data.pipeline.api.config;
import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
+import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
/**
* Table name and schema name mapping.
*/
@RequiredArgsConstructor
-@Slf4j
+@ToString
public final class TableNameSchemaNameMapping {
private final Map<LogicTableName, String> mapping;
@@ -37,17 +37,16 @@ public final class TableNameSchemaNameMapping {
/**
* Convert table name and schema name mapping from schemas.
*
- * @param schemas logic table name and schema map
+ * @param schemaTablesMap schema name and table names map
* @return logic table name and schema name map
*/
- public static Map<LogicTableName, String> convert(final Map<String,
ShardingSphereSchema> schemas) {
- Map<LogicTableName, String> result = new HashMap<>();
- schemas.forEach((schemaName, schema) -> {
- for (String each : schema.getAllTableNames()) {
+ public static Map<LogicTableName, String> convert(final Map<String,
List<String>> schemaTablesMap) {
+ Map<LogicTableName, String> result = new LinkedHashMap<>();
+ schemaTablesMap.forEach((schemaName, tableNames) -> {
+ for (String each : tableNames) {
result.put(new LogicTableName(each), schemaName);
}
});
- log.info("mapping={}", result);
return result;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
index b32411d9d19..f112b5168fc 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
@@ -56,6 +56,11 @@ public final class RuleAlteredJobConfiguration implements
PipelineJobConfigurati
*/
private final Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
+ /**
+ * Map{schema name, logic table names}.
+ */
+ private final Map<String, List<String>> schemaTablesMap;
+
private final String logicTables;
/**
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
index df11b1661e9..b05fa66894c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
@@ -44,6 +44,7 @@ public final class RuleAlteredJobConfigurationSwapper
implements YamlConfigurati
result.setSource(dataSourceConfigSwapper.swapToYamlConfiguration(data.getSource()));
result.setTarget(dataSourceConfigSwapper.swapToYamlConfiguration(data.getTarget()));
result.setAlteredRuleYamlClassNameTablesMap(data.getAlteredRuleYamlClassNameTablesMap());
+ result.setSchemaTablesMap(data.getSchemaTablesMap());
result.setLogicTables(data.getLogicTables());
result.setTablesFirstDataNodes(data.getTablesFirstDataNodes());
result.setJobShardingDataNodes(data.getJobShardingDataNodes());
@@ -58,7 +59,7 @@ public final class RuleAlteredJobConfigurationSwapper
implements YamlConfigurati
yamlConfig.getActiveVersion(), yamlConfig.getNewVersion(),
yamlConfig.getSourceDatabaseType(),
yamlConfig.getTargetDatabaseType(),
dataSourceConfigSwapper.swapToObject(yamlConfig.getSource()),
dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()),
- yamlConfig.getAlteredRuleYamlClassNameTablesMap(),
yamlConfig.getLogicTables(),
+ yamlConfig.getAlteredRuleYamlClassNameTablesMap(),
yamlConfig.getSchemaTablesMap(), yamlConfig.getLogicTables(),
yamlConfig.getTablesFirstDataNodes(),
yamlConfig.getJobShardingDataNodes(),
yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
index a7b0cd9c7aa..9b3403ce0a3 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
@@ -64,6 +64,11 @@ public final class YamlRuleAlteredJobConfiguration
implements YamlConfiguration
*/
private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
+ /**
+ * Map{schema name, logic table names}.
+ */
+ private Map<String, List<String>> schemaTablesMap;
+
private String logicTables;
/**
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index e58a79a9317..baf4d1d271c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -78,7 +78,7 @@ public final class DataConsistencyChecker {
this.jobConfig = jobConfig;
logicTableNames = jobConfig.splitLogicTableNames();
ShardingSphereDatabase database =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabases().get(jobConfig.getDatabaseName());
- tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(database.getSchemas()));
+ tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap()));
}
/**
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
index 81db1ef4bca..ca1f3b6eeb3 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
@@ -23,11 +23,9 @@ import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAltere
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -52,8 +50,7 @@ public final class ScalingEnvironmentManager {
log.info("cleanupTargetTables, tables={}", tables);
PipelineDataSourceConfiguration target = jobConfig.getTarget();
PipelineSQLBuilder pipelineSQLBuilder =
PipelineSQLBuilderFactory.getInstance(jobConfig.getTargetDatabaseType());
- ShardingSphereDatabase database =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabases().get(jobConfig.getDatabaseName());
- TableNameSchemaNameMapping tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(database.getSchemas()));
+ TableNameSchemaNameMapping tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap()));
try (
PipelineDataSourceWrapper dataSource =
PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(),
target.getParameter()));
Connection connection = dataSource.getConnection()) {
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
index 720de47b899..7cf6418e7df 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
@@ -21,8 +21,10 @@ import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAltere
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import javax.sql.DataSource;
@@ -38,6 +40,11 @@ public final class PipelineDataSourceManagerTest {
private RuleAlteredJobConfiguration jobConfig;
+ @BeforeClass
+ public static void beforeClass() {
+ PipelineContextUtil.mockModeConfigAndContextManager();
+ }
+
@Before
public void setUp() {
jobConfig = JobConfigurationBuilder.createJobConfiguration();