This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e5c7187300 Refactor drop database stop heartbeat job and remove 
storage node data sources. (#24275)
6e5c7187300 is described below

commit 6e5c71873007979eb0ea6eac86c56c1b0d267622
Author: zhaojinchao <[email protected]>
AuthorDate: Tue Feb 21 17:46:04 2023 +0800

    Refactor drop database stop heartbeat job and remove storage node data 
sources. (#24275)
    
    * Refactor drop database stop heartbeat job and remove storage node data 
sources.
    
    * Fix checkstyle
    
    * Fix checkstyle
    
    * Fix checkstyle
    
    * FIX checkstyle
---
 .../dbdiscovery/rule/DatabaseDiscoveryRule.java    | 31 ++++++++++++++--------
 .../DropDatabaseDiscoveryRuleStatementUpdater.java |  6 -----
 .../rule/ReadwriteSplittingRule.java               | 26 +++++++++++++++++-
 .../builder/ReadwriteSplittingRuleBuilder.java     |  2 +-
 .../route/ReadwriteSplittingSQLRouterTest.java     |  4 +--
 .../rule/ReadwriteSplittingRuleTest.java           |  2 +-
 .../infra/metadata/ShardingSphereMetaData.java     |  2 ++
 .../type/StaticDataSourceContainedRule.java        | 12 +++++++++
 .../event/StorageNodeDataSourceDeletedEvent.java   | 20 +++++++-------
 .../registry/status/storage/node/StorageNode.java  |  4 +--
 .../subscriber/StorageNodeStatusSubscriber.java    | 15 +++++++++--
 .../StorageNodeStatusSubscriberTest.java           | 17 +++++++++---
 .../ImportDatabaseConfigurationUpdater.java        |  2 +-
 .../rdl/rule/RuleDefinitionBackendHandler.java     | 19 +++++++++++--
 14 files changed, 119 insertions(+), 43 deletions(-)

diff --git 
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
 
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
index 252481d87d8..737392d0063 100644
--- 
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
+++ 
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
@@ -44,6 +44,7 @@ import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import 
org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
 import 
org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+import 
org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
 import org.apache.shardingsphere.schedule.core.ScheduleContextFactory;
 
 import javax.sql.DataSource;
@@ -119,6 +120,17 @@ public final class DatabaseDiscoveryRule implements 
DatabaseRule, DataSourceCont
         }
     }
     
