sandynz commented on code in PR #20966:
URL: https://github.com/apache/shardingsphere/pull/20966#discussion_r970655051
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java:
##########
@@ -88,34 +93,66 @@ private boolean isSchemaAvailable() {
     private Map<TableName, PipelineTableMetaData> loadTableMetaData0(final Connection connection, final String schemaName, final String tableNamePattern) throws SQLException {
         Map<String, Map<String, PipelineColumnMetaData>> tablePipelineColumnMetaDataMap = new LinkedHashMap<>();
-        try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), schemaName, tableNamePattern, "%")) {
+        Map<String, Map<String, Collection<String>>> uniqueKeysMap = new HashMap<>();
+        Map<String, Set<String>> primaryKeysMap = new HashMap<>();
Review Comment:
Could we remove `uniqueKeysMap` and `primaryKeysMap`, and just load them on demand?
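Rough sketch of what I mean (untested), loading per table inside the existing loop instead of pre-filling the maps:

```java
// Sketch only: load keys on demand for the current table, no upfront maps.
for (String each : tableNames) {
    Set<String> primaryKeys = loadPrimaryKeys(connection, schemaName, each);
    Map<String, Collection<String>> uniqueKeys = loadUniqueIndexesOfTable(connection, schemaName, each);
    // ... build PipelineColumnMetaData / PipelineIndexMetaData for this table here ...
}
```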
##########
shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml:
##########
@@ -62,7 +64,7 @@
         </create-table-order-item>
         <full-insert-order>
-            INSERT INTO
+            INSERT IGNORE INTO
Review Comment:
It's better not to add `IGNORE`, otherwise we could not know when INSERT SQL execution fails.
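For example (illustrative JDBC snippet only, assuming an open `connection` and a `t_order` table with `order_id` as primary key):

```java
// Illustrative only: how IGNORE can hide a failed INSERT.
try (PreparedStatement statement = connection.prepareStatement("INSERT INTO t_order (order_id) VALUES (?)")) {
    statement.setLong(1, 1L);
    statement.executeUpdate();
    // Plain INSERT: the duplicate key makes this throw SQLIntegrityConstraintViolationException.
    // INSERT IGNORE: it silently returns 0 affected rows, so the test would not notice.
    statement.executeUpdate();
}
```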
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java:
##########
@@ -88,34 +93,66 @@ private boolean isSchemaAvailable() {
     private Map<TableName, PipelineTableMetaData> loadTableMetaData0(final Connection connection, final String schemaName, final String tableNamePattern) throws SQLException {
         Map<String, Map<String, PipelineColumnMetaData>> tablePipelineColumnMetaDataMap = new LinkedHashMap<>();
-        try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), schemaName, tableNamePattern, "%")) {
+        Map<String, Map<String, Collection<String>>> uniqueKeysMap = new HashMap<>();
+        Map<String, Set<String>> primaryKeysMap = new HashMap<>();
+        List<String> tableNames = new ArrayList<>();
+        try (ResultSet resultSet = connection.getMetaData().getTables(connection.getCatalog(), schemaName, tableNamePattern, null)) {
             while (resultSet.next()) {
-                int ordinalPosition = resultSet.getInt("ORDINAL_POSITION");
                 String tableName = resultSet.getString("TABLE_NAME");
-                Map<String, PipelineColumnMetaData> columnMetaDataMap = tablePipelineColumnMetaDataMap.computeIfAbsent(tableName, k -> new LinkedHashMap<>());
-                String columnName = resultSet.getString("COLUMN_NAME");
-                if (columnMetaDataMap.containsKey(columnName)) {
-                    continue;
-                }
-                int dataType = resultSet.getInt("DATA_TYPE");
-                String dataTypeName = resultSet.getString("TYPE_NAME");
-                Set<String> primaryKeys;
-                try {
-                    primaryKeys = loadPrimaryKeys(connection, schemaName, tableName);
-                } catch (final SQLException ex) {
-                    log.error("loadPrimaryKeys failed, tableName={}", tableName);
-                    throw ex;
+                tableNames.add(tableName);
+                primaryKeysMap.put(tableName, loadPrimaryKeys(connection, schemaName, tableName));
+                uniqueKeysMap.put(tableName, loadUniqueIndexesOfTable(connection, schemaName, tableName));
+            }
+        }
+        for (String each : tableNames) {
+            try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), schemaName, tableNamePattern, "%")) {
+                while (resultSet.next()) {
+                    int ordinalPosition = resultSet.getInt("ORDINAL_POSITION");
+                    String tableName = resultSet.getString("TABLE_NAME");
+                    Map<String, PipelineColumnMetaData> columnMetaDataMap = tablePipelineColumnMetaDataMap.computeIfAbsent(tableName, k -> new LinkedHashMap<>());
+                    String columnName = resultSet.getString("COLUMN_NAME");
+                    if (columnMetaDataMap.containsKey(columnName)) {
+                        continue;
+                    }
+                    int dataType = resultSet.getInt("DATA_TYPE");
+                    String dataTypeName = resultSet.getString("TYPE_NAME");
+                    Set<String> primaryKeys = primaryKeysMap.getOrDefault(each, Collections.emptySet());
+                    boolean primaryKey = primaryKeys.contains(columnName);
+                    boolean isNullable = "YES".equals(resultSet.getString("IS_NULLABLE"));
+                    Map<String, Collection<String>> uniqueKeys = uniqueKeysMap.getOrDefault(tableName, Collections.emptyMap());
+                    boolean isUniqueKey = primaryKey || uniqueKeys.values().stream().anyMatch(names -> names.contains(columnName));
+                    PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(ordinalPosition, columnName, dataType, dataTypeName, isNullable, primaryKey, isUniqueKey);
+                    columnMetaDataMap.put(columnName, columnMetaData);
                 }
-                boolean primaryKey = primaryKeys.contains(columnName);
-                boolean isNullable = "YES".equals(resultSet.getString("IS_NULLABLE"));
-                PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(ordinalPosition, columnName, dataType, dataTypeName, isNullable, primaryKey);
-                columnMetaDataMap.put(columnName, columnMetaData);
             }
         }
         Map<TableName, PipelineTableMetaData> result = new LinkedHashMap<>();
         for (Entry<String, Map<String, PipelineColumnMetaData>> entry : tablePipelineColumnMetaDataMap.entrySet()) {
             String tableName = entry.getKey();
-            result.put(new TableName(tableName), new PipelineTableMetaData(tableName, entry.getValue(), loadIndexesOfTable(connection, schemaName, entry.getValue(), tableName)));
+            Map<String, PipelineColumnMetaData> metaDataMap = tablePipelineColumnMetaDataMap.get(tableName);
+            Map<String, Collection<String>> uniqueKeys = uniqueKeysMap.getOrDefault(tableName, Collections.emptyMap());
+            Collection<PipelineIndexMetaData> uniqueIndexMetaData = uniqueKeys.entrySet().stream()
+                    .map(each -> new PipelineIndexMetaData(each.getKey(), each.getValue().stream().map(metaDataMap::get).collect(Collectors.toList()))).collect(Collectors.toList());
+            result.put(new TableName(tableName), new PipelineTableMetaData(tableName, entry.getValue(), uniqueIndexMetaData));
+        }
+        return result;
+    }
+
+    private Map<String, Collection<String>> loadUniqueIndexesOfTable(final Connection connection, final String schemaName, final String tableName) throws SQLException {
Review Comment:
Is the old `loadIndexesOfTable` method still needed?
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java:
##########
@@ -88,34 +93,66 @@ private boolean isSchemaAvailable() {
     private Map<TableName, PipelineTableMetaData> loadTableMetaData0(final Connection connection, final String schemaName, final String tableNamePattern) throws SQLException {
         Map<String, Map<String, PipelineColumnMetaData>> tablePipelineColumnMetaDataMap = new LinkedHashMap<>();
-        try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), schemaName, tableNamePattern, "%")) {
+        Map<String, Map<String, Collection<String>>> uniqueKeysMap = new HashMap<>();
+        Map<String, Set<String>> primaryKeysMap = new HashMap<>();
+        List<String> tableNames = new ArrayList<>();
+        try (ResultSet resultSet = connection.getMetaData().getTables(connection.getCatalog(), schemaName, tableNamePattern, null)) {
             while (resultSet.next()) {
-                int ordinalPosition = resultSet.getInt("ORDINAL_POSITION");
                 String tableName = resultSet.getString("TABLE_NAME");
-                Map<String, PipelineColumnMetaData> columnMetaDataMap = tablePipelineColumnMetaDataMap.computeIfAbsent(tableName, k -> new LinkedHashMap<>());
-                String columnName = resultSet.getString("COLUMN_NAME");
-                if (columnMetaDataMap.containsKey(columnName)) {
-                    continue;
-                }
-                int dataType = resultSet.getInt("DATA_TYPE");
-                String dataTypeName = resultSet.getString("TYPE_NAME");
-                Set<String> primaryKeys;
-                try {
-                    primaryKeys = loadPrimaryKeys(connection, schemaName, tableName);
-                } catch (final SQLException ex) {
-                    log.error("loadPrimaryKeys failed, tableName={}", tableName);
-                    throw ex;
+                tableNames.add(tableName);
+                primaryKeysMap.put(tableName, loadPrimaryKeys(connection, schemaName, tableName));
+                uniqueKeysMap.put(tableName, loadUniqueIndexesOfTable(connection, schemaName, tableName));
+            }
+        }
+        for (String each : tableNames) {
+            try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), schemaName, tableNamePattern, "%")) {
+                while (resultSet.next()) {
Review Comment:
Looks like `tableNamePattern` should be `each`, since this is a for loop that loads columns for every table.
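That is, something like (rough sketch, not compiled):

```java
// Pass the current table name instead of the original pattern.
try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), schemaName, each, "%")) {
    // ...
}
```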
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]