This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c1808284b2c Refactor StatisticsRefreshEngine (#34445)
c1808284b2c is described below
commit c1808284b2cc38f710e836d8acae6bb2d20d879c
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Jan 23 21:19:37 2025 +0800
Refactor StatisticsRefreshEngine (#34445)
* Refactor StatisticsRefreshEngine
* Refactor StatisticsRefreshEngine
---
.../statistics/StatisticsRefreshEngine.java | 61 +++++++++++-----------
1 file changed, 31 insertions(+), 30 deletions(-)
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsRefreshEngine.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsRefreshEngine.java
index 0fa584b9104..273f5edd33e 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsRefreshEngine.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsRefreshEngine.java
@@ -84,7 +84,7 @@ public final class StatisticsRefreshEngine {
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
- log.error("Collect data error", ex);
+ log.error("Collect statistics error.", ex);
}
}
@@ -92,15 +92,15 @@ public final class StatisticsRefreshEngine {
GlobalLockDefinition lockDefinition = new GlobalLockDefinition(new
StatisticsLock());
if (lockContext.tryLock(lockDefinition, 5000L)) {
try {
- ShardingSphereStatistics statistics =
contextManager.getMetaDataContexts().getStatistics();
+ ShardingSphereStatistics currentStatistics =
contextManager.getMetaDataContexts().getStatistics();
ShardingSphereMetaData metaData =
contextManager.getMetaDataContexts().getMetaData();
ShardingSphereStatistics changedStatistics = new
ShardingSphereStatistics();
- for (Entry<String, DatabaseStatistics> entry :
statistics.getDatabaseStatisticsMap().entrySet()) {
+ for (Entry<String, DatabaseStatistics> entry :
currentStatistics.getDatabaseStatisticsMap().entrySet()) {
if (metaData.containsDatabase(entry.getKey())) {
collectForDatabase(entry.getKey(), entry.getValue(),
metaData, changedStatistics);
}
}
- compareAndUpdate(changedStatistics);
+ compareAndUpdate(currentStatistics, changedStatistics,
metaData);
} finally {
lockContext.unlock(lockDefinition);
}
@@ -126,16 +126,19 @@ public final class StatisticsRefreshEngine {
private void collectForTable(final String databaseName, final String
schemaName, final ShardingSphereTable table,
final ShardingSphereMetaData metaData, final
ShardingSphereStatistics statistics) {
- Optional<TableStatisticsCollector> statisticsCollector =
TypedSPILoader.findService(TableStatisticsCollector.class, table.getName());
- Optional<TableStatistics> tableStatistics = Optional.empty();
- if (statisticsCollector.isPresent()) {
+ Optional<TableStatisticsCollector> tableStatisticsCollector =
TypedSPILoader.findService(TableStatisticsCollector.class, table.getName());
+ Optional<TableStatistics> tableStatistics;
+ if (tableStatisticsCollector.isPresent()) {
try {
- tableStatistics =
statisticsCollector.get().collect(databaseName, table, metaData);
+ tableStatistics =
tableStatisticsCollector.get().collect(databaseName, table, metaData);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
- log.error(String.format("Collect %s.%s.%s data failed",
databaseName, schemaName, table.getName()), ex);
+ log.error("Collect {}.{}.{} statistics failed.", databaseName,
schemaName, table.getName(), ex);
+ tableStatistics = Optional.empty();
}
+ } else {
+ tableStatistics = Optional.empty();
}
DatabaseStatistics databaseStatistics =
statistics.containsDatabaseStatistics(databaseName) ?
statistics.getDatabaseStatistics(databaseName) : new DatabaseStatistics();
SchemaStatistics schemaStatistics =
databaseStatistics.containsSchemaStatistics(schemaName) ?
databaseStatistics.getSchemaStatistics(schemaName) : new SchemaStatistics();
@@ -144,47 +147,45 @@ public final class StatisticsRefreshEngine {
statistics.putDatabaseStatistics(databaseName, databaseStatistics);
}
- private void compareAndUpdate(final ShardingSphereStatistics
changedStatistics) {
- ShardingSphereMetaData metaData =
contextManager.getMetaDataContexts().getMetaData();
- ShardingSphereStatistics statistics =
contextManager.getMetaDataContexts().getStatistics();
+ private void compareAndUpdate(final ShardingSphereStatistics
currentStatistics, final ShardingSphereStatistics changedStatistics, final
ShardingSphereMetaData metaData) {
for (Entry<String, DatabaseStatistics> entry :
changedStatistics.getDatabaseStatisticsMap().entrySet()) {
- compareAndUpdateForDatabase(entry.getKey(),
statistics.getDatabaseStatistics(entry.getKey()), entry.getValue(), statistics,
metaData.getDatabase(entry.getKey()));
+ compareAndUpdateForDatabase(metaData.getDatabase(entry.getKey()),
currentStatistics, currentStatistics.getDatabaseStatistics(entry.getKey()),
entry.getValue());
}
- for (Entry<String, DatabaseStatistics> entry :
statistics.getDatabaseStatisticsMap().entrySet()) {
+ for (Entry<String, DatabaseStatistics> entry :
currentStatistics.getDatabaseStatisticsMap().entrySet()) {
if (!changedStatistics.containsDatabaseStatistics(entry.getKey()))
{
- statistics.dropDatabaseStatistics(entry.getKey());
+ currentStatistics.dropDatabaseStatistics(entry.getKey());
contextManager.getPersistServiceFacade().getMetaDataPersistService().getShardingSphereStatisticsPersistService().delete(entry.getKey());
}
}
}
- private void compareAndUpdateForDatabase(final String databaseName, final
DatabaseStatistics databaseStatistics, final DatabaseStatistics
changedDatabaseStatistics,
- final ShardingSphereStatistics
statistics, final ShardingSphereDatabase database) {
+ private void compareAndUpdateForDatabase(final ShardingSphereDatabase
database, final ShardingSphereStatistics currentStatistics,
+ final DatabaseStatistics
currentDatabaseStatistics, final DatabaseStatistics changedDatabaseStatistics) {
for (Entry<String, SchemaStatistics> entry :
changedDatabaseStatistics.getSchemaStatisticsMap().entrySet()) {
- compareAndUpdateForSchema(databaseName, entry.getKey(),
databaseStatistics.getSchemaStatistics(entry.getKey()), entry.getValue(),
statistics, database.getSchema(entry.getKey()));
+ compareAndUpdateForSchema(database.getName(),
database.getSchema(entry.getKey()), currentStatistics,
currentDatabaseStatistics.getSchemaStatistics(entry.getKey()),
entry.getValue());
}
}
- private void compareAndUpdateForSchema(final String databaseName, final
String schemaName, final SchemaStatistics schemaStatistics,
- final SchemaStatistics
changedSchemaStatistics, final ShardingSphereStatistics statistics, final
ShardingSphereSchema schema) {
+ private void compareAndUpdateForSchema(final String databaseName, final
ShardingSphereSchema schema, final ShardingSphereStatistics currentStatistics,
+ final SchemaStatistics
currentSchemaStatistics, final SchemaStatistics changedSchemaStatistics) {
for (Entry<String, TableStatistics> entry :
changedSchemaStatistics.getTableStatisticsMap().entrySet()) {
- compareAndUpdateForTable(databaseName, schemaName,
schemaStatistics.getTableStatistics(entry.getKey()), entry.getValue(),
statistics, schema.getTable(entry.getKey()));
+ compareAndUpdateForTable(databaseName, schema.getName(),
schema.getTable(entry.getKey()), currentStatistics,
currentSchemaStatistics.getTableStatistics(entry.getKey()), entry.getValue());
}
}
- private void compareAndUpdateForTable(final String databaseName, final
String schemaName, final TableStatistics tableStatistics,
- final TableStatistics
changedTableStatistics, final ShardingSphereStatistics statistics, final
ShardingSphereTable table) {
- if (!tableStatistics.equals(changedTableStatistics)) {
-
statistics.getDatabaseStatistics(databaseName).getSchemaStatistics(schemaName).putTableStatistics(changedTableStatistics.getName(),
changedTableStatistics);
- AlteredDatabaseStatistics alteredDatabaseStatistics =
createAlteredDatabaseStatistics(databaseName, schemaName, tableStatistics,
changedTableStatistics, table);
+ private void compareAndUpdateForTable(final String databaseName, final
String schemaName, final ShardingSphereTable table,
+ final ShardingSphereStatistics
currentStatistics, final TableStatistics currentTableStatistics, final
TableStatistics changedTableStatistics) {
+ if (!currentTableStatistics.equals(changedTableStatistics)) {
+
currentStatistics.getDatabaseStatistics(databaseName).getSchemaStatistics(schemaName).putTableStatistics(changedTableStatistics.getName(),
changedTableStatistics);
+ AlteredDatabaseStatistics alteredDatabaseStatistics =
createAlteredDatabaseStatistics(databaseName, schemaName, table,
currentTableStatistics, changedTableStatistics);
contextManager.getPersistServiceFacade().getMetaDataPersistService().getShardingSphereStatisticsPersistService().update(alteredDatabaseStatistics);
}
}
- private AlteredDatabaseStatistics createAlteredDatabaseStatistics(final
String databaseName, final String schemaName, final TableStatistics
tableStatistics,
- final
TableStatistics changedTableStatistics, final ShardingSphereTable table) {
- AlteredDatabaseStatistics result = new
AlteredDatabaseStatistics(databaseName, schemaName, tableStatistics.getName());
- Map<String, RowStatistics> tableStatisticsMap =
tableStatistics.getRows().stream().collect(Collectors.toMap(RowStatistics::getUniqueKey,
Function.identity()));
+ private AlteredDatabaseStatistics createAlteredDatabaseStatistics(final
String databaseName, final String schemaName, final ShardingSphereTable table,
+ final
TableStatistics currentTableStatistics, final TableStatistics
changedTableStatistics) {
+ AlteredDatabaseStatistics result = new
AlteredDatabaseStatistics(databaseName, schemaName,
currentTableStatistics.getName());
+ Map<String, RowStatistics> tableStatisticsMap =
currentTableStatistics.getRows().stream().collect(Collectors.toMap(RowStatistics::getUniqueKey,
Function.identity()));
Map<String, RowStatistics> changedTableStatisticsMap =
changedTableStatistics.getRows().stream().collect(Collectors.toMap(RowStatistics::getUniqueKey,
Function.identity()));
YamlRowStatisticsSwapper swapper = new YamlRowStatisticsSwapper(new
ArrayList<>(table.getAllColumns()));
for (Entry<String, RowStatistics> entry :
changedTableStatisticsMap.entrySet()) {