This is an automated email from the ASF dual-hosted git repository. zhangliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new d39b987c308 Split status contain rule and support multiple groups of high availability (#19005) d39b987c308 is described below commit d39b987c308d44989238ec3e50a083c83a41363b Author: zhaojinchao <zhaojinc...@apache.org> AuthorDate: Sun Jul 10 23:41:50 2022 +0800 Split status contain rule and support multiple groups of high availability (#19005) * Split readwrite and discovery update status event and support multiple groups of high availability * Fix unit test * Use Entry instead of Map.Entry --- .../rule/DatabaseDiscoveryDataSourceRule.java | 18 +++++++++++++ .../dbdiscovery/rule/DatabaseDiscoveryRule.java | 23 ++++++++-------- .../rule/DatabaseDiscoveryRuleTest.java | 17 ++++++------ .../rule/ReadwriteSplittingRule.java | 15 ++++++----- .../rule/ReadwriteSplittingRuleTest.java | 8 +++--- ...obRule.java => DynamicStatusContainedRule.java} | 13 ++++++--- ...nedRule.java => StaticStatusContainedRule.java} | 4 +-- .../ClusterContextManagerCoordinator.java | 31 ++++++++++++++-------- .../ClusterContextManagerCoordinatorTest.java | 10 +++---- 9 files changed, 87 insertions(+), 52 deletions(-) diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryDataSourceRule.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryDataSourceRule.java index 162dec1ac8b..49b7e8bb82c 100644 --- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryDataSourceRule.java +++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryDataSourceRule.java @@ -23,12 +23,14 @@ import lombok.Getter; import org.apache.shardingsphere.dbdiscovery.api.config.rule.DatabaseDiscoveryDataSourceRuleConfiguration; import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProviderAlgorithm; +import javax.sql.DataSource; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.stream.Collectors; @@ -100,6 +102,22 @@ public final class DatabaseDiscoveryDataSourceRule { this.primaryDataSourceName = primaryDataSourceName; } + /** + * Get data source. + * + * @param dataSourceMap data source map + * @return data source + */ + public Map<String, DataSource> getDataSourceGroup(final Map<String, DataSource> dataSourceMap) { + Map<String, DataSource> result = new HashMap<>(dataSourceMap.size(), 1); + for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) { + if (dataSourceNames.contains(entry.getKey())) { + result.put(entry.getKey(), entry.getValue()); + } + } + return result; + } + /** * Get data source mapper. * diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java index 52b29a6350f..7eae85e961d 100644 --- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java +++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java @@ -37,8 +37,7 @@ import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabas import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent; import org.apache.shardingsphere.infra.rule.identifier.scope.DatabaseRule; import org.apache.shardingsphere.infra.rule.identifier.type.DataSourceContainedRule; -import org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule; -import org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule; +import org.apache.shardingsphere.infra.rule.identifier.type.DynamicStatusContainedRule; import org.apache.shardingsphere.infra.rule.identifier.type.exportable.ExportableRule; import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus; import org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent; @@ -60,7 +59,7 @@ import java.util.Properties; /** * Database discovery rule. */ -public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceContainedRule, RestartHeartBeatJobRule, StatusContainedRule, ExportableRule { +public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceContainedRule, DynamicStatusContainedRule, ExportableRule { @Getter private final RuleConfiguration configuration; @@ -117,8 +116,8 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) { String groupName = entry.getKey(); DatabaseDiscoveryDataSourceRule dataSourceRule = entry.getValue(); + Map<String, DataSource> originalDataSourceMap = dataSourceRule.getDataSourceGroup(dataSourceMap); DatabaseDiscoveryEngine engine = new DatabaseDiscoveryEngine(dataSourceRule.getDatabaseDiscoveryProviderAlgorithm()); - Map<String, DataSource> originalDataSourceMap = new HashMap<>(dataSourceMap); engine.checkEnvironment(databaseName, originalDataSourceMap); dataSourceRule.changePrimaryDataSourceName(engine.changePrimaryDataSource( databaseName, groupName, entry.getValue().getPrimaryDataSourceName(), originalDataSourceMap, dataSourceRule.getDisabledDataSourceNames())); @@ -154,7 +153,7 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont } @Override - public void restart(final DataSourceStatusChangedEvent event, final InstanceContext instanceContext) { + public void restartHeartBeatJob(final DataSourceStatusChangedEvent event, final InstanceContext instanceContext) { PrimaryDataSourceChangedEvent dataSourceEvent = (PrimaryDataSourceChangedEvent) event; QualifiedDatabase qualifiedDatabase = dataSourceEvent.getQualifiedDatabase(); DatabaseDiscoveryDataSourceRule dataSourceRule = dataSourceRules.get(qualifiedDatabase.getGroupName()); @@ -174,7 +173,7 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) { DatabaseDiscoveryDataSourceRule rule = entry.getValue(); String jobName = rule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + "-" + rule.getGroupName(); - CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), dataSourceMap, + CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap), rule.getDatabaseDiscoveryProviderAlgorithm(), rule.getDisabledDataSourceNames()).execute(null), rule.getHeartbeatProps().getProperty("keep-alive-cron")); modeScheduleContext.get().startCronJob(job); } @@ -184,12 +183,12 @@ public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceCont @Override public void updateStatus(final DataSourceStatusChangedEvent event) { StorageNodeDataSourceChangedEvent dataSourceChangedEvent = (StorageNodeDataSourceChangedEvent) event; - for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) { - if (StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus())) { - entry.getValue().disableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName()); - } else { - entry.getValue().enableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName()); - } + DatabaseDiscoveryDataSourceRule dataSourceRule = dataSourceRules.get(dataSourceChangedEvent.getQualifiedDatabase().getGroupName()); + Preconditions.checkState(null != dataSourceRule, "Can 't find database discovery data source rule in database `%s`.", databaseName); + if (StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus())) { + dataSourceRule.disableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName()); + } else { + dataSourceRule.enableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName()); } } diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java index deb7eb92d18..8761503c605 100644 --- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java +++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java @@ -44,11 +44,11 @@ import static org.mockito.Mockito.when; public final class DatabaseDiscoveryRuleTest { - private final Map<String, DataSource> dataSourceMap = Collections.singletonMap("primary", new MockedDataSource()); + private final Map<String, DataSource> dataSourceMap = Collections.singletonMap("primary_ds", new MockedDataSource()); @Test public void assertFindDataSourceRule() { - Optional<DatabaseDiscoveryDataSourceRule> actual = createRule().findDataSourceRule("test_pr"); + Optional<DatabaseDiscoveryDataSourceRule> actual = createRule().findDataSourceRule("replica_ds"); assertTrue(actual.isPresent()); assertDataSourceRule(actual.get()); } @@ -59,8 +59,8 @@ public final class DatabaseDiscoveryRuleTest { } private void assertDataSourceRule(final DatabaseDiscoveryDataSourceRule actual) { - assertThat(actual.getGroupName(), is("test_pr")); - assertThat(actual.getDataSourceNames(), is(Arrays.asList("ds_0", "ds_1"))); + assertThat(actual.getGroupName(), is("replica_ds")); + assertThat(actual.getDataSourceNames(), is(Arrays.asList("primary_ds", "replica_ds_0", "replica_ds_1"))); } @Test @@ -72,19 +72,20 @@ public final class DatabaseDiscoveryRuleTest { } private Map<String, Collection<String>> getDataSourceMapper() { - Map<String, Collection<String>> result = new HashMap<>(2, 1); - result.put("test_pr", Collections.singletonList("ds_1")); + Map<String, Collection<String>> result = new HashMap<>(1, 1); + result.put("replica_ds", Collections.singletonList("replica_ds_1")); return result; } @Test public void assertGetExportedMethods() { DatabaseDiscoveryRule databaseDiscoveryRule = createRule(); - assertThat(databaseDiscoveryRule.getExportData().get(ExportableConstants.EXPORT_DB_DISCOVERY_PRIMARY_DATA_SOURCES), is(Collections.singletonMap("test_pr", "primary"))); + assertThat(databaseDiscoveryRule.getExportData().get(ExportableConstants.EXPORT_DB_DISCOVERY_PRIMARY_DATA_SOURCES), is(Collections.singletonMap("replica_ds", "primary_ds"))); } private DatabaseDiscoveryRule createRule() { - DatabaseDiscoveryDataSourceRuleConfiguration config = new DatabaseDiscoveryDataSourceRuleConfiguration("test_pr", Arrays.asList("ds_0", "ds_1"), "", "CORE.FIXTURE"); + DatabaseDiscoveryDataSourceRuleConfiguration config = + new DatabaseDiscoveryDataSourceRuleConfiguration("replica_ds", Arrays.asList("primary_ds", "replica_ds_0", "replica_ds_1"), "", "CORE.FIXTURE"); InstanceContext instanceContext = mock(InstanceContext.class, RETURNS_DEEP_STUBS); when(instanceContext.getInstance().getCurrentInstanceId()).thenReturn("foo_id"); return new DatabaseDiscoveryRule("db_discovery", dataSourceMap, new DatabaseDiscoveryRuleConfiguration( diff --git a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java index d6bc225026d..1562788b6e7 100644 --- a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java +++ b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java @@ -23,10 +23,11 @@ import lombok.Getter; import org.apache.shardingsphere.infra.config.RuleConfiguration; import org.apache.shardingsphere.infra.distsql.constant.ExportableConstants; import org.apache.shardingsphere.infra.distsql.constant.ExportableItemConstants; +import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase; import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent; import org.apache.shardingsphere.infra.rule.identifier.scope.DatabaseRule; import org.apache.shardingsphere.infra.rule.identifier.type.DataSourceContainedRule; -import org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule; +import org.apache.shardingsphere.infra.rule.identifier.type.StaticStatusContainedRule; import org.apache.shardingsphere.infra.rule.identifier.type.StorageConnectorReusableRule; import org.apache.shardingsphere.infra.rule.identifier.type.exportable.ExportableRule; import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus; @@ -49,7 +50,7 @@ import java.util.Optional; /** * Readwrite-splitting rule. */ -public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceContainedRule, StatusContainedRule, ExportableRule, StorageConnectorReusableRule { +public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceContainedRule, StaticStatusContainedRule, ExportableRule, StorageConnectorReusableRule { @Getter private final RuleConfiguration configuration; @@ -116,11 +117,11 @@ public final class ReadwriteSplittingRule implements DatabaseRule, DataSourceCon @Override public void updateStatus(final DataSourceStatusChangedEvent event) { - for (Entry<String, ReadwriteSplittingDataSourceRule> entry : dataSourceRules.entrySet()) { - StorageNodeDataSourceChangedEvent dataSourceChangedEvent = (StorageNodeDataSourceChangedEvent) event; - entry.getValue().updateDisabledDataSourceNames(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName(), - StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus())); - } + StorageNodeDataSourceChangedEvent dataSourceEvent = (StorageNodeDataSourceChangedEvent) event; + QualifiedDatabase qualifiedDatabase = dataSourceEvent.getQualifiedDatabase(); + ReadwriteSplittingDataSourceRule dataSourceRule = dataSourceRules.get(qualifiedDatabase.getGroupName()); + Preconditions.checkState(null != dataSourceRule, "Can 't find readwrite-splitting data source rule in database `%s`.", qualifiedDatabase.getDatabaseName()); + dataSourceRule.updateDisabledDataSourceNames(dataSourceEvent.getQualifiedDatabase().getDataSourceName(), StorageNodeStatus.isDisable(dataSourceEvent.getDataSource().getStatus())); } @Override diff --git a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java index ce4f729fc4d..429e99486cd 100644 --- a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java +++ b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java @@ -48,7 +48,7 @@ public final class ReadwriteSplittingRuleTest { @Test public void assertFindDataSourceRule() { - Optional<ReadwriteSplittingDataSourceRule> actual = createReadwriteSplittingRule().findDataSourceRule("test_pr"); + Optional<ReadwriteSplittingDataSourceRule> actual = createReadwriteSplittingRule().findDataSourceRule("readwrite"); assertTrue(actual.isPresent()); assertDataSourceRule(actual.get()); } @@ -60,13 +60,13 @@ public final class ReadwriteSplittingRuleTest { private ReadwriteSplittingRule createReadwriteSplittingRule() { ReadwriteSplittingDataSourceRuleConfiguration config = - new ReadwriteSplittingDataSourceRuleConfiguration("test_pr", new StaticReadwriteSplittingStrategyConfiguration("write_ds", Arrays.asList("read_ds_0", "read_ds_1")), null, "random"); + new ReadwriteSplittingDataSourceRuleConfiguration("readwrite", new StaticReadwriteSplittingStrategyConfiguration("write_ds", Arrays.asList("read_ds_0", "read_ds_1")), null, "random"); return new ReadwriteSplittingRule(new ReadwriteSplittingRuleConfiguration( Collections.singleton(config), Collections.singletonMap("random", new ShardingSphereAlgorithmConfiguration("RANDOM", new Properties())))); } private void assertDataSourceRule(final ReadwriteSplittingDataSourceRule actual) { - assertThat(actual.getName(), is("test_pr")); + assertThat(actual.getName(), is("readwrite")); assertThat(actual.getReadwriteSplittingStrategy().getWriteDataSource(), is("write_ds")); assertThat(actual.getReadwriteSplittingStrategy().getReadDataSources(), is(Arrays.asList("read_ds_0", "read_ds_1"))); assertThat(actual.getLoadBalancer().getType(), is("RANDOM")); @@ -103,7 +103,7 @@ public final class ReadwriteSplittingRuleTest { public void assertGetDataSourceMapper() { ReadwriteSplittingRule readwriteSplittingRule = createReadwriteSplittingRule(); Map<String, Collection<String>> actual = readwriteSplittingRule.getDataSourceMapper(); - Map<String, Collection<String>> expected = Collections.singletonMap("test_pr", Arrays.asList("write_ds", "read_ds_0", "read_ds_1")); + Map<String, Collection<String>> expected = Collections.singletonMap("readwrite", Arrays.asList("write_ds", "read_ds_0", "read_ds_1")); assertThat(actual, is(expected)); } } diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicStatusContainedRule.java similarity index 76% rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicStatusContainedRule.java index eb7eba3bc2d..6b9dac74120 100644 --- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java +++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicStatusContainedRule.java @@ -22,14 +22,21 @@ import org.apache.shardingsphere.infra.rule.ShardingSphereRule; import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent; /** - * Restart heart beat job rule. + * Dynamic status contained rule. */ -public interface RestartHeartBeatJobRule extends ShardingSphereRule { +public interface DynamicStatusContainedRule extends ShardingSphereRule { + + /** + * Update data source status. + * + * @param event data source status changed event + */ + void updateStatus(DataSourceStatusChangedEvent event); /** * Restart heart beat job. * @param event data source status changed event * @param instanceContext instance context */ - void restart(DataSourceStatusChangedEvent event, InstanceContext instanceContext); + void restartHeartBeatJob(DataSourceStatusChangedEvent event, InstanceContext instanceContext); } diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StatusContainedRule.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticStatusContainedRule.java similarity index 91% rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StatusContainedRule.java rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticStatusContainedRule.java index de538148cf1..37a73dabb70 100644 --- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StatusContainedRule.java +++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticStatusContainedRule.java @@ -21,9 +21,9 @@ import org.apache.shardingsphere.infra.rule.ShardingSphereRule; import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent; /** - * Status contained rule. + * Static Status contained rule. */ -public interface StatusContainedRule extends ShardingSphereRule { +public interface StaticStatusContainedRule extends ShardingSphereRule { /** * Update data source status. diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java index 61c8793c53e..c5b0dc73913 100644 --- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java +++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java @@ -25,8 +25,9 @@ import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYaml import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext; import org.apache.shardingsphere.infra.instance.ComputeNodeInstance; import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase; -import org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule; -import org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule; +import org.apache.shardingsphere.infra.rule.ShardingSphereRule; +import org.apache.shardingsphere.infra.rule.identifier.type.StaticStatusContainedRule; +import org.apache.shardingsphere.infra.rule.identifier.type.DynamicStatusContainedRule; import org.apache.shardingsphere.infra.yaml.engine.YamlEngine; import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent; @@ -61,6 +62,7 @@ import java.sql.SQLException; import java.util.Collection; import java.util.LinkedList; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; /** @@ -167,9 +169,16 @@ public final class ClusterContextManagerCoordinator { @Subscribe public synchronized void renew(final StorageNodeChangedEvent event) { QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase(); - contextManager.getMetaDataContexts().getMetaData().getDatabases().get(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules() - .stream().filter(each -> each instanceof StatusContainedRule) - .forEach(each -> ((StatusContainedRule) each).updateStatus(new StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource()))); + Optional<ShardingSphereRule> dynamicStatusContainedRule = contextManager.getMetaDataContexts().getMetaData().getDatabases().get(qualifiedDatabase.getDatabaseName()).getRuleMetaData() + .getRules().stream().filter(each -> each instanceof DynamicStatusContainedRule).findFirst(); + if (dynamicStatusContainedRule.isPresent()) { + ((DynamicStatusContainedRule) dynamicStatusContainedRule.get()).updateStatus(new StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())); + return; + } + Optional<ShardingSphereRule> staticStatusContainedRule = contextManager.getMetaDataContexts().getMetaData().getDatabases().get(qualifiedDatabase.getDatabaseName()).getRuleMetaData() + .getRules().stream().filter(each -> each instanceof StaticStatusContainedRule).findFirst(); + staticStatusContainedRule.ifPresent(shardingSphereRule -> ((StaticStatusContainedRule) shardingSphereRule) + .updateStatus(new StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource()))); } /** @@ -182,9 +191,9 @@ public final class ClusterContextManagerCoordinator { QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase(); contextManager.getMetaDataContexts().getMetaData().getDatabases().get(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules() .stream() - .filter(each -> each instanceof RestartHeartBeatJobRule) - .forEach(each -> ((RestartHeartBeatJobRule) each) - .restart(new PrimaryDataSourceChangedEvent(qualifiedDatabase), contextManager.getInstanceContext())); + .filter(each -> each instanceof DynamicStatusContainedRule) + .forEach(each -> ((DynamicStatusContainedRule) each) + .restartHeartBeatJob(new PrimaryDataSourceChangedEvent(qualifiedDatabase), contextManager.getInstanceContext())); } /** @@ -295,13 +304,13 @@ public final class ClusterContextManagerCoordinator { private void disableDataSources() { contextManager.getMetaDataContexts().getMetaData().getDatabases().forEach((key, value) -> value.getRuleMetaData().getRules().forEach(each -> { - if (each instanceof StatusContainedRule) { - disableDataSources((StatusContainedRule) each); + if (each instanceof StaticStatusContainedRule) { + disableDataSources((StaticStatusContainedRule) each); } })); } - private void disableDataSources(final StatusContainedRule rule) { + private void disableDataSources(final StaticStatusContainedRule rule) { Map<String, StorageNodeDataSource> storageNodes = registryCenter.getStorageNodeStatusService().loadStorageNodes(); Map<String, StorageNodeDataSource> disableDataSources = storageNodes.entrySet().stream().filter(entry -> StorageNodeStatus.isDisable(entry.getValue().getStatus())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java index 9e0b2cc0701..8314b6f4d69 100644 --- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java +++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java @@ -43,9 +43,9 @@ import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model. import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable; import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser; import org.apache.shardingsphere.infra.rule.ShardingSphereRule; +import org.apache.shardingsphere.infra.rule.identifier.type.StaticStatusContainedRule; import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule; -import org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule; -import org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule; +import org.apache.shardingsphere.infra.rule.identifier.type.DynamicStatusContainedRule; import org.apache.shardingsphere.infra.state.StateType; import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter; @@ -223,7 +223,7 @@ public final class ClusterContextManagerCoordinatorTest { @Test public void assertRenewForDisableStateChanged() { - StatusContainedRule statusContainedRule = mock(StatusContainedRule.class); + StaticStatusContainedRule statusContainedRule = mock(StaticStatusContainedRule.class); when(database.getRuleMetaData().getRules()).thenReturn(Collections.singletonList(statusContainedRule)); StorageNodeChangedEvent event = new StorageNodeChangedEvent(new QualifiedDatabase("db.readwrite_ds.ds_0"), new StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED)); coordinator.renew(event); @@ -275,7 +275,7 @@ public final class ClusterContextManagerCoordinatorTest { @Test public void assertRenewPrimaryDataSourceName() { Collection<ShardingSphereRule> rules = new LinkedList<>(); - RestartHeartBeatJobRule mockRestartHeartBeatJobRule = mock(RestartHeartBeatJobRule.class); + DynamicStatusContainedRule mockRestartHeartBeatJobRule = mock(DynamicStatusContainedRule.class); rules.add(mockRestartHeartBeatJobRule); ShardingSphereRuleMetaData ruleMetaData = new ShardingSphereRuleMetaData(rules); ShardingSphereDatabase database = mock(ShardingSphereDatabase.class); @@ -283,7 +283,7 @@ public final class ClusterContextManagerCoordinatorTest { contextManager.getMetaDataContexts().getMetaData().getDatabases().put("db", database); PrimaryStateChangedEvent mockPrimaryStateChangedEvent = new PrimaryStateChangedEvent(new QualifiedDatabase("db.readwrite_ds.test_ds")); coordinator.renew(mockPrimaryStateChangedEvent); - verify(mockRestartHeartBeatJobRule).restart(any(), any()); + verify(mockRestartHeartBeatJobRule).restartHeartBeatJob(any(), any()); } @Test