This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6e5c7187300 Refactor drop database stop heartbeat job and remove storage node data sources. (#24275)
6e5c7187300 is described below
commit 6e5c71873007979eb0ea6eac86c56c1b0d267622
Author: zhaojinchao <[email protected]>
AuthorDate: Tue Feb 21 17:46:04 2023 +0800
Refactor drop database stop heartbeat job and remove storage node data sources. (#24275)
* Refactor drop database stop heartbeat job and remove storage node data sources.
* Fix checkstyle
* Fix checkstyle
* Fix checkstyle
* FIX checkstyle
---
.../dbdiscovery/rule/DatabaseDiscoveryRule.java | 31 ++++++++++++++--------
.../DropDatabaseDiscoveryRuleStatementUpdater.java | 6 -----
.../rule/ReadwriteSplittingRule.java | 26 +++++++++++++++++-
.../builder/ReadwriteSplittingRuleBuilder.java | 2 +-
.../route/ReadwriteSplittingSQLRouterTest.java | 4 +--
.../rule/ReadwriteSplittingRuleTest.java | 2 +-
.../infra/metadata/ShardingSphereMetaData.java | 2 ++
.../type/StaticDataSourceContainedRule.java | 12 +++++++++
.../event/StorageNodeDataSourceDeletedEvent.java | 20 +++++++-------
.../registry/status/storage/node/StorageNode.java | 4 +--
.../subscriber/StorageNodeStatusSubscriber.java | 15 +++++++++--
.../StorageNodeStatusSubscriberTest.java | 17 +++++++++---
.../ImportDatabaseConfigurationUpdater.java | 2 +-
.../rdl/rule/RuleDefinitionBackendHandler.java | 19 +++++++++++--
14 files changed, 119 insertions(+), 43 deletions(-)
diff --git a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
index 252481d87d8..737392d0063 100644
--- a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
+++ b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
@@ -44,6 +44,7 @@ import org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
import org.apache.shardingsphere.schedule.core.ScheduleContextFactory;
import javax.sql.DataSource;
@@ -119,6 +120,17 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
}
}
+ private void initHeartBeatJobs() {
+ for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
+ DatabaseDiscoveryDataSourceRule rule = entry.getValue();
+ String jobName = rule.getProvider().getType() + "-" + databaseName + "-" + rule.getGroupName();
+ CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap),
+ rule.getProvider(), rule.getDisabledDataSourceNames(), instanceContext).execute(null),
+ rule.getHeartbeatProps().getProperty("keep-alive-cron"));
+ scheduleContext.startSchedule(job);
+ }
+ }
+
/**
* Get single data source rule.
*
@@ -162,24 +174,21 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont
DatabaseDiscoveryDataSourceRule dataSourceRule = dataSourceRules.get(groupName);
ShardingSpherePreconditions.checkNotNull(dataSourceRule, () -> new DBDiscoveryDataSourceRuleNotFoundException(databaseName));
scheduleContext.closeSchedule(dataSourceRule.getProvider().getType() + "-" + databaseName + "-" + dataSourceRule.getGroupName());
+ deleteStorageNodeDataSources(dataSourceRule);
}
- @Override
- public void closeAllHeartBeatJob() {
- for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
- DatabaseDiscoveryDataSourceRule rule = entry.getValue();
- scheduleContext.closeSchedule(rule.getProvider().getType() + "-" + databaseName + "-" + rule.getGroupName());
+ private void deleteStorageNodeDataSources(final DatabaseDiscoveryDataSourceRule dataSourceRule) {
+ for (String each : dataSourceRule.getDataSourceNames()) {
+ instanceContext.getEventBusContext().post(new StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, dataSourceRule.getGroupName(), each)));
}
}
- private void initHeartBeatJobs() {
+ @Override
+ public void closeAllHeartBeatJob() {
for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {
DatabaseDiscoveryDataSourceRule rule = entry.getValue();
- String jobName = rule.getProvider().getType() + "-" + databaseName + "-" + rule.getGroupName();
- CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap),
- rule.getProvider(), rule.getDisabledDataSourceNames(), instanceContext).execute(null),
- rule.getHeartbeatProps().getProperty("keep-alive-cron"));
- scheduleContext.startSchedule(job);
+ scheduleContext.closeSchedule(rule.getProvider().getType() + "-" + databaseName + "-" + rule.getGroupName());
+ deleteStorageNodeDataSources(rule);
}
}
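
The heartbeat teardown above now fans each data source of a dropped group out as a StorageNodeDataSourceDeletedEvent on the instance's event bus, where the cluster subscriber (added later in this diff) picks it up. A minimal sketch of that publish/subscribe flow, using Guava's EventBus directly and simplified stand-in classes rather than the project's real types:

    import com.google.common.eventbus.EventBus;
    import com.google.common.eventbus.Subscribe;

    // Simplified stand-ins; the real event and subscriber are the classes added later in this diff.
    final class DeletedEventSketch {

        private final String qualifiedDataSource;

        DeletedEventSketch(final String qualifiedDataSource) {
            this.qualifiedDataSource = qualifiedDataSource;
        }

        String getQualifiedDataSource() {
            return qualifiedDataSource;
        }
    }

    final class SubscriberSketch {

        SubscriberSketch(final EventBus eventBus) {
            eventBus.register(this);
        }

        @Subscribe
        public void delete(final DeletedEventSketch event) {
            // The real subscriber deletes the storage node path from the cluster repository here.
            System.out.println("delete registry entry for " + event.getQualifiedDataSource());
        }
    }

    public final class EventFlowSketch {

        public static void main(final String[] args) {
            EventBus eventBus = new EventBus();
            new SubscriberSketch(eventBus);
            // Mirrors what closeAllHeartBeatJob() does per data source of a dropped group.
            eventBus.post(new DeletedEventSketch("replica_query_db.readwrite_ds.replica_ds_0"));
        }
    }
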
diff --git a/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java b/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java
index 8708c833893..feb538fd2c4 100644
--- a/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java
+++ b/features/db-discovery/distsql/handler/src/main/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/update/DropDatabaseDiscoveryRuleStatementUpdater.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.dbdiscovery.distsql.handler.update;
import org.apache.shardingsphere.dbdiscovery.api.config.DatabaseDiscoveryRuleConfiguration;
import org.apache.shardingsphere.dbdiscovery.api.config.rule.DatabaseDiscoveryDataSourceRuleConfiguration;
import org.apache.shardingsphere.dbdiscovery.distsql.parser.statement.DropDatabaseDiscoveryRuleStatement;
-import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.exportable.constant.ExportableConstants;
import org.apache.shardingsphere.infra.rule.identifier.type.exportable.constant.ExportableItemConstants;
import org.apache.shardingsphere.distsql.handler.exception.rule.MissingRequiredRuleException;
@@ -51,7 +50,6 @@ public final class DropDatabaseDiscoveryRuleStatementUpdater implements RuleDefi
String databaseName = database.getName();
checkCurrentRuleConfiguration(databaseName, sqlStatement, currentRuleConfig);
checkIsInUse(databaseName, sqlStatement, database);
- closeHeartbeatJob(database, sqlStatement);
}
private void checkCurrentRuleConfiguration(final String databaseName, final DropDatabaseDiscoveryRuleStatement sqlStatement, final DatabaseDiscoveryRuleConfiguration currentRuleConfig) {
@@ -81,10 +79,6 @@ public final class DropDatabaseDiscoveryRuleStatementUpdater implements RuleDefi
ShardingSpherePreconditions.checkState(invalid.isEmpty(), () -> new RuleInUsedException(RULE_TYPE, databaseName, invalid));
}
- private void closeHeartbeatJob(final ShardingSphereDatabase database, final DropDatabaseDiscoveryRuleStatement sqlStatement) {
- sqlStatement.getNames().forEach(each -> database.getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class).ifPresent(optional -> optional.closeSingleHeartBeatJob(each)));
- }
-
@Override
public boolean updateCurrentRuleConfiguration(final DropDatabaseDiscoveryRuleStatement sqlStatement, final DatabaseDiscoveryRuleConfiguration currentRuleConfig) {
sqlStatement.getNames().forEach(each -> dropRule(currentRuleConfig, each));
diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
index 101682eb36b..ebd843d992c 100644
--- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
@@ -36,6 +36,7 @@ import org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
import org.apache.shardingsphere.infra.util.expr.InlineExpressionParser;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
import org.apache.shardingsphere.readwritesplitting.api.ReadwriteSplittingRuleConfiguration;
import org.apache.shardingsphere.readwritesplitting.api.rule.ReadwriteSplittingDataSourceRuleConfiguration;
import org.apache.shardingsphere.readwritesplitting.api.strategy.DynamicReadwriteSplittingStrategyConfiguration;
@@ -59,6 +60,8 @@ import java.util.stream.Collectors;
*/
public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceContainedRule, StaticDataSourceContainedRule, ExportableRule, StorageConnectorReusableRule {
+ private final String databaseName;
+
@Getter
private final RuleConfiguration configuration;
@@ -66,7 +69,8 @@ public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceCon
private final Map<String, ReadwriteSplittingDataSourceRule> dataSourceRules;
- public ReadwriteSplittingRule(final ReadwriteSplittingRuleConfiguration ruleConfig, final Collection<ShardingSphereRule> builtRules) {
+ public ReadwriteSplittingRule(final String databaseName, final ReadwriteSplittingRuleConfiguration ruleConfig, final Collection<ShardingSphereRule> builtRules) {
+ this.databaseName = databaseName;
configuration = ruleConfig;
for (ReadwriteSplittingDataSourceRuleConfiguration dataSourceRuleConfiguration : ruleConfig.getDataSources()) {
if (ruleConfig.getLoadBalancers().containsKey(dataSourceRuleConfiguration.getLoadBalancerName())) {
@@ -176,6 +180,26 @@ public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceCon
dataSourceRule.updateDisabledDataSourceNames(dataSourceEvent.getQualifiedDatabase().getDataSourceName(), DataSourceState.DISABLED == dataSourceEvent.getDataSource().getStatus());
}
+ @Override
+ public void cleanStorageNodeDataSource(final String groupName) {
+ Preconditions.checkNotNull(dataSourceRules.get(groupName), String.format("`%s` group name not exist in database `%s`", groupName, databaseName));
+ deleteStorageNodeDataSources(dataSourceRules.get(groupName));
+ }
+
+ private void deleteStorageNodeDataSources(final ReadwriteSplittingDataSourceRule rule) {
+ if (rule.getReadwriteSplittingStrategy() instanceof DynamicReadwriteSplittingStrategy) {
+ return;
+ }
+ rule.getReadwriteSplittingStrategy().getReadDataSources().forEach(each -> new StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, rule.getName(), each)));
+ }
+
+ @Override
+ public void cleanStorageNodeDataSources() {
+ for (Entry<String, ReadwriteSplittingDataSourceRule> entry : dataSourceRules.entrySet()) {
+ deleteStorageNodeDataSources(entry.getValue());
+ }
+ }
+
@Override
public Map<String, Object> getExportData() {
Map<String, Object> result = new HashMap<>(2, 1);
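
The new cleanStorageNodeDataSource above guards the group lookup with Guava's Preconditions.checkNotNull and a formatted message. A small illustration of that guard in isolation (the map contents and database name here are placeholders, not project state):

    import com.google.common.base.Preconditions;

    import java.util.Collections;
    import java.util.Map;

    public final class GroupGuardSketch {

        public static void main(final String[] args) {
            Map<String, Object> dataSourceRules = Collections.singletonMap("readwrite_ds", new Object());
            String groupName = "missing_group";
            // Throws NullPointerException with the formatted message when the group is unknown,
            // which is the failure mode the new cleanStorageNodeDataSource() relies on.
            Preconditions.checkNotNull(dataSourceRules.get(groupName),
                    String.format("`%s` group name not exist in database `%s`", groupName, "logic_db"));
        }
    }
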
diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java
index f0d3afbcbf6..1fe9cf64e35 100644
--- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/builder/ReadwriteSplittingRuleBuilder.java
@@ -36,7 +36,7 @@ public final class ReadwriteSplittingRuleBuilder implements DatabaseRuleBuilder<
@Override
public ReadwriteSplittingRule build(final ReadwriteSplittingRuleConfiguration config, final String databaseName, final Map<String, DataSource> dataSources, final Collection<ShardingSphereRule> builtRules, final InstanceContext instanceContext) {
- return new ReadwriteSplittingRule(config, builtRules);
+ return new ReadwriteSplittingRule(databaseName, config, builtRules);
}
@Override
diff --git a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java
index 0ad3206292d..26a56b8dfbd 100644
--- a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java
+++ b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/route/ReadwriteSplittingSQLRouterTest.java
@@ -86,14 +86,14 @@ public final class ReadwriteSplittingSQLRouterTest {
@Before
public void setUp() {
- staticRule = new ReadwriteSplittingRule(new ReadwriteSplittingRuleConfiguration(Collections.singleton(new ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME,
+ staticRule = new ReadwriteSplittingRule("logic_db", new ReadwriteSplittingRuleConfiguration(Collections.singleton(new ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME,
new StaticReadwriteSplittingStrategyConfiguration(WRITE_DATASOURCE, Collections.singletonList(READ_DATASOURCE)), null, "")), Collections.emptyMap()), Collections.emptyList());
sqlRouter = (ReadwriteSplittingSQLRouter) OrderedSPILoader.getServices(SQLRouter.class, Collections.singleton(staticRule)).get(staticRule);
DynamicDataSourceContainedRule dynamicDataSourceRule = mock(DynamicDataSourceContainedRule.class, RETURNS_DEEP_STUBS);
when(dynamicDataSourceRule.getPrimaryDataSourceName("readwrite_ds")).thenReturn(WRITE_DATASOURCE);
when(dynamicDataSourceRule.getReplicaDataSourceNames("readwrite_ds")).thenReturn(Collections.emptyList());
- dynamicRule = new ReadwriteSplittingRule(new ReadwriteSplittingRuleConfiguration(Collections.singleton(new ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME, null,
+ dynamicRule = new ReadwriteSplittingRule("logic_db", new ReadwriteSplittingRuleConfiguration(Collections.singleton(new ReadwriteSplittingDataSourceRuleConfiguration(DATASOURCE_NAME, null,
new DynamicReadwriteSplittingStrategyConfiguration("readwrite_ds", "true"), "")), Collections.emptyMap()), Collections.singleton(dynamicDataSourceRule));
dynamicSqlRouter = (ReadwriteSplittingSQLRouter) OrderedSPILoader.getServices(SQLRouter.class, Collections.singleton(dynamicRule)).get(dynamicRule);
diff --git a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
index a4c8ae40a45..bb117c99443 100644
--- a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
+++ b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
@@ -56,7 +56,7 @@ public final class ReadwriteSplittingRuleTest {
private ReadwriteSplittingRule createReadwriteSplittingRule() {
ReadwriteSplittingDataSourceRuleConfiguration config =
new ReadwriteSplittingDataSourceRuleConfiguration("readwrite", new StaticReadwriteSplittingStrategyConfiguration("write_ds", Arrays.asList("read_ds_0", "read_ds_1")), null, "random");
- return new ReadwriteSplittingRule(new ReadwriteSplittingRuleConfiguration(
+ return new ReadwriteSplittingRule("logic_db", new ReadwriteSplittingRuleConfiguration(
Collections.singleton(config), Collections.singletonMap("random", new AlgorithmConfiguration("RANDOM", new Properties()))), Collections.emptyList());
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
index 11c8f301bc6..b0ae4078eef 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/ShardingSphereMetaData.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -125,6 +126,7 @@ public final class ShardingSphereMetaData {
globalRuleMetaData.findRules(ResourceHeldRule.class).forEach(each -> each.closeStaleResource(databaseName));
database.getRuleMetaData().findRules(ResourceHeldRule.class).forEach(each -> each.closeStaleResource(databaseName));
database.getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class).ifPresent(DynamicDataSourceContainedRule::closeAllHeartBeatJob);
+ database.getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class).ifPresent(StaticDataSourceContainedRule::cleanStorageNodeDataSources);
Optional.ofNullable(database.getResourceMetaData()).ifPresent(optional -> optional.getDataSources().values().forEach(each -> database.getResourceMetaData().close(each)));
}
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
index b12c8e6c044..728674bf615 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
@@ -31,4 +31,16 @@ public interface StaticDataSourceContainedRule extends ShardingSphereRule {
* @param event data source status changed event
*/
void updateStatus(DataSourceStatusChangedEvent event);
+
+ /**
+ * Clean single storage node data source.
+ *
+ * @param groupName group name
+ */
+ void cleanStorageNodeDataSource(String groupName);
+
+ /**
+ * Clean storage nodes data sources.
+ */
+ void cleanStorageNodeDataSources();
}
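
A rough sketch of what an implementor of the two new interface methods can look like, with stand-in types so the snippet stands alone (ReadwriteSplittingRule above is the real implementor and posts deletion events rather than mutating a map; updateStatus(...) is omitted for brevity):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Stand-in contract trimmed to the two new methods.
    interface StaticDataSourceCleanupSketch {

        void cleanStorageNodeDataSource(String groupName);

        void cleanStorageNodeDataSources();
    }

    final class InMemoryCleanupSketch implements StaticDataSourceCleanupSketch {

        private final Map<String, String> storageNodeEntries = new ConcurrentHashMap<>();

        void register(final String groupName, final String entry) {
            storageNodeEntries.put(groupName, entry);
        }

        @Override
        public void cleanStorageNodeDataSource(final String groupName) {
            // Per-group cleanup, as used when a single rule is dropped via DistSQL.
            storageNodeEntries.remove(groupName);
        }

        @Override
        public void cleanStorageNodeDataSources() {
            // Whole-database cleanup, as used when the database itself is dropped.
            storageNodeEntries.clear();
        }
    }
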
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/event/StorageNodeDataSourceDeletedEvent.java
similarity index 62%
copy from infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
copy to mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/event/StorageNodeDataSourceDeletedEvent.java
index b12c8e6c044..010ed71d979 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticDataSourceContainedRule.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/event/StorageNodeDataSourceDeletedEvent.java
@@ -15,20 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.rule.identifier.type;
+package org.apache.shardingsphere.mode.metadata.storage.event;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
/**
- * Static data source contained rule.
+ * Storage node data source deleted event.
*/
-public interface StaticDataSourceContainedRule extends ShardingSphereRule {
+@RequiredArgsConstructor
+@Getter
+public final class StorageNodeDataSourceDeletedEvent {
- /**
- * Update data source status.
- *
- * @param event data source status changed event
- */
- void updateStatus(DataSourceStatusChangedEvent event);
+ private final QualifiedDatabase qualifiedDatabase;
}
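
For readers who do not use Lombok: @RequiredArgsConstructor and @Getter on the single final field make the new event class roughly equivalent to this plain Java (a de-lomboked sketch, not the generated source):

    package org.apache.shardingsphere.mode.metadata.storage.event;

    import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;

    // De-lomboked sketch of the class added above: one required-args constructor and one getter.
    public final class StorageNodeDataSourceDeletedEvent {

        private final QualifiedDatabase qualifiedDatabase;

        public StorageNodeDataSourceDeletedEvent(final QualifiedDatabase qualifiedDatabase) {
            this.qualifiedDatabase = qualifiedDatabase;
        }

        public QualifiedDatabase getQualifiedDatabase() {
            return qualifiedDatabase;
        }
    }
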
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java
index ef9c5003a59..4a451545dd6 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/node/StorageNode.java
@@ -55,12 +55,12 @@ public final class StorageNode {
}
/**
- * Get storage node status path.
+ * Get storage node data source path.
*
* @param database cluster database
* @return status path of storage node
*/
- public static String getStatusPath(final QualifiedDatabase database) {
+ public static String getStorageNodeDataSourcePath(final QualifiedDatabase database) {
return String.join("/", getRootPath(), database.toString());
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
index 6548aa98c48..7aa3904c788 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
import org.apache.shardingsphere.mode.metadata.storage.event.DataSourceDisabledEvent;
import org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
/**
@@ -50,7 +51,7 @@ public final class StorageNodeStatusSubscriber {
*/
@Subscribe
public void update(final DataSourceDisabledEvent event) {
- repository.persist(StorageNode.getStatusPath(new QualifiedDatabase(event.getDatabaseName(), event.getGroupName(), event.getDataSourceName())),
+ repository.persist(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(event.getDatabaseName(), event.getGroupName(), event.getDataSourceName())),
YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(event.getStorageNodeDataSource())));
}
@@ -61,7 +62,17 @@ public final class StorageNodeStatusSubscriber {
*/
@Subscribe
public void update(final PrimaryDataSourceChangedEvent event) {
- repository.persist(StorageNode.getStatusPath(event.getQualifiedDatabase()),
+ repository.persist(StorageNode.getStorageNodeDataSourcePath(event.getQualifiedDatabase()),
YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(new StorageNodeDataSource(StorageNodeRole.PRIMARY, DataSourceState.ENABLED))));
}
+
+ /**
+ * Delete storage node data source.
+ *
+ * @param event storage node data source deleted event
+ */
+ @Subscribe
+ public void delete(final StorageNodeDataSourceDeletedEvent event) {
+ repository.delete(StorageNode.getStorageNodeDataSourcePath(event.getQualifiedDatabase()));
+ }
}
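
The new delete handler removes the storage node status entry that the two persist handlers write. A toy illustration with an in-memory map standing in for the cluster repository (the path below is illustrative only, not the exact registry layout; the real code talks to ZooKeeper or etcd through ClusterPersistRepository):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public final class StatusEntryLifecycleSketch {

        // Stand-in for ClusterPersistRepository: persist() writes a status node, delete() removes it.
        private static final Map<String, String> REGISTRY = new ConcurrentHashMap<>();

        public static void main(final String[] args) {
            String path = "/status/storage_nodes/replica_query_db.readwrite_ds.replica_ds_0";
            // update(DataSourceDisabledEvent) / update(PrimaryDataSourceChangedEvent) persist the status.
            REGISTRY.put(path, "role: MEMBER\nstatus: DISABLED\n");
            // delete(StorageNodeDataSourceDeletedEvent) now clears it when the rule or database is dropped.
            REGISTRY.remove(path);
            System.out.println(REGISTRY.containsKey(path));
        }
    }
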
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
index 9571e4daf0c..f721e3db0aa 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
import org.apache.shardingsphere.mode.metadata.storage.event.DataSourceDisabledEvent;
import org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceDeletedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -51,7 +52,7 @@ public final class StorageNodeStatusSubscriberTest {
StorageNodeDataSource storageNodeDataSource = new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED);
DataSourceDisabledEvent dataSourceDisabledEvent = new DataSourceDisabledEvent(databaseName, groupName, dataSourceName, storageNodeDataSource);
new StorageNodeStatusSubscriber(repository, eventBusContext).update(dataSourceDisabledEvent);
- verify(repository).persist(StorageNode.getStatusPath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
+ verify(repository).persist(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED))));
}
@@ -63,7 +64,7 @@ public final class StorageNodeStatusSubscriberTest {
StorageNodeDataSource storageNodeDataSource = new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.ENABLED);
DataSourceDisabledEvent dataSourceDisabledEvent = new DataSourceDisabledEvent(databaseName, groupName, dataSourceName, storageNodeDataSource);
new StorageNodeStatusSubscriber(repository, eventBusContext).update(dataSourceDisabledEvent);
- verify(repository).persist(StorageNode.getStatusPath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
+ verify(repository).persist(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(storageNodeDataSource)));
}
@@ -74,7 +75,17 @@ public final class StorageNodeStatusSubscriberTest {
String dataSourceName = "replica_ds_0";
PrimaryDataSourceChangedEvent event = new PrimaryDataSourceChangedEvent(new QualifiedDatabase(databaseName, groupName, dataSourceName));
new StorageNodeStatusSubscriber(repository, eventBusContext).update(event);
- verify(repository).persist(StorageNode.getStatusPath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
+ verify(repository).persist(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(databaseName, groupName, dataSourceName)),
YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(new StorageNodeDataSource(StorageNodeRole.PRIMARY, DataSourceState.ENABLED))));
}
+
+ @Test
+ public void assertDeleteStorageNodeDataSourceDataSourceState() {
+ String databaseName = "replica_query_db";
+ String groupName = "readwrite_ds";
+ String dataSourceName = "replica_ds_0";
+ StorageNodeDataSourceDeletedEvent event = new StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, groupName, dataSourceName));
+ new StorageNodeStatusSubscriber(repository, eventBusContext).delete(event);
+ verify(repository).delete(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(databaseName, groupName, dataSourceName)));
+ }
}
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java
index 9977705be9e..0eedb5077e9 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/ImportDatabaseConfigurationUpdater.java
@@ -169,7 +169,7 @@ public final class ImportDatabaseConfigurationUpdater implements RALUpdater<Impo
ReadwriteSplittingRuleConfiguration readwriteSplittingRuleConfig = new YamlReadwriteSplittingRuleConfigurationSwapper().swapToObject((YamlReadwriteSplittingRuleConfiguration) each);
readwriteSplittingRuleConfigurationImportChecker.check(database, readwriteSplittingRuleConfig);
ruleConfigs.add(readwriteSplittingRuleConfig);
- rules.add(new ReadwriteSplittingRule(readwriteSplittingRuleConfig, rules));
+ rules.add(new ReadwriteSplittingRule(databaseName, readwriteSplittingRuleConfig, rules));
} else if (each instanceof YamlDatabaseDiscoveryRuleConfiguration) {
DatabaseDiscoveryRuleConfiguration databaseDiscoveryRuleConfig = new YamlDatabaseDiscoveryRuleConfigurationSwapper().swapToObject((YamlDatabaseDiscoveryRuleConfiguration) each);
databaseDiscoveryRuleConfigurationImportChecker.check(database, databaseDiscoveryRuleConfig);
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
index df4b5b33f3e..b98fab4185b 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
@@ -17,6 +17,8 @@
package org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.rule;
+import org.apache.shardingsphere.dbdiscovery.distsql.handler.update.DropDatabaseDiscoveryRuleStatementUpdater;
+import org.apache.shardingsphere.dbdiscovery.distsql.parser.statement.DropDatabaseDiscoveryRuleStatement;
import org.apache.shardingsphere.distsql.handler.update.RuleDefinitionAlterUpdater;
import org.apache.shardingsphere.distsql.handler.update.RuleDefinitionCreateUpdater;
import org.apache.shardingsphere.distsql.handler.update.RuleDefinitionDropUpdater;
@@ -24,6 +26,8 @@ import org.apache.shardingsphere.distsql.handler.update.RuleDefinitionUpdater;
import org.apache.shardingsphere.distsql.parser.statement.rdl.RuleDefinitionStatement;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -31,6 +35,8 @@ import org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.RDLBackendHan
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.readwritesplitting.distsql.handler.update.DropReadwriteSplittingRuleStatementUpdater;
+import org.apache.shardingsphere.readwritesplitting.distsql.parser.statement.DropReadwriteSplittingRuleStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.util.Collection;
@@ -85,7 +91,7 @@ public final class RuleDefinitionBackendHandler<T extends RuleDefinitionStatemen
result.remove(currentRuleConfig);
result.add(processAlter(sqlStatement, (RuleDefinitionAlterUpdater) updater, currentRuleConfig));
} else if (updater instanceof RuleDefinitionDropUpdater) {
- processDrop(result, sqlStatement, (RuleDefinitionDropUpdater) updater, currentRuleConfig);
+ processDrop(database, result, sqlStatement, (RuleDefinitionDropUpdater) updater, currentRuleConfig);
} else {
throw new UnsupportedSQLOperationException(String.format("Cannot support RDL updater type `%s`", updater.getClass().getName()));
}
@@ -110,13 +116,22 @@ public final class RuleDefinitionBackendHandler<T extends RuleDefinitionStatemen
}
@SuppressWarnings({"rawtypes", "unchecked"})
- private void processDrop(final Collection<RuleConfiguration> configs, final T sqlStatement, final RuleDefinitionDropUpdater updater, final RuleConfiguration currentRuleConfig) {
+ private void processDrop(final ShardingSphereDatabase database, final Collection<RuleConfiguration> configs, final T sqlStatement,
+ final RuleDefinitionDropUpdater updater, final RuleConfiguration currentRuleConfig) {
if (!updater.hasAnyOneToBeDropped(sqlStatement, currentRuleConfig)) {
return;
}
if (updater.updateCurrentRuleConfiguration(sqlStatement, currentRuleConfig)) {
configs.remove(currentRuleConfig);
}
+ if (updater instanceof DropDatabaseDiscoveryRuleStatementUpdater) {
+ database.getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class)
+ .ifPresent(optional -> ((DropDatabaseDiscoveryRuleStatement) sqlStatement).getNames().forEach(optional::closeSingleHeartBeatJob));
+ }
+ if (updater instanceof DropReadwriteSplittingRuleStatementUpdater) {
+ database.getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class)
+ .ifPresent(optional -> ((DropReadwriteSplittingRuleStatement) sqlStatement).getNames().forEach(optional::cleanStorageNodeDataSource));
+ }
+ }
}
private boolean getRefreshStatus(final SQLStatement sqlStatement, final RuleConfiguration currentRuleConfig, final RuleDefinitionUpdater<?, ?> updater) {
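
The new branches in processDrop look up the matching rule from the database's rule metadata and fan the dropped names out to the cleanup hooks. A compact sketch of that Optional-plus-method-reference pattern, with a stand-in interface in place of the real StaticDataSourceContainedRule:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Optional;

    public final class DropDispatchSketch {

        // Stand-in for the per-group cleanup hook on StaticDataSourceContainedRule.
        interface CleanupHook {

            void cleanStorageNodeDataSource(String groupName);
        }

        public static void main(final String[] args) {
            Collection<String> droppedGroupNames = Arrays.asList("readwrite_ds_0", "readwrite_ds_1");
            Optional<CleanupHook> rule = Optional.of(groupName -> System.out.println("clean " + groupName));
            // Mirrors: findSingleRule(StaticDataSourceContainedRule.class)
            //     .ifPresent(optional -> sqlStatement.getNames().forEach(optional::cleanStorageNodeDataSource));
            rule.ifPresent(optional -> droppedGroupNames.forEach(optional::cleanStorageNodeDataSource));
        }
    }
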