+    private void initHeartBeatJobs() {
+        for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : 
dataSourceRules.entrySet()) {
+            DatabaseDiscoveryDataSourceRule rule = entry.getValue();
+            String jobName = rule.getProvider().getType() + "-" + databaseName 
+ "-" + rule.getGroupName();
+            CronJob job = new CronJob(jobName, each -> new 
HeartbeatJob(databaseName, rule.getGroupName(), 
rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap),
+                    rule.getProvider(), rule.getDisabledDataSourceNames(), 
instanceContext).execute(null),
+                    rule.getHeartbeatProps().getProperty("keep-alive-cron"));
+            scheduleContext.startSchedule(job);
+        }
+    }
+    
     /**
      * Get single data source rule.
      *
@@ -162,24 +174,21 @@ public final class DatabaseDiscoveryRule implements 
DatabaseRule, DataSourceCont
         DatabaseDiscoveryDataSourceRule dataSourceRule = 
dataSourceRules.get(groupName);
         ShardingSpherePreconditions.checkNotNull(dataSourceRule, () -> new 
DBDiscoveryDataSourceRuleNotFoundException(databaseName));
         scheduleContext.closeSchedule(dataSourceRule.getProvider().getType() + 
"-" + databaseName + "-" + dataSourceRule.getGroupName());
+        deleteStorageNodeDataSources(dataSourceRule);
     }
     
-    @Override
-    public void closeAllHeartBeatJob() {
-        for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : 
dataSourceRules.entrySet()) {
-            DatabaseDiscoveryDataSourceRule rule = entry.getValue();
-            scheduleContext.closeSchedule(rule.getProvider().getType() + "-" + 
databaseName + "-" + rule.getGroupName());
+    private void deleteStorageNodeDataSources(final 
DatabaseDiscoveryDataSourceRule dataSourceRule) {
+        for (String each : dataSourceRule.getDataSourceNames()) {
+            instanceContext.getEventBusContext().post(new 
StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, 
dataSourceRule.getGroupName(), each)));
         }
     }
     
-    private void initHeartBeatJobs() {
+    @Override
+    public void closeAllHeartBeatJob() {
         for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : 
dataSourceRules.entrySet()) {
             DatabaseDiscoveryDataSourceRule rule = entry.getValue();
-            String jobName = rule.getProvider().getType() + "-" + databaseName 
+ "-" + rule.getGroupName();
-            CronJob job = new CronJob(jobName, each -> new 
HeartbeatJob(databaseName, rule.getGroupName(), 
rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap),
-                    rule.getProvider(), rule.getDisabledDataSourceNames(), 
instanceContext).execute(null),
-                    rule.getHeartbeatProps().getProperty("keep-alive-cron"));
-            scheduleContext.startSchedule(job);
+            scheduleContext.closeSchedule(rule.getProvider().getType() + "-" + 
databaseName + "-" + rule.getGroupName());
+            deleteStorageNodeDataSources(rule);
         }
     }
     
diff --git 
a/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java
 
b/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java
index 8708c833893..feb538fd2c4 100644
--- 
a/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java
+++ 
b/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java
@@ -20,7 +20,6 @@ package 
org.apache.shardingsphere.dbdiscovery.distsql.handler.update;
 import 
org.apache.shardingsphere.dbdiscovery.api.config.DatabaseDiscoveryRuleConfiguration;
 import 
org.apache.shardingsphere.dbdiscovery.api.config.rule.DatabaseDiscoveryDataSourceRuleConfiguration;
 import 
org.apache.shardingsphere.dbdiscovery.distsql.parser.statement.DropDatabaseDiscoveryRuleStatement;
-import 
org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.exportable.constant.ExportableConstants;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.exportable.constant.ExportableItemConstants;
 import 
org.apache.shardingsphere.distsql.handler.exception.rule.MissingRequiredRuleException;
@@ -51,7 +50,6 @@ public final class DropDatabaseDiscoveryRuleStatementUpdater 
implements RuleDefi
         String databaseName = database.getName();
         checkCurrentRuleConfiguration(databaseName, sqlStatement, 
currentRuleConfig);
         checkIsInUse(databaseName, sqlStatement, database);
-        closeHeartbeatJob(database, sqlStatement);
     }
     
     private void checkCurrentRuleConfiguration(final String databaseName, 
final DropDatabaseDiscoveryRuleStatement sqlStatement, final 
DatabaseDiscoveryRuleConfiguration currentRuleConfig) {
@@ -81,10 +79,6 @@ public final class DropDatabaseDiscoveryRuleStatementUpdater 
implements RuleDefi
         ShardingSpherePreconditions.checkState(invalid.isEmpty(), () -> new 
RuleInUsedException(RULE_TYPE, databaseName, invalid));
     }
     
-    private void closeHeartbeatJob(final ShardingSphereDatabase database, 
final DropDatabaseDiscoveryRuleStatement sqlStatement) {
-        sqlStatement.getNames().forEach(each -> 
database.getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class).ifPresent(optional
 -> optional.closeSingleHeartBeatJob(each)));
-    }
-    
     @Override
     public boolean updateCurrentRuleConfiguration(final 
DropDatabaseDiscoveryRuleStatement sqlStatement, final 
DatabaseDiscoveryRuleConfiguration currentRuleConfig) {
         sqlStatement.getNames().forEach(each -> dropRule(currentRuleConfig, 
each));
diff --git 
a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
 
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
index 101682eb36b..ebd843d992c 100644
--- 
a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
+++ 
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
@@ -36,6 +36,7 @@ import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
 import org.apache.shardingsphere.infra.util.expr.InlineExpressionParser;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import 
org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+import 
org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
 import 
org.apache.shardingsphere.readwritesplitting.api.ReadwriteSplittingRuleConfiguration;
 import 
org.apache.shardingsphere.readwritesplitting.api.rule.ReadwriteSplittingDataSourceRuleConfiguration;
 import 
org.apache.shardingsphere.readwritesplitting.api.strategy.DynamicReadwriteSplittingStrategyConfiguration;
@@ -59,6 +60,8 @@ import java.util.stream.Collectors;
  */
 public final class ReadwriteSplittingRule implements DatabaseRule, 
