This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d39b987c308 Split status contain rule and support multiple groups of high availability (#19005)
d39b987c308 is described below

commit d39b987c308d44989238ec3e50a083c83a41363b
Author: zhaojinchao <zhaojinc...@apache.org>
AuthorDate: Sun Jul 10 23:41:50 2022 +0800

    Split status contain rule and support multiple groups of high availability (#19005)
    
    * Split readwrite and discovery update status event and support multiple groups of high availability
    
    * Fix unit test
    
    * Use Entry instead of Map.Entry
---
 .../rule/DatabaseDiscoveryDataSourceRule.java      | 18 +++++++++++++
 .../dbdiscovery/rule/DatabaseDiscoveryRule.java    | 23 ++++++++--------
 .../rule/DatabaseDiscoveryRuleTest.java            | 17 ++++++------
 .../rule/ReadwriteSplittingRule.java               | 15 ++++++-----
 .../rule/ReadwriteSplittingRuleTest.java           |  8 +++---
 ...obRule.java => DynamicStatusContainedRule.java} | 13 ++++++---
 ...nedRule.java => StaticStatusContainedRule.java} |  4 +--
 .../ClusterContextManagerCoordinator.java          | 31 ++++++++++++++--------
 .../ClusterContextManagerCoordinatorTest.java      | 10 +++----
 9 files changed, 87 insertions(+), 52 deletions(-)

diff --git 
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryDataSourceRule.java
 
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryDataSourceRule.java
index 162dec1ac8b..49b7e8bb82c 100644
--- 
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryDataSourceRule.java
+++ 
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryDataSourceRule.java
@@ -23,12 +23,14 @@ import lombok.Getter;
 import 
org.apache.shardingsphere.dbdiscovery.api.config.rule.DatabaseDiscoveryDataSourceRuleConfiguration;
 import 
org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProviderAlgorithm;
 
+import javax.sql.DataSource;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
@@ -100,6 +102,22 @@ public final class DatabaseDiscoveryDataSourceRule {
         this.primaryDataSourceName = primaryDataSourceName;
     }
     
