This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fe48f6ec7e0 Clear the collected system metadata when database is
droped (#32408)
fe48f6ec7e0 is described below
commit fe48f6ec7e00637435cdb38415c87f1706ddfde1
Author: jiangML <[email protected]>
AuthorDate: Mon Aug 5 17:14:28 2024 +0800
Clear the collected system metadata when database is droped (#32408)
* Remove useless judgments
* Improve ShardingSphereStatisticsRefreshEngine, clear the collected system
metadata when drop database
* Fix checkstyle error
---
.../data/ShardingSphereDataPersistService.java | 9 +++
.../ShardingSphereStatisticsRefreshEngine.java | 93 ++++++++++++----------
.../dispatch/ListenerAssistedSubscriber.java | 9 +--
.../ResourceMetaDataChangedSubscriber.java | 13 ++-
4 files changed, 69 insertions(+), 55 deletions(-)
diff --git
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/data/ShardingSphereDataPersistService.java
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/data/ShardingSphereDataPersistService.java
index 674771de942..9a7fa807ca8 100644
---
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/data/ShardingSphereDataPersistService.java
+++
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/data/ShardingSphereDataPersistService.java
@@ -139,4 +139,13 @@ public final class ShardingSphereDataPersistService {
tableRowDataPersistService.delete(databaseName, schemaName,
alteredShardingSphereDatabaseData.getTableName(),
alteredShardingSphereDatabaseData.getDeletedRows());
}
+
+ /**
+ * Delete sharding sphere database data.
+ *
+ * @param databaseName database name
+ */
+ public void delete(final String databaseName) {
+
repository.delete(ShardingSphereDataNode.getDatabaseNamePath(databaseName));
+ }
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java
index 70f758da0dc..faf62f67ff6 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java
@@ -92,10 +92,10 @@ public final class ShardingSphereStatisticsRefreshEngine {
ShardingSphereStatistics changedStatistics = new
ShardingSphereStatistics();
for (Entry<String, ShardingSphereDatabaseData> entry :
statistics.getDatabaseData().entrySet()) {
if (metaData.containsDatabase(entry.getKey())) {
- collectForDatabase(entry.getKey(), entry.getValue(),
metaData.getDatabases(), changedStatistics);
+ collectForDatabase(entry.getKey(), entry.getValue(),
metaData, changedStatistics);
}
}
- compareUpdateAndSendEvent(statistics, changedStatistics,
metaData.getDatabases());
+ compareAndUpdate(changedStatistics);
} finally {
globalLockContext.unlock(lockDefinition);
}
@@ -103,71 +103,82 @@ public final class ShardingSphereStatisticsRefreshEngine {
}
private void collectForDatabase(final String databaseName, final
ShardingSphereDatabaseData databaseData,
- final Map<String, ShardingSphereDatabase>
databases, final ShardingSphereStatistics statistics) {
+ final ShardingSphereMetaData metaData,
final ShardingSphereStatistics statistics) {
for (Entry<String, ShardingSphereSchemaData> entry :
databaseData.getSchemaData().entrySet()) {
- if
(databases.get(databaseName.toLowerCase()).containsSchema(entry.getKey())) {
- collectForSchema(databaseName, entry.getKey(),
entry.getValue(), databases, statistics);
+ if
(metaData.getDatabase(databaseName).containsSchema(entry.getKey())) {
+ collectForSchema(databaseName, entry.getKey(),
entry.getValue(), metaData, statistics);
}
}
}
private void collectForSchema(final String databaseName, final String
schemaName, final ShardingSphereSchemaData schemaData,
- final Map<String, ShardingSphereDatabase>
databases, final ShardingSphereStatistics statistics) {
+ final ShardingSphereMetaData metaData, final
ShardingSphereStatistics statistics) {
for (Entry<String, ShardingSphereTableData> entry :
schemaData.getTableData().entrySet()) {
- if
(databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(entry.getKey()))
{
- collectForTable(databaseName, schemaName,
databases.get(databaseName).getSchema(schemaName).getTable(entry.getKey()),
databases, statistics);
+ if
(metaData.getDatabase(databaseName).getSchema(schemaName).containsTable(entry.getKey()))
{
+ collectForTable(databaseName, schemaName,
metaData.getDatabase(databaseName).getSchema(schemaName).getTable(entry.getKey()),
metaData, statistics);
}
}
}
private void collectForTable(final String databaseName, final String
schemaName, final ShardingSphereTable table,
- final Map<String, ShardingSphereDatabase>
databases, final ShardingSphereStatistics statistics) {
+ final ShardingSphereMetaData metaData, final
ShardingSphereStatistics statistics) {
Optional<ShardingSphereStatisticsCollector> dataCollector =
TypedSPILoader.findService(ShardingSphereStatisticsCollector.class,
table.getName());
- if (!dataCollector.isPresent()) {
- return;
- }
Optional<ShardingSphereTableData> tableData = Optional.empty();
- try {
- tableData = dataCollector.get().collect(databaseName, table,
databases,
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData());
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- log.error(String.format("Collect %s.%s.%s data failed",
databaseName, schemaName, table.getName()), ex);
+ if (dataCollector.isPresent()) {
+ try {
+ tableData = dataCollector.get().collect(databaseName, table,
metaData.getDatabases(),
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData());
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ log.error(String.format("Collect %s.%s.%s data failed",
databaseName, schemaName, table.getName()), ex);
+ }
}
- tableData.ifPresent(optional ->
statistics.getDatabaseData().computeIfAbsent(databaseName.toLowerCase(), key ->
new ShardingSphereDatabaseData()).getSchemaData()
- .computeIfAbsent(schemaName, key -> new
ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(),
optional));
+ ShardingSphereDatabaseData databaseData =
statistics.containsDatabase(databaseName) ?
statistics.getDatabase(databaseName) : new ShardingSphereDatabaseData();
+ ShardingSphereSchemaData schemaData =
databaseData.containsSchema(schemaName) ? databaseData.getSchema(schemaName) :
new ShardingSphereSchemaData();
+ tableData.ifPresent(optional -> schemaData.putTable(table.getName(),
optional));
+ databaseData.putSchema(schemaName, schemaData);
+ statistics.putDatabase(databaseName, databaseData);
}
- private void compareUpdateAndSendEvent(final ShardingSphereStatistics
statistics, final ShardingSphereStatistics changedStatistics,
- final Map<String,
ShardingSphereDatabase> databases) {
- changedStatistics.getDatabaseData().forEach(
- (key, value) -> compareUpdateAndSendEventForDatabase(key,
statistics.getDatabaseData().get(key), value, statistics,
databases.get(key.toLowerCase())));
+ private void compareAndUpdate(final ShardingSphereStatistics
changedStatistics) {
+ ShardingSphereMetaData metaData =
contextManager.getMetaDataContexts().getMetaData();
+ ShardingSphereStatistics statistics =
contextManager.getMetaDataContexts().getStatistics();
+ for (Entry<String, ShardingSphereDatabaseData> entry :
changedStatistics.getDatabaseData().entrySet()) {
+ compareAndUpdateForDatabase(entry.getKey(),
statistics.getDatabase(entry.getKey()), entry.getValue(), statistics,
metaData.getDatabase(entry.getKey()));
+ }
+ for (Entry<String, ShardingSphereDatabaseData> entry :
statistics.getDatabaseData().entrySet()) {
+ if (!changedStatistics.containsDatabase(entry.getKey())) {
+ statistics.dropDatabase(entry.getKey());
+
contextManager.getPersistServiceFacade().getMetaDataPersistService().getShardingSphereDataPersistService().delete(entry.getKey());
+ }
+ }
}
- private void compareUpdateAndSendEventForDatabase(final String
databaseName, final ShardingSphereDatabaseData databaseData, final
ShardingSphereDatabaseData changedDatabaseData,
- final
ShardingSphereStatistics statistics, final ShardingSphereDatabase database) {
- changedDatabaseData.getSchemaData().forEach(
- (key, value) ->
compareUpdateAndSendEventForSchema(databaseName, key,
databaseData.getSchemaData().get(key), value, statistics,
database.getSchema(key)));
+ private void compareAndUpdateForDatabase(final String databaseName, final
ShardingSphereDatabaseData databaseData, final ShardingSphereDatabaseData
changedDatabaseData,
+ final ShardingSphereStatistics
statistics, final ShardingSphereDatabase database) {
+ for (Entry<String, ShardingSphereSchemaData> entry :
changedDatabaseData.getSchemaData().entrySet()) {
+ compareAndUpdateForSchema(databaseName, entry.getKey(),
databaseData.getSchema(entry.getKey()), entry.getValue(), statistics,
database.getSchema(entry.getKey()));
+ }
}
- private void compareUpdateAndSendEventForSchema(final String databaseName,
final String schemaName, final ShardingSphereSchemaData schemaData,
- final
ShardingSphereSchemaData changedSchemaData, final ShardingSphereStatistics
statistics, final ShardingSphereSchema schema) {
- changedSchemaData.getTableData().forEach(
- (key, value) ->
compareUpdateAndSendEventForTable(databaseName, schemaName,
schemaData.getTableData().get(key), value, statistics, schema.getTable(key)));
+ private void compareAndUpdateForSchema(final String databaseName, final
String schemaName, final ShardingSphereSchemaData schemaData,
+ final ShardingSphereSchemaData
changedSchemaData, final ShardingSphereStatistics statistics, final
ShardingSphereSchema schema) {
+ for (Entry<String, ShardingSphereTableData> entry :
changedSchemaData.getTableData().entrySet()) {
+ compareAndUpdateForTable(databaseName, schemaName,
schemaData.getTable(entry.getKey()), entry.getValue(), statistics,
schema.getTable(entry.getKey()));
+ }
}
- private void compareUpdateAndSendEventForTable(final String databaseName,
final String schemaName, final ShardingSphereTableData tableData,
- final
ShardingSphereTableData changedTableData, final ShardingSphereStatistics
statistics, final ShardingSphereTable table) {
- if (tableData.equals(changedTableData)) {
- return;
+ private void compareAndUpdateForTable(final String databaseName, final
String schemaName, final ShardingSphereTableData tableData,
+ final ShardingSphereTableData
changedTableData, final ShardingSphereStatistics statistics, final
ShardingSphereTable table) {
+ if (!tableData.equals(changedTableData)) {
+
statistics.getDatabase(databaseName).getSchema(schemaName).putTable(changedTableData.getName(),
changedTableData);
+ AlteredShardingSphereDatabaseData
alteredShardingSphereDatabaseData =
createAlteredShardingSphereDatabaseData(databaseName, schemaName, tableData,
changedTableData, table);
+
contextManager.getPersistServiceFacade().getMetaDataPersistService().getShardingSphereDataPersistService().update(alteredShardingSphereDatabaseData);
}
-
statistics.getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().put(changedTableData.getName().toLowerCase(),
changedTableData);
- AlteredShardingSphereDatabaseData alteredShardingSphereDatabaseData =
getShardingSphereSchemaDataAlteredPOJO(databaseName, schemaName, tableData,
changedTableData, table);
-
contextManager.getPersistServiceFacade().getMetaDataPersistService().getShardingSphereDataPersistService().update(alteredShardingSphereDatabaseData);
}
- private AlteredShardingSphereDatabaseData
getShardingSphereSchemaDataAlteredPOJO(final String databaseName, final String
schemaName, final ShardingSphereTableData tableData,
-
final ShardingSphereTableData changedTableData, final ShardingSphereTable
table) {
+ private AlteredShardingSphereDatabaseData
createAlteredShardingSphereDatabaseData(final String databaseName, final String
schemaName, final ShardingSphereTableData tableData,
+
final ShardingSphereTableData changedTableData, final
ShardingSphereTable table) {
AlteredShardingSphereDatabaseData result = new
AlteredShardingSphereDatabaseData(databaseName, schemaName,
tableData.getName());
Map<String, ShardingSphereRowData> tableDataMap =
tableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
Function.identity()));
Map<String, ShardingSphereRowData> changedTableDataMap =
changedTableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey,
Function.identity()));
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
index 90180a3fa8e..cf3a625d5ec 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
@@ -31,7 +31,6 @@ import
org.apache.shardingsphere.mode.manager.cluster.listener.MetaDataChangedLi
import
org.apache.shardingsphere.mode.manager.cluster.lock.GlobalLockPersistService;
import
org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import org.apache.shardingsphere.mode.spi.PersistRepository;
/**
* Listener assisted subscriber.
@@ -76,11 +75,9 @@ public final class ListenerAssistedSubscriber implements
EventSubscriber {
}
private void refreshShardingSphereStatisticsData() {
- PersistRepository repository =
contextManager.getPersistServiceFacade().getRepository();
- if (contextManager.getComputeNodeInstanceContext().isCluster()
- && InstanceType.PROXY ==
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType()
- && repository instanceof ClusterPersistRepository) {
- new ShardingSphereStatisticsRefreshEngine(contextManager, new
GlobalLockContext(new GlobalLockPersistService((ClusterPersistRepository)
repository))).asyncRefresh();
+ if (contextManager.getComputeNodeInstanceContext().isCluster() &&
InstanceType.PROXY ==
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType())
{
+ new ShardingSphereStatisticsRefreshEngine(contextManager,
+ new GlobalLockContext(new
GlobalLockPersistService((ClusterPersistRepository)
contextManager.getPersistServiceFacade().getRepository()))).asyncRefresh();
}
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
index 4791169367c..0d68a2d71a7 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
@@ -23,18 +23,17 @@ import
org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
+import
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaAddedEvent;
+import
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaDeletedEvent;
import
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.CreateOrAlterTableEvent;
import
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.DropTableEvent;
import
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.CreateOrAlterViewEvent;
import
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.DropViewEvent;
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaAddedEvent;
-import
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaDeletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.lock.GlobalLockPersistService;
import
org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import org.apache.shardingsphere.mode.spi.PersistRepository;
import java.util.Map;
@@ -128,11 +127,9 @@ public final class ResourceMetaDataChangedSubscriber
implements EventSubscriber
}
private void refreshShardingSphereStatisticsData() {
- PersistRepository repository =
contextManager.getPersistServiceFacade().getRepository();
- if (contextManager.getComputeNodeInstanceContext().isCluster()
- && InstanceType.PROXY ==
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType()
- && repository instanceof ClusterPersistRepository) {
- new ShardingSphereStatisticsRefreshEngine(contextManager, new
GlobalLockContext(new GlobalLockPersistService((ClusterPersistRepository)
repository))).asyncRefresh();
+ if (contextManager.getComputeNodeInstanceContext().isCluster() &&
InstanceType.PROXY ==
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType())
{
+ new ShardingSphereStatisticsRefreshEngine(contextManager,
+ new GlobalLockContext(new
GlobalLockPersistService((ClusterPersistRepository)
contextManager.getPersistServiceFacade().getRepository()))).asyncRefresh();
}
}
}