This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8fde3fb #7318, add data source disable and enable state (#8921)
8fde3fb is described below
commit 8fde3fb95783576198cc62f73a160cb281746178
Author: Zhang Yonglun <[email protected]>
AuthorDate: Wed Jan 6 22:17:17 2021 +0800
#7318, add data source disable and enable state (#8921)
* #7318, add data source disable and enable state
* #7318, add data source disable and enable state
---
.../org/apache/shardingsphere/ha/spi/HAType.java | 20 +++-
.../shardingsphere/ha/rule/HADataSourceRule.java | 5 +-
.../org/apache/shardingsphere/ha/rule/HARule.java | 14 ++-
.../ha/fixture/TestHATypeFixture.java | 9 +-
.../apache/shardingsphere/ha/mgr/MGRHAType.java | 111 +++++++++++++++++++--
.../shardingsphere/ha/mgr/MGRPeriodicalJob.java | 16 ++-
.../ha/route/fixture/TestRouteHATypeFixture.java | 10 +-
.../governance/core/registry/RegistryCenter.java | 13 +++
.../rule/event/impl/DataSourceDisabledEvent.java | 31 ++----
9 files changed, 182 insertions(+), 47 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
index a5fdca0..c5f6c61 100644
--- a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
+++ b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-api/src/main/java/org/apache/shardingsphere/ha/spi/HAType.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.infra.spi.typed.TypedSPI;
import javax.sql.DataSource;
import java.sql.SQLException;
+import java.util.Collection;
import java.util.Map;
/**
@@ -40,18 +41,29 @@ public interface HAType extends TypedSPI {
/**
* Update primary data source.
*
- * @param dataSourceMap data source map
+ * @param originalDataSourceMap original data source map
* @param schemaName schema name
+ * @param disabledDataSourceNames disabled data source names
*/
- void updatePrimaryDataSource(Map<String, DataSource> dataSourceMap, String schemaName);
+ void updatePrimaryDataSource(Map<String, DataSource> originalDataSourceMap, String schemaName, Collection<String> disabledDataSourceNames);
+
+ /**
+ * Update member state.
+ *
+ * @param originalDataSourceMap original data source map
+ * @param schemaName schema name
+ * @param disabledDataSourceNames disabled data source names
+ */
+ void updateMemberState(Map<String, DataSource> originalDataSourceMap, String schemaName, Collection<String> disabledDataSourceNames);
/**
* Start periodical update.
*
- * @param dataSourceMap data source map
+ * @param originalDataSourceMap original data source map
* @param schemaName schema name
+ * @param disabledDataSourceNames disabled data source names
*/
- void startPeriodicalUpdate(Map<String, DataSource> dataSourceMap, String schemaName);
+ void startPeriodicalUpdate(Map<String, DataSource> originalDataSourceMap, String schemaName, Collection<String> disabledDataSourceNames);
/**
* Stop periodical update.
diff --git
a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
index 5075a56..2796324 100644
---
a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
+++
b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HADataSourceRule.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.ha.rule;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import lombok.AccessLevel;
import lombok.Getter;
import
org.apache.shardingsphere.ha.api.config.rule.HADataSourceRuleConfiguration;
import org.apache.shardingsphere.ha.spi.ReplicaLoadBalanceAlgorithm;
@@ -46,7 +45,6 @@ public final class HADataSourceRule {
private final boolean replicaQuery;
- @Getter(AccessLevel.NONE)
private final Collection<String> disabledDataSourceNames = new HashSet<>();
public HADataSourceRule(final HADataSourceRuleConfiguration config, final
ReplicaLoadBalanceAlgorithm loadBalancer) {
@@ -92,8 +90,7 @@ public final class HADataSourceRule {
*/
public Map<String, Collection<String>> getDataSourceMapper() {
Map<String, Collection<String>> result = new HashMap<>(1, 1);
- Collection<String> actualDataSourceNames = new LinkedList<>();
- actualDataSourceNames.addAll(dataSourceNames);
+ Collection<String> actualDataSourceNames = new LinkedList<>(dataSourceNames);
result.put(name, actualDataSourceNames);
return result;
}
diff --git
a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
index c5c21d7..af46490 100644
---
a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
+++
b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/main/java/org/apache/shardingsphere/ha/rule/HARule.java
@@ -73,15 +73,18 @@ public final class HARule implements
DataSourceContainedRule, StatusContainedRul
?
TypedSPIRegistry.getRegisteredService(ReplicaLoadBalanceAlgorithm.class) :
loadBalancers.get(each.getLoadBalancerName());
dataSourceRules.put(each.getName(), new HADataSourceRule(each,
loadBalanceAlgorithm));
}
+ Map<String, DataSource> originalDataSourceMap = new
HashMap<>(dataSourceMap);
+ Collection<String> disabledDataSourceNames =
dataSourceRules.values().iterator().next().getDisabledDataSourceNames();
if (null == haType) {
haType = TypedSPIRegistry.getRegisteredService(HAType.class,
config.getHaConfiguration().getType(), config.getHaConfiguration().getProps());
- haType.updatePrimaryDataSource(dataSourceMap, schemaName);
+ haType.updatePrimaryDataSource(originalDataSourceMap, schemaName,
disabledDataSourceNames);
+ haType.updateMemberState(originalDataSourceMap, schemaName,
disabledDataSourceNames);
} else {
haType.stopPeriodicalUpdate();
}
try {
haType.checkHAConfig(dataSourceMap, schemaName);
- haType.startPeriodicalUpdate(dataSourceMap, schemaName);
+ haType.startPeriodicalUpdate(originalDataSourceMap, schemaName,
disabledDataSourceNames);
} catch (final SQLException ex) {
throw new ShardingSphereException(ex);
}
@@ -99,15 +102,18 @@ public final class HARule implements
DataSourceContainedRule, StatusContainedRul
?
TypedSPIRegistry.getRegisteredService(ReplicaLoadBalanceAlgorithm.class) :
loadBalancers.get(each.getLoadBalancerName());
dataSourceRules.put(each.getName(), new HADataSourceRule(each,
loadBalanceAlgorithm));
}
+ Map<String, DataSource> originalDataSourceMap = new
HashMap<>(dataSourceMap);
+ Collection<String> disabledDataSourceNames =
dataSourceRules.values().iterator().next().getDisabledDataSourceNames();
if (null == haType) {
haType = TypedSPIRegistry.getRegisteredService(HAType.class,
config.getHaType().getType(), config.getHaType().getProps());
- haType.updatePrimaryDataSource(dataSourceMap, schemaName);
+ haType.updatePrimaryDataSource(originalDataSourceMap, schemaName,
disabledDataSourceNames);
+ haType.updateMemberState(originalDataSourceMap, schemaName,
disabledDataSourceNames);
} else {
haType.stopPeriodicalUpdate();
}
try {
haType.checkHAConfig(dataSourceMap, schemaName);
- haType.startPeriodicalUpdate(dataSourceMap, schemaName);
+ haType.startPeriodicalUpdate(originalDataSourceMap, schemaName,
disabledDataSourceNames);
} catch (final SQLException ex) {
throw new ShardingSphereException(ex);
}
diff --git
a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
index 1bd200a..1641fe8 100644
---
a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
+++
b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-common/src/test/java/org/apache/shardingsphere/ha/fixture/TestHATypeFixture.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.ha.fixture;
import org.apache.shardingsphere.ha.spi.HAType;
import javax.sql.DataSource;
+import java.util.Collection;
import java.util.Map;
/**
@@ -32,11 +33,15 @@ public final class TestHATypeFixture implements HAType {
}
@Override
- public void updatePrimaryDataSource(final Map<String, DataSource>
dataSourceMap, final String schemaName) {
+ public void updatePrimaryDataSource(final Map<String, DataSource>
activeDataSourceMap, final String schemaName, final Collection<String>
disabledDataSourceNames) {
}
@Override
- public void startPeriodicalUpdate(final Map<String, DataSource>
dataSourceMap, final String schemaName) {
+ public void updateMemberState(final Map<String, DataSource>
originalDataSourceMap, final String schemaName, final Collection<String>
disabledDataSourceNames) {
+ }
+
+ @Override
+ public void startPeriodicalUpdate(final Map<String, DataSource>
originalDataSourceMap, final String schemaName, final Collection<String>
disabledDataSourceNames) {
}
@Override
diff --git
a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
index 6065d2e..b1a2aeb 100644
---
a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
+++
b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRHAType.java
@@ -27,12 +27,18 @@ import
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration
import
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.ha.spi.HAType;
import
org.apache.shardingsphere.infra.config.exception.ShardingSphereConfigurationException;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
@@ -51,6 +57,8 @@ public final class MGRHAType implements HAType {
private static final String SINGLE_PRIMARY = "SELECT * FROM
performance_schema.global_variables WHERE
VARIABLE_NAME='group_replication_single_primary_mode'";
+ private static final String MEMBER_LIST = "SELECT MEMBER_HOST,
MEMBER_PORT, MEMBER_STATE FROM performance_schema.replication_group_members";
+
private static CoordinatorRegistryCenter coordinatorRegistryCenter;
private ScheduleJobBootstrap scheduleJobBootstrap;
@@ -115,11 +123,19 @@ public final class MGRHAType implements HAType {
}
@Override
- public void updatePrimaryDataSource(final Map<String, DataSource>
dataSourceMap, final String schemaName) {
- String newPrimaryDataSource =
determinePrimaryDataSource(dataSourceMap);
+ public void updatePrimaryDataSource(final Map<String, DataSource>
originalDataSourceMap, final String schemaName, final Collection<String>
disabledDataSourceNames) {
+ Map<String, DataSource> activeDataSourceMap = new
HashMap<>(originalDataSourceMap);
+ if (!disabledDataSourceNames.isEmpty()) {
+ activeDataSourceMap.entrySet().removeIf(each ->
disabledDataSourceNames.contains(each.getKey()));
+ }
+ String newPrimaryDataSource =
determinePrimaryDataSource(activeDataSourceMap);
if (newPrimaryDataSource.isEmpty()) {
return;
}
+ // TODO post primary datasource event
+// if (!newPrimaryDataSource.equals(oldPrimaryDataSource)) {
+// ShardingSphereEventBus.getInstance().post(new
PrimaryDataSourceUpdateEvent(schemaName, newPrimaryDataSource,
newPrimaryDataSource));
+// }
oldPrimaryDataSource = newPrimaryDataSource;
}
@@ -149,8 +165,7 @@ public final class MGRHAType implements HAType {
private String findPrimaryDataSourceName(final String
primaryDataSourceURL, final Map<String, DataSource> dataSourceMap) {
String result = "";
for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
- DataSource dataSource = entry.getValue();
- try (Connection connection = dataSource.getConnection()) {
+ try (Connection connection = entry.getValue().getConnection()) {
if
(connection.getMetaData().getURL().contains(primaryDataSourceURL)) {
return entry.getKey();
}
@@ -162,13 +177,97 @@ public final class MGRHAType implements HAType {
}
@Override
- public void startPeriodicalUpdate(final Map<String, DataSource>
dataSourceMap, final String schemaName) {
+ public void updateMemberState(final Map<String, DataSource>
originalDataSourceMap, final String schemaName, final Collection<String>
disabledDataSourceNames) {
+ Map<String, DataSource> activeDataSourceMap = new
HashMap<>(originalDataSourceMap);
+ if (!disabledDataSourceNames.isEmpty()) {
+ activeDataSourceMap.entrySet().removeIf(each ->
disabledDataSourceNames.contains(each.getKey()));
+ }
+ List<String> memberDataSourceURLs =
findMemberDataSourceURLs(activeDataSourceMap);
+ if (memberDataSourceURLs.isEmpty()) {
+ return;
+ }
+ Map<String, String> dataSourceURLs = new HashMap<>(16, 1);
+ determineDisabledDataSource(schemaName, activeDataSourceMap,
memberDataSourceURLs, dataSourceURLs);
+ determineEnabledDataSource(originalDataSourceMap, schemaName,
memberDataSourceURLs, dataSourceURLs);
+ }
+
+ private List<String> findMemberDataSourceURLs(final Map<String,
DataSource> activeDataSourceMap) {
+ List<String> result = new LinkedList<>();
+ try (Connection connection =
activeDataSourceMap.get(oldPrimaryDataSource).getConnection();
+ Statement statement = connection.createStatement()) {
+ ResultSet resultSet = statement.executeQuery(MEMBER_LIST);
+ while (resultSet.next()) {
+ if (!"ONLINE".equals(resultSet.getString("MEMBER_STATE"))) {
+ continue;
+ }
+ result.add(String.format("%s:%s",
resultSet.getString("MEMBER_HOST"), resultSet.getString("MEMBER_PORT")));
+ }
+ } catch (final SQLException ex) {
+ log.error("An exception occurred while find member data source urls", ex);
+ }
+ return result;
+ }
+
+ private void determineDisabledDataSource(final String schemaName, final
Map<String, DataSource> activeDataSourceMap,
+ final List<String>
memberDataSourceURLs, final Map<String, String> dataSourceURLs) {
+ for (Entry<String, DataSource> entry : activeDataSourceMap.entrySet())
{
+ boolean disable = true;
+ String url = "";
+ try (Connection connection = entry.getValue().getConnection()) {
+ url = connection.getMetaData().getURL();
+ for (String each : memberDataSourceURLs) {
+ if (url.contains(each)) {
+ disable = false;
+ break;
+ }
+ }
+ } catch (final SQLException ex) {
+ log.error("An exception occurred while find data source urls",
ex);
+ }
+ if (disable) {
+ ShardingSphereEventBus.getInstance().post(new
DataSourceDisabledEvent(schemaName, entry.getKey(), true));
+ } else if (!"".equals(url)) {
+ dataSourceURLs.put(entry.getKey(), url);
+ }
+ }
+ }
+
+ private void determineEnabledDataSource(final Map<String, DataSource>
originalDataSourceMap, final String schemaName,
+ final List<String>
memberDataSourceURLs, final Map<String, String> dataSourceURLs) {
+ for (String each : memberDataSourceURLs) {
+ boolean enable = true;
+ for (Entry<String, String> entry : dataSourceURLs.entrySet()) {
+ if (entry.getValue().contains(each)) {
+ enable = false;
+ break;
+ }
+ }
+ if (!enable) {
+ continue;
+ }
+ for (Entry<String, DataSource> entry :
originalDataSourceMap.entrySet()) {
+ String url;
+ try (Connection connection = entry.getValue().getConnection())
{
+ url = connection.getMetaData().getURL();
+ if (null != url && url.contains(each)) {
+ ShardingSphereEventBus.getInstance().post(new
DataSourceDisabledEvent(schemaName, entry.getKey(), false));
+ break;
+ }
+ } catch (final SQLException ex) {
+ log.error("An exception occurred while find enable data
source urls", ex);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void startPeriodicalUpdate(final Map<String, DataSource>
originalDataSourceMap, final String schemaName, final Collection<String>
disabledDataSourceNames) {
if (null == coordinatorRegistryCenter) {
ZookeeperConfiguration zkConfig = new
ZookeeperConfiguration(props.getProperty("zkServerLists"), "mgr-elasticjob");
coordinatorRegistryCenter = new ZookeeperRegistryCenter(zkConfig);
coordinatorRegistryCenter.init();
}
- scheduleJobBootstrap = new
ScheduleJobBootstrap(coordinatorRegistryCenter, new MGRPeriodicalJob(this,
dataSourceMap, schemaName),
+ scheduleJobBootstrap = new
ScheduleJobBootstrap(coordinatorRegistryCenter, new MGRPeriodicalJob(this,
originalDataSourceMap, schemaName, disabledDataSourceNames),
JobConfiguration.newBuilder("MGRPeriodicalJob",
1).cron(props.getProperty("keepAliveCron")).build());
scheduleJobBootstrap.schedule();
}
diff --git
a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
index 4318191..f3cee18 100644
---
a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
+++
b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
@@ -24,6 +24,8 @@ import
org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.ha.spi.HAType;
import javax.sql.DataSource;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
@RequiredArgsConstructor
@@ -32,14 +34,20 @@ public final class MGRPeriodicalJob implements SimpleJob {
private final HAType haType;
- private final Map<String, DataSource> dataSourceMap;
+ private final Map<String, DataSource> originalDataSourceMap;
private final String schemaName;
+ private final Collection<String> disabledDataSourceNames;
+
@Override
public void execute(final ShardingContext shardingContext) {
- log.info("---------------MGRPeriodicalJob--------------");
- log.info("dataSourceMap: " + dataSourceMap.toString());
- haType.updatePrimaryDataSource(dataSourceMap, schemaName);
+ Map<String, DataSource> activeDataSourceMap = new
HashMap<>(originalDataSourceMap);
+ if (!disabledDataSourceNames.isEmpty()) {
+ activeDataSourceMap.entrySet().removeIf(each ->
disabledDataSourceNames.contains(each.getKey()));
+ }
+ log.info(" +++ " + activeDataSourceMap.toString());
+ haType.updatePrimaryDataSource(originalDataSourceMap, schemaName,
disabledDataSourceNames);
+ haType.updateMemberState(originalDataSourceMap, schemaName,
disabledDataSourceNames);
}
}
diff --git
a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
index 204454e..b69a196 100644
---
a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
+++
b/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-route/src/test/java/org/apache/shardingsphere/ha/route/fixture/TestRouteHATypeFixture.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.ha.route.fixture;
import org.apache.shardingsphere.ha.spi.HAType;
import javax.sql.DataSource;
+import java.util.Collection;
import java.util.Map;
/**
@@ -32,12 +33,15 @@ public final class TestRouteHATypeFixture implements HAType
{
}
@Override
- public void updatePrimaryDataSource(final Map<String, DataSource>
dataSourceMap, final String schemaName) {
-
+ public void updatePrimaryDataSource(final Map<String, DataSource>
activeDataSourceMap, final String schemaName, final Collection<String>
disabledDataSourceNames) {
+ }
+
+ @Override
+ public void updateMemberState(final Map<String, DataSource>
originalDataSourceMap, final String schemaName, final Collection<String>
disabledDataSourceNames) {
}
@Override
- public void startPeriodicalUpdate(final Map<String, DataSource>
dataSourceMap, final String schemaName) {
+ public void startPeriodicalUpdate(final Map<String, DataSource>
originalDataSourceMap, final String schemaName, final Collection<String>
disabledDataSourceNames) {
}
@Override
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
index 224379e..459abf0 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
@@ -18,10 +18,12 @@
package org.apache.shardingsphere.governance.core.registry;
import com.google.common.base.Strings;
+import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.governance.core.lock.node.LockNode;
import
org.apache.shardingsphere.governance.core.registry.instance.GovernanceInstance;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
@@ -54,6 +56,17 @@ public final class RegistryCenter {
}
/**
+ * Persist data source disabled state.
+ *
+ * @param event data source disabled event.
+ */
+ @Subscribe
+ public synchronized void renew(final DataSourceDisabledEvent event) {
+ String value = event.isDisabled() ? RegistryCenterNodeStatus.DISABLED.toString() : "";
+ repository.persist(node.getDataSourcePath(event.getSchemaName(), event.getDataSourceName()), value);
+ }
+
+ /**
* Persist instance online.
*/
public void persistInstanceOnline() {
diff --git
a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/event/impl/DataSourceDisabledEvent.java
similarity index 55%
copy from
shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
copy to
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/event/impl/DataSourceDisabledEvent.java
index 4318191..dff7bed 100644
---
a/shardingsphere-features/shardingsphere-ha/shardingsphere-ha-provider/shardingsphere-ha-mgr/src/main/java/org/apache/shardingsphere/ha/mgr/MGRPeriodicalJob.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/event/impl/DataSourceDisabledEvent.java
@@ -15,31 +15,22 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.ha.mgr;
+package org.apache.shardingsphere.infra.rule.event.impl;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.ha.spi.HAType;
-
-import javax.sql.DataSource;
-import java.util.Map;
+import org.apache.shardingsphere.infra.rule.event.RuleChangedEvent;
+/**
+ * Data source disabled event.
+ */
@RequiredArgsConstructor
-@Slf4j
-public final class MGRPeriodicalJob implements SimpleJob {
-
- private final HAType haType;
-
- private final Map<String, DataSource> dataSourceMap;
+@Getter
+public final class DataSourceDisabledEvent implements RuleChangedEvent {
private final String schemaName;
- @Override
- public void execute(final ShardingContext shardingContext) {
- log.info("---------------MGRPeriodicalJob--------------");
- log.info("dataSourceMap: " + dataSourceMap.toString());
- haType.updatePrimaryDataSource(dataSourceMap, schemaName);
- }
+ private final String dataSourceName;
+
+ private final boolean isDisabled;
}