+    /**
+     *  Get data source.
+     *
+     * @param dataSourceMap data source map
+     * @return data source
+     */
+    public Map<String, DataSource> getDataSourceGroup(final Map<String, 
DataSource> dataSourceMap) {
+        Map<String, DataSource> result = new HashMap<>(dataSourceMap.size(), 
1);
+        for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
+            if (dataSourceNames.contains(entry.getKey())) {
+                result.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return result;
+    }
+    
     /**
      * Get data source mapper.
      *
diff --git 
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
 
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
index 52b29a6350f..7eae85e961d 100644
--- 
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
+++ 
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
@@ -37,8 +37,7 @@ import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabas
 import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
 import org.apache.shardingsphere.infra.rule.identifier.scope.DatabaseRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.DataSourceContainedRule;
-import 
org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule;
-import 
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.DynamicStatusContainedRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.exportable.ExportableRule;
 import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
 import 
org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
@@ -60,7 +59,7 @@ import java.util.Properties;
 /**
  * Database discovery rule.
  */
-public final class DatabaseDiscoveryRule implements DatabaseRule, 
DataSourceContainedRule, RestartHeartBeatJobRule, StatusContainedRule, 
ExportableRule {
+public final class DatabaseDiscoveryRule implements DatabaseRule, 
DataSourceContainedRule, DynamicStatusContainedRule, ExportableRule {
     
     @Getter
     private final RuleConfiguration configuration;
@@ -117,8 +116,8 @@ public final class DatabaseDiscoveryRule implements 
DatabaseRule, DataSourceCont
         for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : 
dataSourceRules.entrySet()) {
             String groupName = entry.getKey();
             DatabaseDiscoveryDataSourceRule dataSourceRule = entry.getValue();
+            Map<String, DataSource> originalDataSourceMap = 
dataSourceRule.getDataSourceGroup(dataSourceMap);
             DatabaseDiscoveryEngine engine = new 
DatabaseDiscoveryEngine(dataSourceRule.getDatabaseDiscoveryProviderAlgorithm());
-            Map<String, DataSource> originalDataSourceMap = new 
HashMap<>(dataSourceMap);
             engine.checkEnvironment(databaseName, originalDataSourceMap);
             
dataSourceRule.changePrimaryDataSourceName(engine.changePrimaryDataSource(
                     databaseName, groupName, 
entry.getValue().getPrimaryDataSourceName(), originalDataSourceMap, 
dataSourceRule.getDisabledDataSourceNames()));
@@ -154,7 +153,7 @@ public final class DatabaseDiscoveryRule implements 
DatabaseRule, DataSourceCont
     }
     
     @Override
-    public void restart(final DataSourceStatusChangedEvent event, final 
InstanceContext instanceContext) {
+    public void restartHeartBeatJob(final DataSourceStatusChangedEvent event, 
final InstanceContext instanceContext) {
         PrimaryDataSourceChangedEvent dataSourceEvent = 
(PrimaryDataSourceChangedEvent) event;
         QualifiedDatabase qualifiedDatabase = 
dataSourceEvent.getQualifiedDatabase();
         DatabaseDiscoveryDataSourceRule dataSourceRule = 
dataSourceRules.get(qualifiedDatabase.getGroupName());
@@ -174,7 +173,7 @@ public final class DatabaseDiscoveryRule implements 
DatabaseRule, DataSourceCont
             for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : 
dataSourceRules.entrySet()) {
                 DatabaseDiscoveryDataSourceRule rule = entry.getValue();
                 String jobName = 
rule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + 
"-" + rule.getGroupName();
-                CronJob job = new CronJob(jobName, each -> new 
HeartbeatJob(databaseName, rule.getGroupName(), 
rule.getPrimaryDataSourceName(), dataSourceMap,
+                CronJob job = new CronJob(jobName, each -> new 
HeartbeatJob(databaseName, rule.getGroupName(), 
rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap),
                         rule.getDatabaseDiscoveryProviderAlgorithm(), 
rule.getDisabledDataSourceNames()).execute(null), 
rule.getHeartbeatProps().getProperty("keep-alive-cron"));
                 modeScheduleContext.get().startCronJob(job);
             }
@@ -184,12 +183,12 @@ public final class DatabaseDiscoveryRule implements 
DatabaseRule, DataSourceCont
     @Override
     public void updateStatus(final DataSourceStatusChangedEvent event) {
         StorageNodeDataSourceChangedEvent dataSourceChangedEvent = 
(StorageNodeDataSourceChangedEvent) event;
-        for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : 
dataSourceRules.entrySet()) {
-            if 
(StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus()))
 {
-                
entry.getValue().disableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
-            } else {
-                
entry.getValue().enableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
-            }
+        DatabaseDiscoveryDataSourceRule dataSourceRule = 
dataSourceRules.get(dataSourceChangedEvent.getQualifiedDatabase().getGroupName());
+        Preconditions.checkState(null != dataSourceRule, "Can 't find database 
discovery data source rule in database `%s`.", databaseName);
+        if 
(StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus()))
 {
+            
dataSourceRule.disableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
+        } else {
+            
dataSourceRule.enableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
         }
     }
     
diff --git 
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java
 
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java
index deb7eb92d18..8761503c605 100644
--- 
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java
+++ 
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java
@@ -44,11 +44,11 @@ import static org.mockito.Mockito.when;
 
 public final class DatabaseDiscoveryRuleTest {
     
-    private final Map<String, DataSource> dataSourceMap = 
Collections.singletonMap("primary", new MockedDataSource());
+    private final Map<String, DataSource> dataSourceMap = 
Collections.singletonMap("primary_ds", new MockedDataSource());
     
     @Test
     public void assertFindDataSourceRule() {
-        Optional<DatabaseDiscoveryDataSourceRule> actual = 
createRule().findDataSourceRule("test_pr");
+        Optional<DatabaseDiscoveryDataSourceRule> actual = 
createRule().findDataSourceRule("replica_ds");
         assertTrue(actual.isPresent());
         assertDataSourceRule(actual.get());
     }
@@ -59,8 +59,8 @@ public final class DatabaseDiscoveryRuleTest {
     }
     
     private void assertDataSourceRule(final DatabaseDiscoveryDataSourceRule 
actual) {
-        assertThat(actual.getGroupName(), is("test_pr"));
-        assertThat(actual.getDataSourceNames(), is(Arrays.asList("ds_0", 
"ds_1")));
+        assertThat(actual.getGroupName(), is("replica_ds"));
+        assertThat(actual.getDataSourceNames(), is(Arrays.asList("primary_ds", 
"replica_ds_0", "replica_ds_1")));
     }
     
     @Test
@@ -72,19 +72,20 @@ public final class DatabaseDiscoveryRuleTest {
     }
     
     private Map<String, Collection<String>> getDataSourceMapper() {
-        Map<String, Collection<String>> result = new HashMap<>(2, 1);
-        result.put("test_pr", Collections.singletonList("ds_1"));
+        Map<String, Collection<String>> result = new HashMap<>(1, 1);
+        result.put("replica_ds", Collections.singletonList("replica_ds_1"));
         return result;
     }
     
     @Test
     public void assertGetExportedMethods() {
         DatabaseDiscoveryRule databaseDiscoveryRule = createRule();
-        
assertThat(databaseDiscoveryRule.getExportData().get(ExportableConstants.EXPORT_DB_DISCOVERY_PRIMARY_DATA_SOURCES),
 is(Collections.singletonMap("test_pr", "primary")));
+        
assertThat(databaseDiscoveryRule.getExportData().get(ExportableConstants.EXPORT_DB_DISCOVERY_PRIMARY_DATA_SOURCES),
 is(Collections.singletonMap("replica_ds", "primary_ds")));
     }
     
     private DatabaseDiscoveryRule createRule() {
-        DatabaseDiscoveryDataSourceRuleConfiguration config = new 
DatabaseDiscoveryDataSourceRuleConfiguration("test_pr", Arrays.asList("ds_0", 
"ds_1"), "", "CORE.FIXTURE");
+        DatabaseDiscoveryDataSourceRuleConfiguration config =
+                new DatabaseDiscoveryDataSourceRuleConfiguration("replica_ds", 
Arrays.asList("primary_ds", "replica_ds_0", "replica_ds_1"), "", 
"CORE.FIXTURE");
         InstanceContext instanceContext = mock(InstanceContext.class, 
RETURNS_DEEP_STUBS);
         
when(instanceContext.getInstance().getCurrentInstanceId()).thenReturn("foo_id");
         return new DatabaseDiscoveryRule("db_discovery", dataSourceMap, new 
DatabaseDiscoveryRuleConfiguration(
diff --git 
a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
 
b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
index d6bc225026d..1562788b6e7 100644
--- 
a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
+++ 
b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
@@ -23,10 +23,11 @@ import lombok.Getter;
 import org.apache.shardingsphere.infra.config.RuleConfiguration;
 import org.apache.shardingsphere.infra.distsql.constant.ExportableConstants;
 import 
org.apache.shardingsphere.infra.distsql.constant.ExportableItemConstants;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
 import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
 import org.apache.shardingsphere.infra.rule.identifier.scope.DatabaseRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.DataSourceContainedRule;
-import 
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.StaticStatusContainedRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.StorageConnectorReusableRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.exportable.ExportableRule;
 import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
@@ -49,7 +50,7 @@ import java.util.Optional;
 /**
  * Readwrite-splitting rule.
  */
-public final class ReadwriteSplittingRule implements DatabaseRule, 
DataSourceContainedRule, StatusContainedRule, ExportableRule, 
StorageConnectorReusableRule {
+public final class ReadwriteSplittingRule implements DatabaseRule, 
DataSourceContainedRule, StaticStatusContainedRule, ExportableRule, 
StorageConnectorReusableRule {
     
     @Getter
     private final RuleConfiguration configuration;
@@ -116,11 +117,11 @@ public final class ReadwriteSplittingRule implements 
DatabaseRule, DataSourceCon
     
     @Override
     public void updateStatus(final DataSourceStatusChangedEvent event) {
-        for (Entry<String, ReadwriteSplittingDataSourceRule> entry : 
dataSourceRules.entrySet()) {
-            StorageNodeDataSourceChangedEvent dataSourceChangedEvent = 
(StorageNodeDataSourceChangedEvent) event;
-            
entry.getValue().updateDisabledDataSourceNames(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName(),
-                    
StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus()));
-        }
+        StorageNodeDataSourceChangedEvent dataSourceEvent = 
(StorageNodeDataSourceChangedEvent) event;
+        QualifiedDatabase qualifiedDatabase = 
dataSourceEvent.getQualifiedDatabase();
+        ReadwriteSplittingDataSourceRule dataSourceRule = 
dataSourceRules.get(qualifiedDatabase.getGroupName());
+        Preconditions.checkState(null != dataSourceRule, "Can 't find 
readwrite-splitting data source rule in database `%s`.", 
qualifiedDatabase.getDatabaseName());
+        
dataSourceRule.updateDisabledDataSourceNames(dataSourceEvent.getQualifiedDatabase().getDataSourceName(),
 StorageNodeStatus.isDisable(dataSourceEvent.getDataSource().getStatus()));
     }
     
     @Override
diff --git 
a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
 
b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
index ce4f729fc4d..429e99486cd 100644
--- 
a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
+++ 
b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
@@ -48,7 +48,7 @@ public final class ReadwriteSplittingRuleTest {
     
     @Test
     public void assertFindDataSourceRule() {
-        Optional<ReadwriteSplittingDataSourceRule> actual = 
createReadwriteSplittingRule().findDataSourceRule("test_pr");
+        Optional<ReadwriteSplittingDataSourceRule> actual = 
createReadwriteSplittingRule().findDataSourceRule("readwrite");
         assertTrue(actual.isPresent());
         assertDataSourceRule(actual.get());
     }
@@ -60,13 +60,13 @@ public final class ReadwriteSplittingRuleTest {
     
     private ReadwriteSplittingRule createReadwriteSplittingRule() {
         ReadwriteSplittingDataSourceRuleConfiguration config =
-                new ReadwriteSplittingDataSourceRuleConfiguration("test_pr", 
new StaticReadwriteSplittingStrategyConfiguration("write_ds", 
Arrays.asList("read_ds_0", "read_ds_1")), null, "random");
+                new ReadwriteSplittingDataSourceRuleConfiguration("readwrite", 
new StaticReadwriteSplittingStrategyConfiguration("write_ds", 
Arrays.asList("read_ds_0", "read_ds_1")), null, "random");
         return new ReadwriteSplittingRule(new 
ReadwriteSplittingRuleConfiguration(
                 Collections.singleton(config), 
Collections.singletonMap("random", new 
ShardingSphereAlgorithmConfiguration("RANDOM", new Properties()))));
     }
     
     private void assertDataSourceRule(final ReadwriteSplittingDataSourceRule 
actual) {
-        assertThat(actual.getName(), is("test_pr"));
+        assertThat(actual.getName(), is("readwrite"));
         
assertThat(actual.getReadwriteSplittingStrategy().getWriteDataSource(), 
is("write_ds"));
         
assertThat(actual.getReadwriteSplittingStrategy().getReadDataSources(), 
is(Arrays.asList("read_ds_0", "read_ds_1")));
         assertThat(actual.getLoadBalancer().getType(), is("RANDOM"));
@@ -103,7 +103,7 @@ public final class ReadwriteSplittingRuleTest {
     public void assertGetDataSourceMapper() {
         ReadwriteSplittingRule readwriteSplittingRule = 
createReadwriteSplittingRule();
         Map<String, Collection<String>> actual = 
readwriteSplittingRule.getDataSourceMapper();
-        Map<String, Collection<String>> expected = 
Collections.singletonMap("test_pr", Arrays.asList("write_ds", "read_ds_0", 
"read_ds_1"));
+        Map<String, Collection<String>> expected = 
Collections.singletonMap("readwrite", Arrays.asList("write_ds", "read_ds_0", 
"read_ds_1"));
         assertThat(actual, is(expected));
     }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicStatusContainedRule.java
similarity index 76%
rename from 
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java
rename to 
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicStatusContainedRule.java
index eb7eba3bc2d..6b9dac74120 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicStatusContainedRule.java
@@ -22,14 +22,21 @@ import 
org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
 
 /**
- * Restart heart beat job rule.
+ * Dynamic status contained rule.
  */
-public interface RestartHeartBeatJobRule extends ShardingSphereRule {
+public interface DynamicStatusContainedRule extends ShardingSphereRule {
+    
+    /**
+     * Update data source status.
+     *
+     * @param event data source status changed event
+     */
+    void updateStatus(DataSourceStatusChangedEvent event);
     
     /**
      * Restart heart beat job.
      * @param event data source status changed event
      * @param instanceContext instance context
      */
-    void restart(DataSourceStatusChangedEvent event, InstanceContext 
instanceContext);
+    void restartHeartBeatJob(DataSourceStatusChangedEvent event, 
InstanceContext instanceContext);
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StatusContainedRule.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticStatusContainedRule.java
similarity index 91%
rename from 
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StatusContainedRule.java
rename to 
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticStatusContainedRule.java
index de538148cf1..37a73dabb70 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StatusContainedRule.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticStatusContainedRule.java
@@ -21,9 +21,9 @@ import 
org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
 
 /**
- * Status contained rule.
+ * Static Status contained rule.
  */
-public interface StatusContainedRule extends ShardingSphereRule {
+public interface StaticStatusContainedRule extends ShardingSphereRule {
     
     /**
      * Update data source status.
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 61c8793c53e..c5b0dc73913 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -25,8 +25,9 @@ import 
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYaml
 import 
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
-import 
org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule;
-import 
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.StaticStatusContainedRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.DynamicStatusContainedRule;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
@@ -61,6 +62,7 @@ import java.sql.SQLException;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
@@ -167,9 +169,16 @@ public final class ClusterContextManagerCoordinator {
     @Subscribe
     public synchronized void renew(final StorageNodeChangedEvent event) {
         QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
-        
contextManager.getMetaDataContexts().getMetaData().getDatabases().get(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
-                .stream().filter(each -> each instanceof StatusContainedRule)
-                .forEach(each -> ((StatusContainedRule) each).updateStatus(new 
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
+        Optional<ShardingSphereRule> dynamicStatusContainedRule = 
contextManager.getMetaDataContexts().getMetaData().getDatabases().get(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
+                .getRules().stream().filter(each -> each instanceof 
DynamicStatusContainedRule).findFirst();
+        if (dynamicStatusContainedRule.isPresent()) {
+            ((DynamicStatusContainedRule) 
dynamicStatusContainedRule.get()).updateStatus(new 
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource()));
+            return;
+        }
+        Optional<ShardingSphereRule> staticStatusContainedRule = 
contextManager.getMetaDataContexts().getMetaData().getDatabases().get(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
+                .getRules().stream().filter(each -> each instanceof 
StaticStatusContainedRule).findFirst();
+        staticStatusContainedRule.ifPresent(shardingSphereRule -> 
((StaticStatusContainedRule) shardingSphereRule)
+                .updateStatus(new 
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
     }
     
     /**
@@ -182,9 +191,9 @@ public final class ClusterContextManagerCoordinator {
         QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
         
contextManager.getMetaDataContexts().getMetaData().getDatabases().get(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
                 .stream()
-                .filter(each -> each instanceof RestartHeartBeatJobRule)
-                .forEach(each -> ((RestartHeartBeatJobRule) each)
-                        .restart(new 
PrimaryDataSourceChangedEvent(qualifiedDatabase), 
contextManager.getInstanceContext()));
+                .filter(each -> each instanceof DynamicStatusContainedRule)
+                .forEach(each -> ((DynamicStatusContainedRule) each)
+                        .restartHeartBeatJob(new 
PrimaryDataSourceChangedEvent(qualifiedDatabase), 
contextManager.getInstanceContext()));
     }
     
     /**
@@ -295,13 +304,13 @@ public final class ClusterContextManagerCoordinator {
     
     private void disableDataSources() {
         
contextManager.getMetaDataContexts().getMetaData().getDatabases().forEach((key, 
value) -> value.getRuleMetaData().getRules().forEach(each -> {
-            if (each instanceof StatusContainedRule) {
-                disableDataSources((StatusContainedRule) each);
+            if (each instanceof StaticStatusContainedRule) {
+                disableDataSources((StaticStatusContainedRule) each);
             }
         }));
     }
     
-    private void disableDataSources(final StatusContainedRule rule) {
+    private void disableDataSources(final StaticStatusContainedRule rule) {
         Map<String, StorageNodeDataSource> storageNodes = 
registryCenter.getStorageNodeStatusService().loadStorageNodes();
         Map<String, StorageNodeDataSource> disableDataSources = 
storageNodes.entrySet().stream().filter(entry -> 
StorageNodeStatus.isDisable(entry.getValue().getStatus()))
                 .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
index 9e0b2cc0701..8314b6f4d69 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
@@ -43,9 +43,9 @@ import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.
 import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.StaticStatusContainedRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
-import 
org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule;
-import 
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.DynamicStatusContainedRule;
 import org.apache.shardingsphere.infra.state.StateType;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
@@ -223,7 +223,7 @@ public final class ClusterContextManagerCoordinatorTest {
     
     @Test
     public void assertRenewForDisableStateChanged() {
-        StatusContainedRule statusContainedRule = 
mock(StatusContainedRule.class);
+        StaticStatusContainedRule statusContainedRule = 
mock(StaticStatusContainedRule.class);
         
when(database.getRuleMetaData().getRules()).thenReturn(Collections.singletonList(statusContainedRule));
         StorageNodeChangedEvent event = new StorageNodeChangedEvent(new 
QualifiedDatabase("db.readwrite_ds.ds_0"), new 
StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED));
         coordinator.renew(event);
@@ -275,7 +275,7 @@ public final class ClusterContextManagerCoordinatorTest {
     @Test
     public void assertRenewPrimaryDataSourceName() {
         Collection<ShardingSphereRule> rules = new LinkedList<>();
-        RestartHeartBeatJobRule mockRestartHeartBeatJobRule = 
mock(RestartHeartBeatJobRule.class);
+        DynamicStatusContainedRule mockRestartHeartBeatJobRule = 
mock(DynamicStatusContainedRule.class);
         rules.add(mockRestartHeartBeatJobRule);
         ShardingSphereRuleMetaData ruleMetaData = new 
ShardingSphereRuleMetaData(rules);
         ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
@@ -283,7 +283,7 @@ public final class ClusterContextManagerCoordinatorTest {
         
contextManager.getMetaDataContexts().getMetaData().getDatabases().put("db", 
database);
         PrimaryStateChangedEvent mockPrimaryStateChangedEvent = new 
PrimaryStateChangedEvent(new QualifiedDatabase("db.readwrite_ds.test_ds"));
         coordinator.renew(mockPrimaryStateChangedEvent);
-        verify(mockRestartHeartBeatJobRule).restart(any(), any());
+        verify(mockRestartHeartBeatJobRule).restartHeartBeatJob(any(), any());
     }
     
     @Test

Reply via email to