tuichenchuxin commented on a change in pull request #11345: URL: https://github.com/apache/shardingsphere/pull/11345#discussion_r683920977
########## File path: shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/schema/builder/SchemaBuilderWithDefaultLoader.java ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.shardingsphere.infra.metadata.schema.builder; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.shardingsphere.infra.database.type.DatabaseType; +import org.apache.shardingsphere.infra.datanode.DataNode; +import org.apache.shardingsphere.infra.exception.ShardingSphereException; +import org.apache.shardingsphere.infra.metadata.schema.builder.loader.ColumnMetaDataLoader; +import org.apache.shardingsphere.infra.metadata.schema.builder.loader.SchemaMetaDataLoader; +import org.apache.shardingsphere.infra.metadata.schema.builder.loader.adapter.MetaDataLoaderConnectionAdapter; +import org.apache.shardingsphere.infra.metadata.schema.model.ColumnMetaData; +import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class SchemaBuilderWithDefaultLoader { + + /** + * Build table metadata with default loader. 
+ * @param executorService executorService + * @param materials schema builder materials + * @param logicTableDataNodesMap map of logicTable to DataNodes + * @return metadata map + * @throws SQLException SQL exception + */ + public static Map<String, TableMetaData> build(final ExecutorService executorService, final SchemaBuilderMaterials materials, + final Map<String, Collection<DataNode>> logicTableDataNodesMap) throws SQLException { + Map<String, TableMetaData> result = new HashMap<>(materials.getRules().size(), 1); + result.putAll(appendActualTables(executorService, materials, logicTableDataNodesMap)); + result.putAll(appendRuleLogicTables(executorService, materials, logicTableDataNodesMap)); + return result; + } + + private static Map<String, TableMetaData> appendActualTables(final ExecutorService executorService, final SchemaBuilderMaterials materials, + final Map<String, Collection<DataNode>> logicTableDataNodesMap) throws SQLException { + Collection<String> existedTableNames = logicTableDataNodesMap.values().stream() + .flatMap(each -> each.stream().map(DataNode::getTableName)) + .collect(Collectors.toSet()); + + Map<String, TableMetaData> result = new HashMap<>(materials.getRules().size()); + for (Map.Entry<String, DataSource> entry : materials.getDataSourceMap().entrySet()) { + Collection<String> tableNames = SchemaMetaDataLoader.loadAllTableNames(entry.getValue(), materials.getDatabaseType()); + tableNames.removeAll(existedTableNames); + + Collection<Future<TableMetaData>> futures = new ArrayList<>(tableNames.size()); + for (String each : tableNames) { + futures.add(executorService.submit(() -> loadTableMetaData(each, entry.getValue(), materials.getDatabaseType()))); Review comment: Each table is submitted to the thread pool individually. Does that really improve performance? 
########## File path: shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/schema/builder/SchemaBuilder.java ########## @@ -66,119 +59,114 @@ } /** - * build actual and logic table meta data. + * Build actual and logic table metadata. * * @param materials schema builder materials - * @return actual and logic table meta data + * @return actual and logic table metadata * @throws SQLException SQL exception */ public static Map<TableMetaData, TableMetaData> build(final SchemaBuilderMaterials materials) throws SQLException { Map<String, TableMetaData> actualTableMetaMap = buildActualTableMetaDataMap(materials); Map<String, TableMetaData> logicTableMetaMap = buildLogicTableMetaDataMap(materials, actualTableMetaMap); Map<TableMetaData, TableMetaData> tableMetaDataMap = new HashMap<>(actualTableMetaMap.size(), 1); - for (Entry<String, TableMetaData> entry : actualTableMetaMap.entrySet()) { + for (Map.Entry<String, TableMetaData> entry : actualTableMetaMap.entrySet()) { tableMetaDataMap.put(entry.getValue(), logicTableMetaMap.getOrDefault(entry.getKey(), entry.getValue())); } return tableMetaDataMap; } private static Map<String, TableMetaData> buildActualTableMetaDataMap(final SchemaBuilderMaterials materials) throws SQLException { - Map<String, TableMetaData> result = new HashMap<>(materials.getRules().size(), 1); - appendRemainTables(materials, result); - for (ShardingSphereRule rule : materials.getRules()) { - if (rule instanceof TableContainedRule) { - for (String table : ((TableContainedRule) rule).getTables()) { - if (!result.containsKey(table)) { - TableMetaDataBuilder.load(table, materials).map(optional -> result.put(table, optional)); - } - } - } + Map<String, Collection<DataNode>> logicTableDataNodesMap = getLogicTableDataNodesMap(materials); + Optional<DialectTableMetaDataLoader> dialectLoader = SchemaBuilderWithDialectLoader.findDialectTableMetaDataLoader(materials); + Map<String, TableMetaData> result; + if 
(dialectLoader.isPresent()) { + result = SchemaBuilderWithDialectLoader.build(dialectLoader.get(), EXECUTOR_SERVICE, materials, logicTableDataNodesMap); + } else { + result = SchemaBuilderWithDefaultLoader.build(EXECUTOR_SERVICE, materials, logicTableDataNodesMap); } return result; } - private static void appendRemainTables(final SchemaBuilderMaterials materials, final Map<String, TableMetaData> tables) throws SQLException { - Optional<DialectTableMetaDataLoader> dialectLoader = findDialectTableMetaDataLoader(materials); - if (dialectLoader.isPresent()) { - appendDialectRemainTables(dialectLoader.get(), materials, tables); - return; + private static Map<String, Collection<DataNode>> getLogicTableDataNodesMap(final SchemaBuilderMaterials materials) { + List<DataNodeContainedRule> dataNodeContainedRuleList = materials.getRules().stream() + .filter(each -> each instanceof DataNodeContainedRule) + .map(each -> (DataNodeContainedRule) each) + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(dataNodeContainedRuleList)) { + return Collections.emptyMap(); } - appendDefaultRemainTables(materials, tables); - } - - private static Map<String, TableMetaData> buildLogicTableMetaDataMap(final SchemaBuilderMaterials materials, final Map<String, TableMetaData> tables) { - Map<String, TableMetaData> result = new HashMap<>(materials.getRules().size(), 1); - for (ShardingSphereRule rule : materials.getRules()) { - if (rule instanceof TableContainedRule) { - for (String table : ((TableContainedRule) rule).getTables()) { - if (tables.containsKey(table)) { - TableMetaData metaData = TableMetaDataBuilder.decorate(table, tables.get(table), materials.getRules()); - result.put(table, metaData); - } - } - } + + Map<String, Collection<DataNode>> logicTableDataNodes = getLogicTableDataNodesMap(dataNodeContainedRuleList); + Map<String, Collection<String>> logicActualdataSourceMap = getLogicActualDataSourceMap(materials); + if (MapUtils.isEmpty(logicActualdataSourceMap)) { + return 
logicTableDataNodes; } - return result; + + return replaceLogicDataNodeWithActualDataNode(logicTableDataNodes, logicActualdataSourceMap); } - private static Optional<DialectTableMetaDataLoader> findDialectTableMetaDataLoader(final SchemaBuilderMaterials materials) { - for (DialectTableMetaDataLoader each : ShardingSphereServiceLoader.getSingletonServiceInstances(DialectTableMetaDataLoader.class)) { - if (each.getDatabaseType().equals(materials.getDatabaseType().getName())) { - return Optional.of(each); - } - } - return Optional.empty(); + private static Map<String, Collection<DataNode>> getLogicTableDataNodesMap( + final List<DataNodeContainedRule> dataNodeContainedRuleList) { + BinaryOperator<Collection<DataNode>> logicTableMergeFunction = getMergeFunction(DataNode.class); + Map<String, Collection<DataNode>> logicTableDataNodes = dataNodeContainedRuleList.stream() + .flatMap(each -> each.getAllDataNodes().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, logicTableMergeFunction)); + return logicTableDataNodes; } - private static void appendDialectRemainTables(final DialectTableMetaDataLoader dialectLoader, final SchemaBuilderMaterials materials, final Map<String, TableMetaData> tables) throws SQLException { - Collection<Future<Map<String, TableMetaData>>> futures = new LinkedList<>(); - Collection<String> existedTables = getExistedTables(materials.getRules(), tables); - for (DataSource each : materials.getDataSourceMap().values()) { - futures.add(EXECUTOR_SERVICE.submit(() -> dialectLoader.load(each, existedTables))); - } - for (Future<Map<String, TableMetaData>> each : futures) { - try { - tables.putAll(each.get()); - } catch (final InterruptedException | ExecutionException ex) { - if (ex.getCause() instanceof SQLException) { - throw (SQLException) ex.getCause(); - } - throw new ShardingSphereException(ex); - } - } + private static Map<String, Collection<String>> getLogicActualDataSourceMap(final SchemaBuilderMaterials 
materials) { + BinaryOperator<Collection<String>> dataSourceContainedMergeFunction = getMergeFunction(String.class); + Map<String, Collection<String>> dataSourceContainedMap = materials.getRules().stream() + .filter(each -> each instanceof DataSourceContainedRule) + .flatMap(each -> ((DataSourceContainedRule) each).getDataSourceMapper().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, dataSourceContainedMergeFunction)); + return dataSourceContainedMap; } - private static void appendDefaultRemainTables(final SchemaBuilderMaterials materials, final Map<String, TableMetaData> tables) throws SQLException { - Collection<String> existedTableNames = getExistedTables(materials.getRules(), tables); - for (Entry<String, DataSource> entry : materials.getDataSourceMap().entrySet()) { - Collection<String> tableNames = SchemaMetaDataLoader.loadAllTableNames(entry.getValue(), materials.getDatabaseType()); - tableNames.removeAll(existedTableNames); - for (String each : tableNames) { - tables.put(each, loadTableMetaData(each, entry.getValue(), materials.getDatabaseType())); - } - } + private static <T> BinaryOperator<Collection<T>> getMergeFunction(final Class<T> tClass) { Review comment: `tClass` is not used. ########## File path: shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/schema/builder/SchemaBuilder.java ########## @@ -66,119 +59,114 @@ } /** - * build actual and logic table meta data. + * Build actual and logic table metadata. 
* * @param materials schema builder materials - * @return actual and logic table meta data + * @return actual and logic table metadata * @throws SQLException SQL exception */ public static Map<TableMetaData, TableMetaData> build(final SchemaBuilderMaterials materials) throws SQLException { Map<String, TableMetaData> actualTableMetaMap = buildActualTableMetaDataMap(materials); Map<String, TableMetaData> logicTableMetaMap = buildLogicTableMetaDataMap(materials, actualTableMetaMap); Map<TableMetaData, TableMetaData> tableMetaDataMap = new HashMap<>(actualTableMetaMap.size(), 1); - for (Entry<String, TableMetaData> entry : actualTableMetaMap.entrySet()) { + for (Map.Entry<String, TableMetaData> entry : actualTableMetaMap.entrySet()) { tableMetaDataMap.put(entry.getValue(), logicTableMetaMap.getOrDefault(entry.getKey(), entry.getValue())); } return tableMetaDataMap; } private static Map<String, TableMetaData> buildActualTableMetaDataMap(final SchemaBuilderMaterials materials) throws SQLException { - Map<String, TableMetaData> result = new HashMap<>(materials.getRules().size(), 1); - appendRemainTables(materials, result); - for (ShardingSphereRule rule : materials.getRules()) { - if (rule instanceof TableContainedRule) { - for (String table : ((TableContainedRule) rule).getTables()) { - if (!result.containsKey(table)) { - TableMetaDataBuilder.load(table, materials).map(optional -> result.put(table, optional)); - } - } - } + Map<String, Collection<DataNode>> logicTableDataNodesMap = getLogicTableDataNodesMap(materials); + Optional<DialectTableMetaDataLoader> dialectLoader = SchemaBuilderWithDialectLoader.findDialectTableMetaDataLoader(materials); + Map<String, TableMetaData> result; + if (dialectLoader.isPresent()) { + result = SchemaBuilderWithDialectLoader.build(dialectLoader.get(), EXECUTOR_SERVICE, materials, logicTableDataNodesMap); + } else { + result = SchemaBuilderWithDefaultLoader.build(EXECUTOR_SERVICE, materials, logicTableDataNodesMap); } return result; } - 
private static void appendRemainTables(final SchemaBuilderMaterials materials, final Map<String, TableMetaData> tables) throws SQLException { - Optional<DialectTableMetaDataLoader> dialectLoader = findDialectTableMetaDataLoader(materials); - if (dialectLoader.isPresent()) { - appendDialectRemainTables(dialectLoader.get(), materials, tables); - return; + private static Map<String, Collection<DataNode>> getLogicTableDataNodesMap(final SchemaBuilderMaterials materials) { + List<DataNodeContainedRule> dataNodeContainedRuleList = materials.getRules().stream() + .filter(each -> each instanceof DataNodeContainedRule) + .map(each -> (DataNodeContainedRule) each) + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(dataNodeContainedRuleList)) { + return Collections.emptyMap(); } - appendDefaultRemainTables(materials, tables); - } - - private static Map<String, TableMetaData> buildLogicTableMetaDataMap(final SchemaBuilderMaterials materials, final Map<String, TableMetaData> tables) { - Map<String, TableMetaData> result = new HashMap<>(materials.getRules().size(), 1); - for (ShardingSphereRule rule : materials.getRules()) { - if (rule instanceof TableContainedRule) { - for (String table : ((TableContainedRule) rule).getTables()) { - if (tables.containsKey(table)) { - TableMetaData metaData = TableMetaDataBuilder.decorate(table, tables.get(table), materials.getRules()); - result.put(table, metaData); - } - } - } + + Map<String, Collection<DataNode>> logicTableDataNodes = getLogicTableDataNodesMap(dataNodeContainedRuleList); + Map<String, Collection<String>> logicActualdataSourceMap = getLogicActualDataSourceMap(materials); + if (MapUtils.isEmpty(logicActualdataSourceMap)) { + return logicTableDataNodes; } - return result; + + return replaceLogicDataNodeWithActualDataNode(logicTableDataNodes, logicActualdataSourceMap); Review comment: I don’t seem to see this logic in the original code. What is the meaning of this logic? 
########## File path: shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/metadata/schema/builder/SchemaBuilderWithDialectLoader.java ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.shardingsphere.infra.metadata.schema.builder; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.shardingsphere.infra.datanode.DataNode; +import org.apache.shardingsphere.infra.exception.ShardingSphereException; +import org.apache.shardingsphere.infra.metadata.schema.builder.spi.DialectTableMetaDataLoader; +import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData; +import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader; + +import javax.sql.DataSource; +import java.sql.SQLException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class SchemaBuilderWithDialectLoader { + Review comment: What is the point of extracting dialect loading and default loading separately? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
