tuichenchuxin commented on a change in pull request #11345:
URL: https://github.com/apache/shardingsphere/pull/11345#discussion_r683907218
##########
File path:
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/schema/builder/SchemaBuilder.java
##########
@@ -66,119 +59,114 @@
}
/**
- * build actual and logic table meta data.
+ * Build actual and logic table metadata.
*
* @param materials schema builder materials
- * @return actual and logic table meta data
+ * @return actual and logic table metadata
* @throws SQLException SQL exception
*/
public static Map<TableMetaData, TableMetaData> build(final
SchemaBuilderMaterials materials) throws SQLException {
Map<String, TableMetaData> actualTableMetaMap =
buildActualTableMetaDataMap(materials);
Map<String, TableMetaData> logicTableMetaMap =
buildLogicTableMetaDataMap(materials, actualTableMetaMap);
Map<TableMetaData, TableMetaData> tableMetaDataMap = new
HashMap<>(actualTableMetaMap.size(), 1);
- for (Entry<String, TableMetaData> entry :
actualTableMetaMap.entrySet()) {
+ for (Map.Entry<String, TableMetaData> entry :
actualTableMetaMap.entrySet()) {
tableMetaDataMap.put(entry.getValue(),
logicTableMetaMap.getOrDefault(entry.getKey(), entry.getValue()));
}
return tableMetaDataMap;
}
private static Map<String, TableMetaData>
buildActualTableMetaDataMap(final SchemaBuilderMaterials materials) throws
SQLException {
- Map<String, TableMetaData> result = new
HashMap<>(materials.getRules().size(), 1);
- appendRemainTables(materials, result);
- for (ShardingSphereRule rule : materials.getRules()) {
- if (rule instanceof TableContainedRule) {
- for (String table : ((TableContainedRule) rule).getTables()) {
- if (!result.containsKey(table)) {
- TableMetaDataBuilder.load(table,
materials).map(optional -> result.put(table, optional));
- }
- }
- }
+ Map<String, Collection<DataNode>> logicTableDataNodesMap =
getLogicTableDataNodesMap(materials);
+ Optional<DialectTableMetaDataLoader> dialectLoader =
SchemaBuilderWithDialectLoader.findDialectTableMetaDataLoader(materials);
+ Map<String, TableMetaData> result;
+ if (dialectLoader.isPresent()) {
+ result = SchemaBuilderWithDialectLoader.build(dialectLoader.get(),
EXECUTOR_SERVICE, materials, logicTableDataNodesMap);
+ } else {
+ result = SchemaBuilderWithDefaultLoader.build(EXECUTOR_SERVICE,
materials, logicTableDataNodesMap);
}
return result;
}
- private static void appendRemainTables(final SchemaBuilderMaterials
materials, final Map<String, TableMetaData> tables) throws SQLException {
- Optional<DialectTableMetaDataLoader> dialectLoader =
findDialectTableMetaDataLoader(materials);
- if (dialectLoader.isPresent()) {
- appendDialectRemainTables(dialectLoader.get(), materials, tables);
- return;
+ private static Map<String, Collection<DataNode>>
getLogicTableDataNodesMap(final SchemaBuilderMaterials materials) {
+ List<DataNodeContainedRule> dataNodeContainedRuleList =
materials.getRules().stream()
+ .filter(each -> each instanceof DataNodeContainedRule)
+ .map(each -> (DataNodeContainedRule) each)
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(dataNodeContainedRuleList)) {
+ return Collections.emptyMap();
}
- appendDefaultRemainTables(materials, tables);
- }
-
- private static Map<String, TableMetaData> buildLogicTableMetaDataMap(final
SchemaBuilderMaterials materials, final Map<String, TableMetaData> tables) {
- Map<String, TableMetaData> result = new
HashMap<>(materials.getRules().size(), 1);
- for (ShardingSphereRule rule : materials.getRules()) {
- if (rule instanceof TableContainedRule) {
- for (String table : ((TableContainedRule) rule).getTables()) {
- if (tables.containsKey(table)) {
- TableMetaData metaData =
TableMetaDataBuilder.decorate(table, tables.get(table), materials.getRules());
- result.put(table, metaData);
- }
- }
- }
+
+ Map<String, Collection<DataNode>> logicTableDataNodes =
getLogicTableDataNodesMap(dataNodeContainedRuleList);
+ Map<String, Collection<String>> logicActualdataSourceMap =
getLogicActualDataSourceMap(materials);
+ if (MapUtils.isEmpty(logicActualdataSourceMap)) {
+ return logicTableDataNodes;
}
- return result;
+
+ return replaceLogicDataNodeWithActualDataNode(logicTableDataNodes,
logicActualdataSourceMap);
}
- private static Optional<DialectTableMetaDataLoader>
findDialectTableMetaDataLoader(final SchemaBuilderMaterials materials) {
- for (DialectTableMetaDataLoader each :
ShardingSphereServiceLoader.getSingletonServiceInstances(DialectTableMetaDataLoader.class))
{
- if
(each.getDatabaseType().equals(materials.getDatabaseType().getName())) {
- return Optional.of(each);
- }
- }
- return Optional.empty();
+ private static Map<String, Collection<DataNode>> getLogicTableDataNodesMap(
+ final List<DataNodeContainedRule> dataNodeContainedRuleList) {
+ BinaryOperator<Collection<DataNode>> logicTableMergeFunction =
getMergeFunction(DataNode.class);
+ Map<String, Collection<DataNode>> logicTableDataNodes =
dataNodeContainedRuleList.stream()
+ .flatMap(each -> each.getAllDataNodes().entrySet().stream())
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue, logicTableMergeFunction));
+ return logicTableDataNodes;
}
- private static void appendDialectRemainTables(final
DialectTableMetaDataLoader dialectLoader, final SchemaBuilderMaterials
materials, final Map<String, TableMetaData> tables) throws SQLException {
- Collection<Future<Map<String, TableMetaData>>> futures = new
LinkedList<>();
- Collection<String> existedTables =
getExistedTables(materials.getRules(), tables);
- for (DataSource each : materials.getDataSourceMap().values()) {
- futures.add(EXECUTOR_SERVICE.submit(() -> dialectLoader.load(each,
existedTables)));
- }
- for (Future<Map<String, TableMetaData>> each : futures) {
- try {
- tables.putAll(each.get());
- } catch (final InterruptedException | ExecutionException ex) {
- if (ex.getCause() instanceof SQLException) {
- throw (SQLException) ex.getCause();
- }
- throw new ShardingSphereException(ex);
- }
- }
+ private static Map<String, Collection<String>>
getLogicActualDataSourceMap(final SchemaBuilderMaterials materials) {
+ BinaryOperator<Collection<String>> dataSourceContainedMergeFunction =
getMergeFunction(String.class);
+ Map<String, Collection<String>> dataSourceContainedMap =
materials.getRules().stream()
+ .filter(each -> each instanceof DataSourceContainedRule)
+ .flatMap(each -> ((DataSourceContainedRule)
each).getDataSourceMapper().entrySet().stream())
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue, dataSourceContainedMergeFunction));
+ return dataSourceContainedMap;
}
- private static void appendDefaultRemainTables(final SchemaBuilderMaterials
materials, final Map<String, TableMetaData> tables) throws SQLException {
- Collection<String> existedTableNames =
getExistedTables(materials.getRules(), tables);
- for (Entry<String, DataSource> entry :
materials.getDataSourceMap().entrySet()) {
- Collection<String> tableNames =
SchemaMetaDataLoader.loadAllTableNames(entry.getValue(),
materials.getDatabaseType());
- tableNames.removeAll(existedTableNames);
- for (String each : tableNames) {
- tables.put(each, loadTableMetaData(each, entry.getValue(),
materials.getDatabaseType()));
- }
- }
+ private static <T> BinaryOperator<Collection<T>> getMergeFunction(final
Class<T> tClass) {
+ return (a, b) -> {
+ Collection<T> result = (a instanceof LinkedList) ? a : new
LinkedList<>(a);
+ result.addAll(b);
+ return result;
+ };
}
- private static TableMetaData loadTableMetaData(final String tableName,
final DataSource dataSource, final DatabaseType databaseType) throws
SQLException {
- TableMetaData result = new TableMetaData(tableName);
- try (Connection connection = new
MetaDataLoaderConnectionAdapter(databaseType, dataSource.getConnection())) {
- result.getColumns().putAll(loadColumnMetaDataMap(tableName,
databaseType, connection));
+ private static Map<String, Collection<DataNode>>
replaceLogicDataNodeWithActualDataNode(
+ final Map<String, Collection<DataNode>> logicTableDataNodes,
+ final Map<String, Collection<String>> logicActualdataSourceMap) {
+ Map<String, Collection<DataNode>> replaceResult = new
HashMap<>(logicTableDataNodes.size(), 1);
+ for (Map.Entry<String, Collection<DataNode>> entry :
logicTableDataNodes.entrySet()) {
+ Collection<DataNode> dataNodes = entry.getValue();
+ Collection<DataNode> actualDataNodeList = new LinkedList<>();
+ for (DataNode each : dataNodes) {
+ Collection<String> actualDataSourceNameList =
logicActualdataSourceMap.get(each.getDataSourceName());
+ if (CollectionUtils.isEmpty(actualDataSourceNameList)) {
+ actualDataNodeList.add(each);
+ } else {
+ for (String actualDataSourceName :
actualDataSourceNameList) {
+ actualDataNodeList.add(new
DataNode(actualDataSourceName, each.getTableName()));
+ }
+ }
+ }
+ replaceResult.put(entry.getKey(), actualDataNodeList);
}
- return result;
+ return replaceResult;
}
- private static Map<String, ColumnMetaData> loadColumnMetaDataMap(final
String tableName, final DatabaseType databaseType, final Connection connection)
throws SQLException {
- return ColumnMetaDataLoader.load(connection, tableName,
databaseType).stream().collect(Collectors.toMap(ColumnMetaData::getName, each
-> each, (a, b) -> b, LinkedHashMap::new));
- }
-
- private static Collection<String> getExistedTables(final
Collection<ShardingSphereRule> rules, final Map<String, TableMetaData> tables) {
- Collection<String> result = new LinkedHashSet<>();
- for (ShardingSphereRule each : rules) {
- if (each instanceof DataNodeContainedRule) {
- result.addAll(((DataNodeContainedRule)
each).getAllActualTables());
+ private static Map<String, TableMetaData> buildLogicTableMetaDataMap(final
SchemaBuilderMaterials materials,
Review comment:
Why is this method changed? It looks the same as it used to be.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]