DataSourceContainedRule, StaticDataSourceContainedRule, ExportableRule, 
StorageConnectorReusableRule {
     
+    private final String databaseName;
+    
     @Getter
     private final RuleConfiguration configuration;
     
@@ -66,7 +69,8 @@ public final class ReadwriteSplittingRule implements 
DatabaseRule, DataSourceCon
     
     private final Map<String, ReadwriteSplittingDataSourceRule> 
dataSourceRules;
     
-    public ReadwriteSplittingRule(final ReadwriteSplittingRuleConfiguration 
ruleConfig, final Collection<ShardingSphereRule> builtRules) {
+    public ReadwriteSplittingRule(final String databaseName, final 
ReadwriteSplittingRuleConfiguration ruleConfig, final 
Collection<ShardingSphereRule> builtRules) {
+        this.databaseName = databaseName;
         configuration = ruleConfig;
         for (ReadwriteSplittingDataSourceRuleConfiguration 
dataSourceRuleConfiguration : ruleConfig.getDataSources()) {
             if 
(ruleConfig.getLoadBalancers().containsKey(dataSourceRuleConfiguration.getLoadBalancerName()))
 {
@@ -176,6 +180,26 @@ public final class ReadwriteSplittingRule implements 
DatabaseRule, DataSourceCon
         
dataSourceRule.updateDisabledDataSourceNames(dataSourceEvent.getQualifiedDatabase().getDataSourceName(),
 DataSourceState.DISABLED == dataSourceEvent.getDataSource().getStatus());
     }
     
+    @Override
+    public void cleanStorageNodeDataSource(final String groupName) {
+        Preconditions.checkNotNull(dataSourceRules.get(groupName), 
String.format("`%s` group name not exist in database `%s`", groupName, 
databaseName));
+        deleteStorageNodeDataSources(dataSourceRules.get(groupName));
+    }
+    
+    private void deleteStorageNodeDataSources(final 
ReadwriteSplittingDataSourceRule rule) {
+        if (rule.getReadwriteSplittingStrategy() instanceof 
DynamicReadwriteSplittingStrategy) {
+            return;
+        }
+        rule.getReadwriteSplittingStrategy().getReadDataSources().forEach(each 
-> new StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, 
rule.getName(), each)));
+    }
+    
+    @Override
+    public void cleanStorageNodeDataSources() {
+        for (Entry<String, ReadwriteSplittingDataSourceRule> entry : 
dataSourceRules.entrySet()) {
+            deleteStorageNodeDataSources(entry.getValue());
+        }
+    }
+    
     @Override
     public Map<String, Object> getExportData() {
         Map<String, Object> result = new HashMap<>(2, 1);
diff --git 
a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java
 
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java
index f0d3afbcbf6..1fe9cf64e35 100644
--- 
a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java
+++ 
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java
@@ -36,7 +36,7 @@ public final class ReadwriteSplittingRuleBuilder implements 
DatabaseRuleBuilder<
     @Override
     public ReadwriteSplittingRule build(final 
ReadwriteSplittingRuleConfiguration config, final String databaseName,
                                         final Map<String, DataSource> 
dataSources, final Collection<ShardingSphereRule> builtRules, final 
InstanceContext instanceContext) {
-        return new ReadwriteSplittingRule(config, builtRules);
+        return new ReadwriteSplittingRule(databaseName, config, builtRules);
     }
     
     @Override
diff --git 
a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java
 
b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java
index 0ad3206292d..26a56b8dfbd 100644
--- 
a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java
+++ 
b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java
@@ -86,14 +86,14 @@ public final class ReadwriteSplittingSQLRouterTest {
     
     @Before
     public void setUp() {
-        staticRule = new ReadwriteSplittingRule(new 
ReadwriteSplittingRuleConfiguration(Collections.singleton(new 
ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME,
+        staticRule = new ReadwriteSplittingRule("logic_db", new 
ReadwriteSplittingRuleConfiguration(Collections.singleton(new 
ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME,
                 new 
StaticReadwriteSplittingStrategyConfiguration(WRITE_DATASOURCE, 
Collections.singletonList(READ_DATASOURCE)), null, "")),
                 Collections.emptyMap()), Collections.emptyList());
         sqlRouter = (ReadwriteSplittingSQLRouter) 
OrderedSPILoader.getServices(SQLRouter.class, 
Collections.singleton(staticRule)).get(staticRule);
         DynamicDataSourceContainedRule dynamicDataSourceRule = 
mock(DynamicDataSourceContainedRule.class, RETURNS_DEEP_STUBS);
         
when(dynamicDataSourceRule.getPrimaryDataSourceName("readwrite_ds")).thenReturn(WRITE_DATASOURCE);
         
when(dynamicDataSourceRule.getReplicaDataSourceNames("readwrite_ds")).thenReturn(Collections.emptyList());
-        dynamicRule = new ReadwriteSplittingRule(new 
ReadwriteSplittingRuleConfiguration(Collections.singleton(new 
ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME, null,
+        dynamicRule = new ReadwriteSplittingRule("logic_db", new 
ReadwriteSplittingRuleConfiguration(Collections.singleton(new 
ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME, null,
                 new 
DynamicReadwriteSplittingStrategyConfiguration("readwrite_ds", "true"), "")), 
Collections.emptyMap()),
                 Collections.singleton(dynamicDataSourceRule));
         dynamicSqlRouter = (ReadwriteSplittingSQLRouter) 
OrderedSPILoader.getServices(SQLRouter.class, 
Collections.singleton(dynamicRule)).get(dynamicRule);
diff --git 
a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
 
b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
index a4c8ae40a45..bb117c99443 100644
--- 
a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
+++ 
b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
@@ -56,7 +56,7 @@ public final class ReadwriteSplittingRuleTest {
     private ReadwriteSplittingRule createReadwriteSplittingRule() {
         ReadwriteSplittingDataSourceRuleConfiguration config =
                 new ReadwriteSplittingDataSourceRuleConfiguration("readwrite", 
new StaticReadwriteSplittingStrategyConfiguration("write_ds", 
Arrays.asList("read_ds_0", "read_ds_1")), null, "random");
-        return new ReadwriteSplittingRule(new 
ReadwriteSplittingRuleConfiguration(
+        return new ReadwriteSplittingRule("logic_db", new 
ReadwriteSplittingRuleConfiguration(
                 Collections.singleton(config), 
Collections.singletonMap("random", new AlgorithmConfiguration("RANDOM", new 
Properties()))), Collections.emptyList());
     }
     
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
index 11c8f301bc6..b0ae4078eef 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
@@ -25,6 +25,7 @@ import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -125,6 +126,7 @@ public final class ShardingSphereMetaData {
         globalRuleMetaData.findRules(ResourceHeldRule.class).forEach(each -> 
each.closeStaleResource(databaseName));
         
database.getRuleMetaData().findRules(ResourceHeldRule.class).forEach(each -> 
each.closeStaleResource(databaseName));
         
database.getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class).ifPresent(DynamicDataSourceContainedRule::closeAllHeartBeatJob);
+        
database.getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class).ifPresent(StaticDataSourceContainedRule::cleanStorageNodeDataSources);
         Optional.ofNullable(database.getResourceMetaData()).ifPresent(optional 
-> optional.getDataSources().values().forEach(each -> 
database.getResourceMetaData().close(each)));
     }
 }
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
index b12c8e6c044..728674bf615 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
@@ -31,4 +31,16 @@ public interface StaticDataSourceContainedRule extends 
ShardingSphereRule {
      * @param event data source status changed event
      */
     void updateStatus(DataSourceStatusChangedEvent event);
+    
+    /**
+     * Clean single storage node data source.
+     *
+     * @param groupName group name
+     */
+    void cleanStorageNodeDataSource(String groupName);
+    
+    /**
+     * Clean storage nodes data sources.
+     */
+    void cleanStorageNodeDataSources();
 }
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/event/StorageNodeDataSourceDeletedEvent.java
similarity index 62%
copy from 
infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
copy to 
mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/event/StorageNodeDataSourceDeletedEvent.java
index b12c8e6c044..010ed71d979 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/event/StorageNodeDataSourceDeletedEvent.java
@@ -15,20 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.rule.identifier.type;
+package org.apache.shardingsphere.mode.metadata.storage.event;
 
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
 
 /**
- * Static data source contained rule.
+ * Storage node data source deleted event.
  */
-public interface StaticDataSourceContainedRule extends ShardingSphereRule {
+@RequiredArgsConstructor
+@Getter
+public final class StorageNodeDataSourceDeletedEvent {
     
-    /**
-     * Update data source status.
-     *
-     * @param event data source status changed event
-     */
-    void updateStatus(DataSourceStatusChangedEvent event);
+    private final QualifiedDatabase qualifiedDatabase;
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java
index ef9c5003a59..4a451545dd6 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java
@@ -55,12 +55,12 @@ public final class StorageNode {
     }
     
     /**
-     * Get storage node status path.
+     * Get storage node data source path.
      *
      * @param database cluster database
      * @return status path of storage node
      */
-    public static String getStatusPath(final QualifiedDatabase database) {
+    public static String getStorageNodeDataSourcePath(final QualifiedDatabase 
database) {
         return String.join("/", getRootPath(), database.toString());
     }
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
index 6548aa98c48..7aa3904c788 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
@@ -28,6 +28,7 @@ import 
org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
 import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
 import 
org.apache.shardingsphere.mode.metadata.storage.event.DataSourceDisabledEvent;
 import 
org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
+import 
org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
 /**
@@ -50,7 +51,7 @@ public final class StorageNodeStatusSubscriber {
      */
     @Subscribe
     public void update(final DataSourceDisabledEvent event) {
-        repository.persist(StorageNode.getStatusPath(new 
QualifiedDatabase(event.getDatabaseName(), event.getGroupName(), 
event.getDataSourceName())),
+        repository.persist(StorageNode.getStorageNodeDataSourcePath(new 
QualifiedDatabase(event.getDatabaseName(), event.getGroupName(), 
event.getDataSourceName())),
                 YamlEngine.marshal(new 
YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(event.getStorageNodeDataSource())));
     }
     
@@ -61,7 +62,17 @@ public final class StorageNodeStatusSubscriber {
      */
     @Subscribe
     public void update(final PrimaryDataSourceChangedEvent event) {
-        
repository.persist(StorageNode.getStatusPath(event.getQualifiedDatabase()),
+        
repository.persist(StorageNode.getStorageNodeDataSourcePath(event.getQualifiedDatabase()),
                 YamlEngine.marshal(new 
YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(new 
StorageNodeDataSource(StorageNodeRole.PRIMARY, DataSourceState.ENABLED))));
     }
+    
+    /**
+     * Delete storage node data source.
+     *
+     * @param event storage node data source deleted event
+     */
+    @Subscribe
+    public void delete(final StorageNodeDataSourceDeletedEvent event) {
+        
repository.delete(StorageNode.getStorageNodeDataSourcePath(event.getQualifiedDatabase()));
+    }
 }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
index 9571e4daf0c..f721e3db0aa 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
 import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
 import 
org.apache.shardingsphere.mode.metadata.storage.event.DataSourceDisabledEvent;
 import 
org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
+import 
org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -51,7 +52,7 @@ public final class StorageNodeStatusSubscriberTest {
         StorageNodeDataSource storageNodeDataSource = new 
StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED);
         DataSourceDisabledEvent dataSourceDisabledEvent = new 
DataSourceDisabledEvent(databaseName, groupName, dataSourceName, 
storageNodeDataSource);
         new StorageNodeStatusSubscriber(repository, 
eventBusContext).update(dataSourceDisabledEvent);
-        verify(repository).persist(StorageNode.getStatusPath(new 
QualifiedDatabase(databaseName, groupName, dataSourceName)),
+        
verify(repository).persist(StorageNode.getStorageNodeDataSourcePath(new 
QualifiedDatabase(databaseName, groupName, dataSourceName)),
                 YamlEngine.marshal(new 
YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(new 
StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED))));
     }
     
@@ -63,7 +64,7 @@ public final class StorageNodeStatusSubscriberTest {
         StorageNodeDataSource storageNodeDataSource = new 
StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.ENABLED);
         DataSourceDisabledEvent dataSourceDisabledEvent = new 
DataSourceDisabledEvent(databaseName, groupName, dataSourceName, 
storageNodeDataSource);
         new StorageNodeStatusSubscriber(repository, 
eventBusContext).update(dataSourceDisabledEvent);
-        verify(repository).persist(StorageNode.getStatusPath(new 
QualifiedDatabase(databaseName, groupName, dataSourceName)),
+        
verify(repository).persist(StorageNode.getStorageNodeDataSourcePath(new 
QualifiedDatabase(databaseName, groupName, dataSourceName)),
                 YamlEngine.marshal(new 
YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(storageNodeDataSource)));
     }
     
@@ -74,7 +75,17 @@ public final class StorageNodeStatusSubscriberTest {
         String dataSourceName = "replica_ds_0";
         PrimaryDataSourceChangedEvent event = new 
PrimaryDataSourceChangedEvent(new QualifiedDatabase(databaseName, groupName, 
dataSourceName));
         new StorageNodeStatusSubscriber(repository, 
eventBusContext).update(event);
-        verify(repository).persist(StorageNode.getStatusPath(new 
QualifiedDatabase(databaseName, groupName, dataSourceName)),
+        
verify(repository).persist(StorageNode.getStorageNodeDataSourcePath(new 
QualifiedDatabase(databaseName, groupName, dataSourceName)),
                 YamlEngine.marshal(new 
YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(new 
StorageNodeDataSource(StorageNodeRole.PRIMARY, DataSourceState.ENABLED))));
     }
+    
+    @Test
+    public void assertDeleteStorageNodeDataSourceDataSourceState() {
+        String databaseName = "replica_query_db";
+        String groupName = "readwrite_ds";
+        String dataSourceName = "replica_ds_0";
+        StorageNodeDataSourceDeletedEvent event = new 
StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, 
groupName, dataSourceName));
+        new StorageNodeStatusSubscriber(repository, 
eventBusContext).delete(event);
+        verify(repository).delete(StorageNode.getStorageNodeDataSourcePath(new 
QualifiedDatabase(databaseName, groupName, dataSourceName)));
+    }
 }
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java
index 9977705be9e..0eedb5077e9 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java
@@ -169,7 +169,7 @@ public final class ImportDatabaseConfigurationUpdater 
implements RALUpdater<Impo
                 ReadwriteSplittingRuleConfiguration 
readwriteSplittingRuleConfig = new 
YamlReadwriteSplittingRuleConfigurationSwapper().swapToObject((YamlReadwriteSplittingRuleConfiguration)
 each);
                 
readwriteSplittingRuleConfigurationImportChecker.check(database, 
readwriteSplittingRuleConfig);
                 ruleConfigs.add(readwriteSplittingRuleConfig);
-                rules.add(new 
ReadwriteSplittingRule(readwriteSplittingRuleConfig, rules));
+                rules.add(new ReadwriteSplittingRule(databaseName, 
readwriteSplittingRuleConfig, rules));
             } else if (each instanceof YamlDatabaseDiscoveryRuleConfiguration) 
{
                 DatabaseDiscoveryRuleConfiguration databaseDiscoveryRuleConfig 
= new 
YamlDatabaseDiscoveryRuleConfigurationSwapper().swapToObject((YamlDatabaseDiscoveryRuleConfiguration)
 each);
                 
databaseDiscoveryRuleConfigurationImportChecker.check(database, 
databaseDiscoveryRuleConfig);
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
index df4b5b33f3e..b98fab4185b 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.rule;
 
+import 
org.apache.shardingsphere.dbdiscovery.distsql.handler.update.DropDatabaseDiscoveryRuleStatementUpdater;
+import 
org.apache.shardingsphere.dbdiscovery.distsql.parser.statement.DropDatabaseDiscoveryRuleStatement;
 import 
org.apache.shardingsphere.distsql.handler.update.RuleDefinitionAlterUpdater;
 import 
org.apache.shardingsphere.distsql.handler.update.RuleDefinitionCreateUpdater;
 import 
org.apache.shardingsphere.distsql.handler.update.RuleDefinitionDropUpdater;
@@ -24,6 +26,8 @@ import 
org.apache.shardingsphere.distsql.handler.update.RuleDefinitionUpdater;
 import 
org.apache.shardingsphere.distsql.parser.statement.rdl.RuleDefinitionStatement;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
 import 
org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -31,6 +35,8 @@ import 
org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.RDLBackendHan
 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import 
org.apache.shardingsphere.readwritesplitting.distsql.handler.update.DropReadwriteSplittingRuleStatementUpdater;
+import 
org.apache.shardingsphere.readwritesplitting.distsql.parser.statement.DropReadwriteSplittingRuleStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.util.Collection;
@@ -85,7 +91,7 @@ public final class RuleDefinitionBackendHandler<T extends 
RuleDefinitionStatemen
             result.remove(currentRuleConfig);
             result.add(processAlter(sqlStatement, (RuleDefinitionAlterUpdater) 
updater, currentRuleConfig));
         } else if (updater instanceof RuleDefinitionDropUpdater) {
-            processDrop(result, sqlStatement, (RuleDefinitionDropUpdater) 
updater, currentRuleConfig);
+            processDrop(database, result, sqlStatement, 
(RuleDefinitionDropUpdater) updater, currentRuleConfig);
         } else {
             throw new UnsupportedSQLOperationException(String.format("Cannot 
support RDL updater type `%s`", updater.getClass().getName()));
         }
@@ -110,13 +116,22 @@ public final class RuleDefinitionBackendHandler<T extends 
RuleDefinitionStatemen
     }
     
     @SuppressWarnings({"rawtypes", "unchecked"})
-    private void processDrop(final Collection<RuleConfiguration> configs, 
final T sqlStatement, final RuleDefinitionDropUpdater updater, final 
RuleConfiguration currentRuleConfig) {
+    private void processDrop(final ShardingSphereDatabase database, final 
Collection<RuleConfiguration> configs, final T sqlStatement,
+                             final RuleDefinitionDropUpdater updater, final 
RuleConfiguration currentRuleConfig) {
         if (!updater.hasAnyOneToBeDropped(sqlStatement, currentRuleConfig)) {
             return;
         }
         if (updater.updateCurrentRuleConfiguration(sqlStatement, 
currentRuleConfig)) {
             configs.remove(currentRuleConfig);
         }
+        if (updater instanceof DropDatabaseDiscoveryRuleStatementUpdater) {
+            
database.getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class)
+                    .ifPresent(optional -> 
((DropDatabaseDiscoveryRuleStatement) 
sqlStatement).getNames().forEach(optional::closeSingleHeartBeatJob));
+        }
+        if (updater instanceof DropReadwriteSplittingRuleStatementUpdater) {
+            
database.getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class)
+                    .ifPresent(optional -> 
((DropReadwriteSplittingRuleStatement) 
sqlStatement).getNames().forEach(optional::cleanStorageNodeDataSource));
+        }
     }
     
     private boolean getRefreshStatus(final SQLStatement sqlStatement, final 
RuleConfiguration currentRuleConfig, final RuleDefinitionUpdater<?, ?> updater) 
{


Reply via email to