This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 94624c0 Move @subscribe from ClusterContextManager to
ClusterContextManagerBuilder (#12114)
94624c0 is described below
commit 94624c06781677d27341f04429e1610c5131742b
Author: Haoran Meng <[email protected]>
AuthorDate: Tue Aug 31 14:13:45 2021 +0800
Move @subscribe from ClusterContextManager to ClusterContextManagerBuilder
(#12114)
* Move @subscribe from ClusterContextManager to ClusterContextManagerBuilder
* Move @subscribe from ClusterContextManager to ClusterContextManagerBuilder
---
.../manager/cluster/ClusterContextManager.java | 372 -------------------
.../cluster/ClusterContextManagerBuilder.java | 403 ++++++++++++++++++++-
....java => ClusterContextManagerBuilderTest.java} | 169 +++++----
.../manager/cluster/ClusterContextManagerTest.java | 185 +---------
4 files changed, 472 insertions(+), 657 deletions(-)
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManager.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManager.java
index 5b2c24c..ae85898 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManager.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManager.java
@@ -17,67 +17,22 @@
package org.apache.shardingsphere.mode.manager.cluster;
-import com.google.common.collect.Maps;
-import com.google.common.eventbus.Subscribe;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.authority.rule.AuthorityRule;
-import org.apache.shardingsphere.infra.config.RuleConfiguration;
-import
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
-import org.apache.shardingsphere.infra.config.datasource.DataSourceConverter;
-import
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
import
org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.lock.InnerLockReleasedEvent;
-import org.apache.shardingsphere.infra.lock.LockNameUtil;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.infra.metadata.resource.ShardingSphereResource;
-import
org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
-import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
-import
org.apache.shardingsphere.infra.optimize.core.metadata.FederateSchemaMetadata;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.rule.builder.ShardingSphereRulesBuilder;
import
org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
-import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceEvent;
import
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.governance.lock.ShardingSphereDistributeLock;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.authority.event.AuthorityChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangeCompletedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceDeletedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.props.PropertiesChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.RuleConfigurationsChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.schema.SchemaChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.metadata.event.SchemaAddedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.metadata.event.SchemaDeletedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.state.event.DisabledStateChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.state.event.PrimaryStateChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.governance.schema.GovernanceSchema;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
-import org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
import org.apache.shardingsphere.mode.persist.PersistService;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
-import
org.apache.shardingsphere.transaction.config.TransactionRuleConfiguration;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
-import org.apache.shardingsphere.transaction.core.TransactionType;
-import org.apache.shardingsphere.transaction.rule.TransactionRule;
-import javax.sql.DataSource;
-import java.sql.SQLException;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.Optional;
-import java.util.stream.Collectors;
/**
* Cluster context manager.
@@ -101,7 +56,6 @@ public final class ClusterContextManager implements
ContextManager {
public void init(final MetaDataContexts metaDataContexts, final
TransactionContexts transactionContexts) {
this.metaDataContexts = metaDataContexts;
this.transactionContexts = transactionContexts;
- ShardingSphereEventBus.getInstance().register(this);
disableDataSources();
persistMetaData();
lock = createShardingSphereLock(registryCenter.getRepository());
@@ -131,332 +85,6 @@ public final class ClusterContextManager implements
ContextManager {
? new ShardingSphereDistributeLock(repository,
metaDataContexts.getProps().<Long>getValue(ConfigurationPropertyKey.LOCK_WAIT_TIMEOUT_MILLISECONDS))
: null;
}
- /**
- * Renew to persist meta data.
- *
- * @param event schema added event
- * @throws SQLException SQL exception
- */
- @Subscribe
- public synchronized void renew(final SchemaAddedEvent event) throws
SQLException {
- persistSchema(event.getSchemaName());
- ShardingSphereMetaData metaData = buildMetaData(event.getSchemaName());
-
metaDataContexts.getOptimizeContextFactory().getSchemaMetadatas().getSchemas().put(event.getSchemaName(),
new FederateSchemaMetadata(event.getSchemaName(),
metaData.getSchema().getTables()));
- metaDataContexts.getMetaDataMap().put(event.getSchemaName(), metaData);
- metaDataContexts = new MetaDataContexts(persistService,
- metaDataContexts.getMetaDataMap(),
metaDataContexts.getGlobalRuleMetaData(), metaDataContexts.getExecutorEngine(),
- metaDataContexts.getProps(),
metaDataContexts.getOptimizeContextFactory());
- ShardingSphereEventBus.getInstance().post(new
DataSourceChangeCompletedEvent(event.getSchemaName(),
-
metaDataContexts.getMetaDataMap().get(event.getSchemaName()).getResource().getDatabaseType(),
metaData.getResource().getDataSources()));
- }
-
- /**
- * Renew to delete schema.
- *
- * @param event schema delete event
- */
- @Subscribe
- public synchronized void renew(final SchemaDeletedEvent event) {
- String schemaName = event.getSchemaName();
- closeDataSources(schemaName,
metaDataContexts.getMetaData(schemaName).getResource().getDataSources().values());
- Map<String, ShardingSphereMetaData> schemaMetaData = new
HashMap<>(metaDataContexts.getMetaDataMap());
- schemaMetaData.remove(schemaName);
-
metaDataContexts.getOptimizeContextFactory().getSchemaMetadatas().getSchemas().remove(schemaName);
- metaDataContexts = new MetaDataContexts(persistService,
- schemaMetaData, metaDataContexts.getGlobalRuleMetaData(),
metaDataContexts.getExecutorEngine(), metaDataContexts.getProps(),
metaDataContexts.getOptimizeContextFactory());
- ShardingSphereEventBus.getInstance().post(new
DataSourceDeletedEvent(schemaName));
- }
-
- /**
- * Renew properties.
- *
- * @param event properties changed event
- */
- @Subscribe
- public synchronized void renew(final PropertiesChangedEvent event) {
- ConfigurationProperties props = new
ConfigurationProperties(event.getProps());
- metaDataContexts = new MetaDataContexts(persistService,
- metaDataContexts.getMetaDataMap(),
metaDataContexts.getGlobalRuleMetaData(), metaDataContexts.getExecutorEngine(),
props, metaDataContexts.getOptimizeContextFactory());
- }
-
- /**
- * Renew authority.
- *
- * @param event authority changed event
- */
- @Subscribe
- public synchronized void renew(final AuthorityChangedEvent event) {
- Optional<AuthorityRule> rule =
metaDataContexts.getGlobalRuleMetaData().getRules().stream().filter(each ->
each instanceof AuthorityRule).findAny().map(each -> (AuthorityRule) each);
- rule.ifPresent(optional ->
optional.refresh(metaDataContexts.getMetaDataMap(), event.getUsers()));
- }
-
- /**
- * Renew meta data of the schema.
- *
- * @param event meta data changed event
- */
- @Subscribe
- public synchronized void renew(final SchemaChangedEvent event) {
- try {
- Map<String, ShardingSphereMetaData> schemaMetaData = new
HashMap<>(metaDataContexts.getMetaDataMap().size(), 1);
- for (Entry<String, ShardingSphereMetaData> entry :
metaDataContexts.getMetaDataMap().entrySet()) {
- String schemaName = entry.getKey();
- ShardingSphereMetaData originalMetaData = entry.getValue();
- ShardingSphereMetaData metaData =
event.getSchemaName().equals(schemaName) ? getChangedMetaData(originalMetaData,
event.getSchema(), schemaName) : originalMetaData;
- schemaMetaData.put(schemaName, metaData);
-
metaDataContexts.getOptimizeContextFactory().getSchemaMetadatas().getSchemas().put(event.getSchemaName(),
- new FederateSchemaMetadata(event.getSchemaName(),
metaData.getSchema().getTables()));
- }
- metaDataContexts = new MetaDataContexts(persistService,
- schemaMetaData, metaDataContexts.getGlobalRuleMetaData(),
metaDataContexts.getExecutorEngine(), metaDataContexts.getProps(),
metaDataContexts.getOptimizeContextFactory());
- } finally {
- ShardingSphereEventBus.getInstance().post(new
InnerLockReleasedEvent(LockNameUtil.getMetadataRefreshLockName()));
- }
- }
-
- /**
- * Renew rule configurations.
- *
- * @param event rule configurations changed event
- * @throws SQLException SQL exception
- */
- @Subscribe
- public synchronized void renew(final RuleConfigurationsChangedEvent event)
throws SQLException {
- String schemaName = event.getSchemaName();
- ShardingSphereMetaData metaData =
getChangedMetaData(metaDataContexts.getMetaDataMap().get(schemaName),
event.getRuleConfigurations());
- Map<String, ShardingSphereMetaData> schemaMetaData =
rebuildSchemaMetaData(schemaName, metaData);
- metaDataContexts = new MetaDataContexts(persistService,
schemaMetaData, metaDataContexts.getGlobalRuleMetaData(),
metaDataContexts.getExecutorEngine(),
- metaDataContexts.getProps(),
metaDataContexts.getOptimizeContextFactory());
- persistService.getSchemaMetaDataService().persist(schemaName,
schemaMetaData.get(schemaName).getSchema());
- }
-
- /**
- * Renew data source configuration.
- *
- * @param event data source changed event.
- * @throws SQLException SQL exception
- */
- @Subscribe
- public synchronized void renew(final DataSourceChangedEvent event) throws
SQLException {
- String schemaName = event.getSchemaName();
- Collection<DataSource> pendingClosedDataSources =
getPendingClosedDataSources(schemaName, event.getDataSourceConfigurations());
- ShardingSphereMetaData metaData =
rebuildMetaData(metaDataContexts.getMetaDataMap().get(schemaName),
event.getDataSourceConfigurations());
- Map<String, ShardingSphereMetaData> schemaMetaData =
rebuildSchemaMetaData(schemaName, metaData);
- metaDataContexts = new MetaDataContexts(persistService, schemaMetaData,
- metaDataContexts.getGlobalRuleMetaData(),
metaDataContexts.getExecutorEngine(), metaDataContexts.getProps(),
metaDataContexts.getOptimizeContextFactory());
- ShardingSphereEventBus.getInstance().post(new
DataSourceChangeCompletedEvent(event.getSchemaName(),
-
metaDataContexts.getMetaDataMap().get(event.getSchemaName()).getResource().getDatabaseType(),
schemaMetaData.get(event.getSchemaName()).getResource().getDataSources()));
- closeDataSources(schemaName, pendingClosedDataSources);
- }
-
- /**
- * Renew disabled data source names.
- *
- * @param event disabled state changed event
- */
- @Subscribe
- public synchronized void renew(final DisabledStateChangedEvent event) {
- GovernanceSchema governanceSchema = event.getGovernanceSchema();
- Collection<ShardingSphereRule> rules =
metaDataContexts.getMetaDataMap().get(governanceSchema.getSchemaName()).getRuleMetaData().getRules();
- for (ShardingSphereRule each : rules) {
- if (each instanceof StatusContainedRule) {
- ((StatusContainedRule) each).updateRuleStatus(new
DataSourceNameDisabledEvent(governanceSchema.getDataSourceName(),
event.isDisabled()));
- }
- }
- }
-
- /**
- * Renew primary data source names.
- *
- * @param event primary state changed event
- */
- @Subscribe
- public synchronized void renew(final PrimaryStateChangedEvent event) {
- GovernanceSchema governanceSchema = event.getGovernanceSchema();
- Collection<ShardingSphereRule> rules =
metaDataContexts.getMetaDataMap().get(governanceSchema.getSchemaName()).getRuleMetaData().getRules();
- for (ShardingSphereRule each : rules) {
- if (each instanceof StatusContainedRule) {
- ((StatusContainedRule) each).updateRuleStatus(new
PrimaryDataSourceEvent(governanceSchema.getSchemaName(),
governanceSchema.getDataSourceName(), event.getPrimaryDataSourceName()));
- }
- }
- }
-
- /**
- * Renew global rule configurations.
- *
- * @param event global rule configurations changed event
- */
- @Subscribe
- public synchronized void renew(final GlobalRuleConfigurationsChangedEvent
event) {
- Collection<RuleConfiguration> newGlobalConfigs =
event.getRuleConfigurations();
- if (!newGlobalConfigs.isEmpty()) {
- ShardingSphereRuleMetaData newGlobalRuleMetaData = new
ShardingSphereRuleMetaData(newGlobalConfigs,
-
ShardingSphereRulesBuilder.buildGlobalRules(newGlobalConfigs,
metaDataContexts.getMetaDataMap()));
- metaDataContexts = new MetaDataContexts(persistService,
metaDataContexts.getMetaDataMap(), newGlobalRuleMetaData,
metaDataContexts.getExecutorEngine(),
- metaDataContexts.getProps(),
metaDataContexts.getOptimizeContextFactory());
- }
- }
-
- private Map<String, ShardingSphereMetaData> rebuildSchemaMetaData(final
String schemaName, final ShardingSphereMetaData metaData) {
- Map<String, ShardingSphereMetaData> result = new
HashMap<>(metaDataContexts.getMetaDataMap());
- result.put(schemaName, metaData);
-
metaDataContexts.getOptimizeContextFactory().getSchemaMetadatas().getSchemas().put(schemaName,
new FederateSchemaMetadata(schemaName, metaData.getSchema().getTables()));
- return result;
- }
-
- private void persistSchema(final String schemaName) {
- if (!persistService.getDataSourceService().isExisted(schemaName)) {
- persistService.getDataSourceService().persist(schemaName, new
LinkedHashMap<>());
- }
- if (!persistService.getSchemaRuleService().isExisted(schemaName)) {
- persistService.getSchemaRuleService().persist(schemaName, new
LinkedList<>());
- }
- }
-
- private ShardingSphereMetaData buildMetaData(final String schemaName)
throws SQLException {
- Map<String, Map<String, DataSource>> dataSourcesMap =
createDataSourcesMap(Collections.singletonMap(schemaName,
persistService.getDataSourceService().load(schemaName)));
- return new MetaDataContextsBuilder(dataSourcesMap,
- Collections.singletonMap(schemaName,
persistService.getSchemaRuleService().load(schemaName)),
- persistService.getGlobalRuleService().load(),
-
metaDataContexts.getProps().getProps()).build(persistService).getMetaData(schemaName);
- }
-
- private ShardingSphereMetaData getChangedMetaData(final
ShardingSphereMetaData originalMetaData, final ShardingSphereSchema schema,
final String schemaName) {
- // TODO refresh table addressing mapper
- return new ShardingSphereMetaData(schemaName,
originalMetaData.getResource(), originalMetaData.getRuleMetaData(), schema);
- }
-
- private ShardingSphereMetaData getChangedMetaData(final
ShardingSphereMetaData originalMetaData, final Collection<RuleConfiguration>
ruleConfigs) throws SQLException {
- MetaDataContextsBuilder builder = new
MetaDataContextsBuilder(Collections.singletonMap(originalMetaData.getName(),
originalMetaData.getResource().getDataSources()),
- Collections.singletonMap(originalMetaData.getName(),
ruleConfigs), persistService.getGlobalRuleService().load(),
metaDataContexts.getProps().getProps());
- return
builder.build(persistService).getMetaDataMap().values().iterator().next();
- }
-
- private ShardingSphereMetaData rebuildMetaData(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) throws SQLException {
- Collection<String> deletedDataSources =
getDeletedDataSources(originalMetaData, newDataSourceConfigs).keySet();
- Map<String, DataSource> changedDataSources =
buildChangedDataSources(originalMetaData, newDataSourceConfigs);
- Map<String, Map<String, DataSource>> dataSourcesMap =
Collections.singletonMap(originalMetaData.getName(),
-
getNewDataSources(originalMetaData.getResource().getDataSources(),
getAddedDataSources(originalMetaData, newDataSourceConfigs),
changedDataSources, deletedDataSources));
- return new MetaDataContextsBuilder(dataSourcesMap,
Collections.singletonMap(originalMetaData.getName(),
- originalMetaData.getRuleMetaData().getConfigurations()),
persistService.getGlobalRuleService().load(),
-
metaDataContexts.getProps().getProps()).build(persistService).getMetaData(originalMetaData.getName());
- }
-
- private Map<String, DataSource> getNewDataSources(final Map<String,
DataSource> originalDataSources,
- final Map<String,
DataSource> addedDataSources, final Map<String, DataSource> changedDataSources,
final Collection<String> deletedDataSources) {
- Map<String, DataSource> result = new
LinkedHashMap<>(originalDataSources);
- result.keySet().removeAll(deletedDataSources);
- result.putAll(changedDataSources);
- result.putAll(addedDataSources);
- return result;
- }
-
- private Map<String, DataSource> getDeletedDataSources(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) {
- return
originalMetaData.getResource().getDataSources().entrySet().stream().filter(entry
-> !newDataSourceConfigs.containsKey(entry.getKey()))
- .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
- }
-
- private Map<String, DataSource> getChangedDataSources(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) {
- Collection<String> changedDataSourceNames =
getChangedDataSourceConfiguration(originalMetaData,
newDataSourceConfigs).keySet();
- return
originalMetaData.getResource().getDataSources().entrySet().stream().filter(entry
-> changedDataSourceNames.contains(entry.getKey()))
- .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
- }
-
- private Map<String, DataSource> getAddedDataSources(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) {
- return
DataSourceConverter.getDataSourceMap(Maps.filterKeys(newDataSourceConfigs, each
-> !originalMetaData.getResource().getDataSources().containsKey(each)));
- }
-
- private Map<String, DataSourceConfiguration>
getChangedDataSourceConfiguration(final ShardingSphereMetaData originalMetaData,
-
final Map<String, DataSourceConfiguration> dataSourceConfigurations) {
- return dataSourceConfigurations.entrySet().stream()
- .filter(entry ->
isModifiedDataSource(originalMetaData.getResource().getDataSources(),
entry.getKey(), entry.getValue()))
- .collect(Collectors.toMap(Entry::getKey, Entry::getValue,
(oldValue, currentValue) -> oldValue, LinkedHashMap::new));
- }
-
- private Map<String, DataSource> buildChangedDataSources(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) {
- return
DataSourceConverter.getDataSourceMap(getChangedDataSourceConfiguration(originalMetaData,
newDataSourceConfigs));
- }
-
- private boolean isModifiedDataSource(final Map<String, DataSource>
originalDataSources, final String dataSourceName, final DataSourceConfiguration
dataSourceConfiguration) {
- DataSourceConfiguration dataSourceConfig =
DataSourceConverter.getDataSourceConfigurationMap(originalDataSources).get(dataSourceName);
- return null != dataSourceConfig &&
!dataSourceConfiguration.equals(dataSourceConfig);
- }
-
- private Map<String, Map<String, DataSource>> createDataSourcesMap(final
Map<String, Map<String, DataSourceConfiguration>> dataSourcesConfigs) {
- return
dataSourcesConfigs.entrySet().stream().collect(Collectors.toMap(Entry::getKey,
- entry -> DataSourceConverter.getDataSourceMap(entry.getValue())));
- }
-
- private Collection<DataSource> getPendingClosedDataSources(final String
schemaName, final Map<String, DataSourceConfiguration>
dataSourceConfigurations) {
- Collection<DataSource> result = new LinkedList<>();
-
result.addAll(getDeletedDataSources(metaDataContexts.getMetaData(schemaName),
dataSourceConfigurations).values());
-
result.addAll(getChangedDataSources(metaDataContexts.getMetaData(schemaName),
dataSourceConfigurations).values());
- return result;
- }
-
- private void closeDataSources(final String schemaName, final
Collection<DataSource> dataSources) {
- ShardingSphereResource resource =
metaDataContexts.getMetaData(schemaName).getResource();
- dataSources.forEach(each -> closeDataSource(resource, each));
- }
-
- private void closeDataSource(final ShardingSphereResource resource, final
DataSource dataSource) {
- try {
- resource.close(dataSource);
- // CHECKSTYLE:OFF
- } catch (final Exception ignore) {
- // CHECKSTYLE:ON
- }
- }
-
- /**
- * Renew transaction manager engine contexts.
- *
- * @param event data source change completed event
- * @throws Exception exception
- */
- @Subscribe
- public synchronized void renewTransactionContext(final
DataSourceChangeCompletedEvent event) throws Exception {
- closeStaleEngine(event.getSchemaName());
- Map<String, ShardingSphereTransactionManagerEngine> existedEngines =
transactionContexts.getEngines();
- existedEngines.put(event.getSchemaName(),
createNewEngine(event.getDatabaseType(), event.getDataSources()));
- renewContexts(existedEngines);
- }
-
- /**
- * Renew transaction manager engine context.
- *
- * @param event data source deleted event.
- * @throws Exception exception
- */
- @Subscribe
- public synchronized void renewTransactionContext(final
DataSourceDeletedEvent event) throws Exception {
- closeStaleEngine(event.getSchemaName());
- renewContexts(transactionContexts.getEngines());
- }
-
- private void closeStaleEngine(final String schemaName) throws Exception {
- ShardingSphereTransactionManagerEngine staleEngine =
transactionContexts.getEngines().remove(schemaName);
- if (null != staleEngine) {
- staleEngine.close();
- }
- }
-
- private ShardingSphereTransactionManagerEngine createNewEngine(final
DatabaseType databaseType, final Map<String, DataSource> dataSources) {
- ShardingSphereTransactionManagerEngine result = new
ShardingSphereTransactionManagerEngine();
- result.init(databaseType, dataSources, getTransactionRule());
- return result;
- }
-
- private TransactionRule getTransactionRule() {
- Optional<TransactionRule> transactionRule =
metaDataContexts.getGlobalRuleMetaData().getRules().stream().filter(
- each -> each instanceof TransactionRule).map(each ->
(TransactionRule) each).findFirst();
- return transactionRule.orElseGet(() -> new TransactionRule(new
TransactionRuleConfiguration(TransactionType.LOCAL.name(), null)));
- }
-
- private void renewContexts(final Map<String,
ShardingSphereTransactionManagerEngine> engines) {
- transactionContexts = new TransactionContexts(engines);
- }
-
@Override
public synchronized void renewMetaDataContexts(final MetaDataContexts
metaDataContexts) {
this.metaDataContexts = metaDataContexts;
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 5b0b14b..2ac88d6 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -18,13 +18,43 @@
package org.apache.shardingsphere.mode.manager.cluster;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.authority.rule.AuthorityRule;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
import org.apache.shardingsphere.infra.config.datasource.DataSourceConverter;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.lock.InnerLockReleasedEvent;
+import org.apache.shardingsphere.infra.lock.LockNameUtil;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.resource.ShardingSphereResource;
+import
org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
+import
org.apache.shardingsphere.infra.optimize.core.metadata.FederateSchemaMetadata;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.rule.builder.ShardingSphereRulesBuilder;
+import
org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
+import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceEvent;
+import
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.authority.event.AuthorityChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangeCompletedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceDeletedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.props.PropertiesChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.RuleConfigurationsChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.schema.SchemaChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.metadata.event.SchemaAddedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.metadata.event.SchemaDeletedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.state.event.DisabledStateChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.state.event.PrimaryStateChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.schema.GovernanceSchema;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
import org.apache.shardingsphere.mode.persist.PersistService;
@@ -41,8 +71,10 @@ import
org.apache.shardingsphere.transaction.rule.TransactionRule;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
@@ -58,22 +90,26 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
ShardingSphereServiceLoader.register(ClusterPersistRepository.class);
}
+ private PersistService persistService;
+
+ private ContextManager contextManager;
+
@Override
public ContextManager build(final ModeConfiguration modeConfig, final
Map<String, Map<String, DataSource>> dataSourcesMap,
final Map<String,
Collection<RuleConfiguration>> schemaRuleConfigs, final
Collection<RuleConfiguration> globalRuleConfigs,
final Properties props, final boolean
isOverwrite) throws SQLException {
+ ShardingSphereEventBus.getInstance().register(this);
ClusterPersistRepository repository =
createClusterPersistRepository((ClusterPersistRepositoryConfiguration)
modeConfig.getRepository());
- PersistService persistService = new PersistService(repository);
+ persistService = new PersistService(repository);
RegistryCenter registryCenter = new RegistryCenter(repository);
persistConfigurations(persistService, dataSourcesMap,
schemaRuleConfigs, globalRuleConfigs, props, isOverwrite);
- // TODO Here may be some problems to load all schemaNames for JDBC
Collection<String> schemaNames =
persistService.getSchemaMetaDataService().loadAllNames();
MetaDataContexts metaDataContexts = new
MetaDataContextsBuilder(loadDataSourcesMap(persistService, dataSourcesMap,
schemaNames),
loadSchemaRules(persistService, schemaNames),
persistService.getGlobalRuleService().load(),
persistService.getPropsService().load()).build(persistService);
TransactionContexts transactionContexts =
createTransactionContexts(metaDataContexts);
- ContextManager result = new ClusterContextManager(persistService,
registryCenter);
- result.init(metaDataContexts, transactionContexts);
- return result;
+ contextManager = new ClusterContextManager(persistService,
registryCenter);
+ contextManager.init(metaDataContexts, transactionContexts);
+ return contextManager;
}
private ClusterPersistRepository createClusterPersistRepository(final
ClusterPersistRepositoryConfiguration config) {
@@ -161,14 +197,6 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
||
!dataSourceConfigurationMap.get(entry.getKey()).equals(entry.getValue())).collect(Collectors.toMap(Entry::getKey,
Entry::getValue));
}
- private Map<String, Map<String, DataSource>> getChangedDataSources(final
Map<String, Map<String, DataSourceConfiguration>>
changedDataSourceConfigurations) {
- Map<String, Map<String, DataSource>> result = new
LinkedHashMap<>(changedDataSourceConfigurations.size(), 1);
- for (Entry<String, Map<String, DataSourceConfiguration>> entry :
changedDataSourceConfigurations.entrySet()) {
- result.put(entry.getKey(), createDataSources(entry.getValue()));
- }
- return result;
- }
-
private Map<String, DataSource> createDataSources(final Map<String,
DataSourceConfiguration> dataSourceConfigs) {
Map<String, DataSource> result = new
LinkedHashMap<>(dataSourceConfigs.size(), 1);
for (Entry<String, DataSourceConfiguration> each :
dataSourceConfigs.entrySet()) {
@@ -194,12 +222,361 @@ public final class ClusterContextManagerBuilder
implements ContextManagerBuilder
return new TransactionContexts(engines);
}
+ /**
+ * Renew to persist meta data.
+ *
+ * @param event schema added event
+ * @throws SQLException SQL exception
+ */
+ @Subscribe
+ public synchronized void renew(final SchemaAddedEvent event) throws
SQLException {
+ persistSchema(event.getSchemaName());
+ ShardingSphereMetaData metaData = buildMetaData(event.getSchemaName());
+
contextManager.getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().put(event.getSchemaName(),
new FederateSchemaMetadata(event.getSchemaName(),
+ metaData.getSchema().getTables()));
+
contextManager.getMetaDataContexts().getMetaDataMap().put(event.getSchemaName(),
metaData);
+ contextManager.renewMetaDataContexts(new
MetaDataContexts(persistService,
+ contextManager.getMetaDataContexts().getMetaDataMap(),
contextManager.getMetaDataContexts().getGlobalRuleMetaData(),
contextManager.getMetaDataContexts().getExecutorEngine(),
+ contextManager.getMetaDataContexts().getProps(),
contextManager.getMetaDataContexts().getOptimizeContextFactory()));
+ ShardingSphereEventBus.getInstance().post(new
DataSourceChangeCompletedEvent(event.getSchemaName(),
+
contextManager.getMetaDataContexts().getMetaDataMap().get(event.getSchemaName()).getResource().getDatabaseType(),
metaData.getResource().getDataSources()));
+ }
+
+ /**
+ * Renew to delete schema.
+ *
+ * @param event schema delete event
+ */
+ @Subscribe
+ public synchronized void renew(final SchemaDeletedEvent event) {
+ String schemaName = event.getSchemaName();
+ closeDataSources(schemaName);
+ Map<String, ShardingSphereMetaData> schemaMetaData = new
HashMap<>(contextManager.getMetaDataContexts().getMetaDataMap());
+ schemaMetaData.remove(schemaName);
+
contextManager.getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().remove(schemaName);
+ contextManager.renewMetaDataContexts(new
MetaDataContexts(persistService,
+ schemaMetaData,
contextManager.getMetaDataContexts().getGlobalRuleMetaData(),
contextManager.getMetaDataContexts().getExecutorEngine(),
+ contextManager.getMetaDataContexts().getProps(),
contextManager.getMetaDataContexts().getOptimizeContextFactory()));
+ ShardingSphereEventBus.getInstance().post(new
DataSourceDeletedEvent(schemaName));
+ }
+
+ /**
+ * Renew properties.
+ *
+ * @param event properties changed event
+ */
+ @Subscribe
+ public synchronized void renew(final PropertiesChangedEvent event) {
+ ConfigurationProperties props = new
ConfigurationProperties(event.getProps());
+ contextManager.renewMetaDataContexts(new
MetaDataContexts(persistService,
+ contextManager.getMetaDataContexts().getMetaDataMap(),
contextManager.getMetaDataContexts().getGlobalRuleMetaData(),
contextManager.getMetaDataContexts().getExecutorEngine(),
+ props,
contextManager.getMetaDataContexts().getOptimizeContextFactory()));
+ }
+
+ /**
+ * Renew authority.
+ *
+ * @param event authority changed event
+ */
+ @Subscribe
+ public synchronized void renew(final AuthorityChangedEvent event) {
+ Optional<AuthorityRule> rule =
contextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules().stream()
+ .filter(each -> each instanceof
AuthorityRule).findAny().map(each -> (AuthorityRule) each);
+ rule.ifPresent(optional ->
optional.refresh(contextManager.getMetaDataContexts().getMetaDataMap(),
event.getUsers()));
+ }
+
+ /**
+ * Renew meta data of the schema.
+ *
+ * @param event meta data changed event
+ */
+ @Subscribe
+ public synchronized void renew(final SchemaChangedEvent event) {
+ try {
+ Map<String, ShardingSphereMetaData> schemaMetaData = new
HashMap<>(contextManager.getMetaDataContexts().getMetaDataMap().size(), 1);
+ for (Entry<String, ShardingSphereMetaData> entry :
contextManager.getMetaDataContexts().getMetaDataMap().entrySet()) {
+ String schemaName = entry.getKey();
+ ShardingSphereMetaData originalMetaData = entry.getValue();
+ ShardingSphereMetaData metaData =
event.getSchemaName().equals(schemaName) ? getChangedMetaData(originalMetaData,
event.getSchema(), schemaName) : originalMetaData;
+ schemaMetaData.put(schemaName, metaData);
+
contextManager.getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().put(event.getSchemaName(),
+ new FederateSchemaMetadata(event.getSchemaName(),
metaData.getSchema().getTables()));
+ }
+ contextManager.renewMetaDataContexts(new
MetaDataContexts(persistService,
+ schemaMetaData,
contextManager.getMetaDataContexts().getGlobalRuleMetaData(),
contextManager.getMetaDataContexts().getExecutorEngine(),
+ contextManager.getMetaDataContexts().getProps(),
contextManager.getMetaDataContexts().getOptimizeContextFactory()));
+ } finally {
+ ShardingSphereEventBus.getInstance().post(new
InnerLockReleasedEvent(LockNameUtil.getMetadataRefreshLockName()));
+ }
+ }
+
+ /**
+ * Renew rule configurations.
+ *
+ * @param event rule configurations changed event
+ * @throws SQLException SQL exception
+ */
+ @Subscribe
+ public synchronized void renew(final RuleConfigurationsChangedEvent event)
throws SQLException {
+ String schemaName = event.getSchemaName();
+ ShardingSphereMetaData metaData =
getChangedMetaData(contextManager.getMetaDataContexts().getMetaDataMap().get(schemaName),
event.getRuleConfigurations());
+ Map<String, ShardingSphereMetaData> schemaMetaData =
rebuildSchemaMetaData(schemaName, metaData);
+ contextManager.renewMetaDataContexts(new
MetaDataContexts(persistService, schemaMetaData,
contextManager.getMetaDataContexts().getGlobalRuleMetaData(),
+ contextManager.getMetaDataContexts().getExecutorEngine(),
+ contextManager.getMetaDataContexts().getProps(),
contextManager.getMetaDataContexts().getOptimizeContextFactory()));
+ persistService.getSchemaMetaDataService().persist(schemaName,
schemaMetaData.get(schemaName).getSchema());
+ }
+
+ /**
+ * Renew data source configuration.
+ *
+ * @param event data source changed event.
+ * @throws SQLException SQL exception
+ */
+ @Subscribe
+ public synchronized void renew(final DataSourceChangedEvent event) throws
SQLException {
+ String schemaName = event.getSchemaName();
+ Collection<DataSource> pendingClosedDataSources =
getPendingClosedDataSources(schemaName, event.getDataSourceConfigurations());
+ ShardingSphereMetaData metaData =
rebuildMetaData(contextManager.getMetaDataContexts().getMetaDataMap().get(schemaName),
event.getDataSourceConfigurations());
+ Map<String, ShardingSphereMetaData> schemaMetaData =
rebuildSchemaMetaData(schemaName, metaData);
+ contextManager.renewMetaDataContexts(new
MetaDataContexts(persistService, schemaMetaData,
+ contextManager.getMetaDataContexts().getGlobalRuleMetaData(),
contextManager.getMetaDataContexts().getExecutorEngine(),
contextManager.getMetaDataContexts().getProps(),
+
contextManager.getMetaDataContexts().getOptimizeContextFactory()));
+ ShardingSphereEventBus.getInstance().post(new
DataSourceChangeCompletedEvent(event.getSchemaName(),
+
contextManager.getMetaDataContexts().getMetaDataMap().get(event.getSchemaName()).getResource().getDatabaseType(),
+
schemaMetaData.get(event.getSchemaName()).getResource().getDataSources()));
+ closeDataSources(schemaName, pendingClosedDataSources);
+ }
+
+ /**
+ * Renew disabled data source names.
+ *
+ * @param event disabled state changed event
+ */
+ @Subscribe
+ public synchronized void renew(final DisabledStateChangedEvent event) {
+ GovernanceSchema governanceSchema = event.getGovernanceSchema();
+ Collection<ShardingSphereRule> rules =
contextManager.getMetaDataContexts().getMetaDataMap().get(governanceSchema.getSchemaName()).getRuleMetaData().getRules();
+ for (ShardingSphereRule each : rules) {
+ if (each instanceof StatusContainedRule) {
+ ((StatusContainedRule) each).updateRuleStatus(new
DataSourceNameDisabledEvent(governanceSchema.getDataSourceName(),
event.isDisabled()));
+ }
+ }
+ }
+
+ /**
+ * Renew primary data source names.
+ *
+ * @param event primary state changed event
+ */
+ @Subscribe
+ public synchronized void renew(final PrimaryStateChangedEvent event) {
+ GovernanceSchema governanceSchema = event.getGovernanceSchema();
+ Collection<ShardingSphereRule> rules =
contextManager.getMetaDataContexts().getMetaDataMap().get(governanceSchema.getSchemaName()).getRuleMetaData().getRules();
+ for (ShardingSphereRule each : rules) {
+ if (each instanceof StatusContainedRule) {
+ ((StatusContainedRule) each).updateRuleStatus(new
PrimaryDataSourceEvent(governanceSchema.getSchemaName(),
governanceSchema.getDataSourceName(), event.getPrimaryDataSourceName()));
+ }
+ }
+ }
+
+ /**
+ * Renew global rule configurations.
+ *
+ * @param event global rule configurations changed event
+ */
+ @Subscribe
+ public synchronized void renew(final GlobalRuleConfigurationsChangedEvent
event) {
+ Collection<RuleConfiguration> newGlobalConfigs =
event.getRuleConfigurations();
+ if (!newGlobalConfigs.isEmpty()) {
+ ShardingSphereRuleMetaData newGlobalRuleMetaData = new
ShardingSphereRuleMetaData(newGlobalConfigs,
+
ShardingSphereRulesBuilder.buildGlobalRules(newGlobalConfigs,
contextManager.getMetaDataContexts().getMetaDataMap()));
+ contextManager.renewMetaDataContexts(new
MetaDataContexts(persistService,
contextManager.getMetaDataContexts().getMetaDataMap(), newGlobalRuleMetaData,
+ contextManager.getMetaDataContexts().getExecutorEngine(),
+ contextManager.getMetaDataContexts().getProps(),
contextManager.getMetaDataContexts().getOptimizeContextFactory()));
+ }
+ }
+
+ private Map<String, ShardingSphereMetaData> rebuildSchemaMetaData(final
String schemaName, final ShardingSphereMetaData metaData) {
+ Map<String, ShardingSphereMetaData> result = new
HashMap<>(contextManager.getMetaDataContexts().getMetaDataMap());
+ result.put(schemaName, metaData);
+
contextManager.getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().put(schemaName,
new FederateSchemaMetadata(schemaName, metaData.getSchema().getTables()));
+ return result;
+ }
+
+ private void persistSchema(final String schemaName) {
+ if (!persistService.getDataSourceService().isExisted(schemaName)) {
+ persistService.getDataSourceService().persist(schemaName, new
LinkedHashMap<>());
+ }
+ if (!persistService.getSchemaRuleService().isExisted(schemaName)) {
+ persistService.getSchemaRuleService().persist(schemaName, new
LinkedList<>());
+ }
+ }
+
+ private ShardingSphereMetaData buildMetaData(final String schemaName)
throws SQLException {
+ Map<String, Map<String, DataSource>> dataSourcesMap =
createDataSourcesMap(Collections.singletonMap(schemaName,
persistService.getDataSourceService().load(schemaName)));
+ return new MetaDataContextsBuilder(dataSourcesMap,
+ Collections.singletonMap(schemaName,
persistService.getSchemaRuleService().load(schemaName)),
+ persistService.getGlobalRuleService().load(),
+
contextManager.getMetaDataContexts().getProps().getProps()).build(persistService).getMetaData(schemaName);
+ }
+
+ private ShardingSphereMetaData getChangedMetaData(final
ShardingSphereMetaData originalMetaData, final ShardingSphereSchema schema,
final String schemaName) {
+ // TODO refresh table addressing mapper
+ return new ShardingSphereMetaData(schemaName,
originalMetaData.getResource(), originalMetaData.getRuleMetaData(), schema);
+ }
+
+ private ShardingSphereMetaData getChangedMetaData(final
ShardingSphereMetaData originalMetaData, final Collection<RuleConfiguration>
ruleConfigs) throws SQLException {
+ MetaDataContextsBuilder builder = new
MetaDataContextsBuilder(Collections.singletonMap(originalMetaData.getName(),
originalMetaData.getResource().getDataSources()),
+ Collections.singletonMap(originalMetaData.getName(),
ruleConfigs), persistService.getGlobalRuleService().load(),
contextManager.getMetaDataContexts().getProps().getProps());
+ return
builder.build(persistService).getMetaDataMap().values().iterator().next();
+ }
+
+ private ShardingSphereMetaData rebuildMetaData(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) throws SQLException {
+ Collection<String> deletedDataSources =
getDeletedDataSources(originalMetaData, newDataSourceConfigs).keySet();
+ Map<String, DataSource> changedDataSources =
buildChangedDataSources(originalMetaData, newDataSourceConfigs);
+ Map<String, Map<String, DataSource>> dataSourcesMap =
Collections.singletonMap(originalMetaData.getName(),
+
getNewDataSources(originalMetaData.getResource().getDataSources(),
getAddedDataSources(originalMetaData, newDataSourceConfigs),
changedDataSources, deletedDataSources));
+ return new MetaDataContextsBuilder(dataSourcesMap,
Collections.singletonMap(originalMetaData.getName(),
+ originalMetaData.getRuleMetaData().getConfigurations()),
persistService.getGlobalRuleService().load(),
+
contextManager.getMetaDataContexts().getProps().getProps()).build(persistService).getMetaData(originalMetaData.getName());
+ }
+
+ private Map<String, DataSource> getNewDataSources(final Map<String,
DataSource> originalDataSources,
+ final Map<String,
DataSource> addedDataSources, final Map<String, DataSource> changedDataSources,
final Collection<String> deletedDataSources) {
+ Map<String, DataSource> result = new
LinkedHashMap<>(originalDataSources);
+ result.keySet().removeAll(deletedDataSources);
+ result.putAll(changedDataSources);
+ result.putAll(addedDataSources);
+ return result;
+ }
+
+ private Map<String, DataSource> getDeletedDataSources(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) {
+ return
originalMetaData.getResource().getDataSources().entrySet().stream().filter(entry
-> !newDataSourceConfigs.containsKey(entry.getKey()))
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+ }
+
+ private Map<String, Map<String, DataSource>> getChangedDataSources(final
Map<String, Map<String, DataSourceConfiguration>>
changedDataSourceConfigurations) {
+ Map<String, Map<String, DataSource>> result = new
LinkedHashMap<>(changedDataSourceConfigurations.size(), 1);
+ for (Entry<String, Map<String, DataSourceConfiguration>> entry :
changedDataSourceConfigurations.entrySet()) {
+ result.put(entry.getKey(), createDataSources(entry.getValue()));
+ }
+ return result;
+ }
+
+ private Map<String, DataSource> getChangedDataSources(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) {
+ Collection<String> changedDataSourceNames =
getChangedDataSourceConfiguration(originalMetaData,
newDataSourceConfigs).keySet();
+ return
originalMetaData.getResource().getDataSources().entrySet().stream().filter(entry
-> changedDataSourceNames.contains(entry.getKey()))
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+ }
+
+ private Map<String, DataSource> getAddedDataSources(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) {
+ return
DataSourceConverter.getDataSourceMap(Maps.filterKeys(newDataSourceConfigs, each
-> !originalMetaData.getResource().getDataSources().containsKey(each)));
+ }
+
+ private Map<String, DataSourceConfiguration>
getChangedDataSourceConfiguration(final ShardingSphereMetaData originalMetaData,
+
final Map<String, DataSourceConfiguration> dataSourceConfigurations) {
+ return dataSourceConfigurations.entrySet().stream()
+ .filter(entry ->
isModifiedDataSource(originalMetaData.getResource().getDataSources(),
entry.getKey(), entry.getValue()))
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue,
(oldValue, currentValue) -> oldValue, LinkedHashMap::new));
+ }
+
+ private Map<String, DataSource> buildChangedDataSources(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) {
+ return
DataSourceConverter.getDataSourceMap(getChangedDataSourceConfiguration(originalMetaData,
newDataSourceConfigs));
+ }
+
+ private boolean isModifiedDataSource(final Map<String, DataSource>
originalDataSources, final String dataSourceName, final DataSourceConfiguration
dataSourceConfiguration) {
+ DataSourceConfiguration dataSourceConfig =
DataSourceConverter.getDataSourceConfigurationMap(originalDataSources).get(dataSourceName);
+ return null != dataSourceConfig &&
!dataSourceConfiguration.equals(dataSourceConfig);
+ }
+
+ private Map<String, Map<String, DataSource>> createDataSourcesMap(final
Map<String, Map<String, DataSourceConfiguration>> dataSourcesConfigs) {
+ return
dataSourcesConfigs.entrySet().stream().collect(Collectors.toMap(Entry::getKey,
entry -> DataSourceConverter.getDataSourceMap(entry.getValue())));
+ }
+
+ private Collection<DataSource> getPendingClosedDataSources(final String
schemaName, final Map<String, DataSourceConfiguration>
dataSourceConfigurations) {
+ Collection<DataSource> result = new LinkedList<>();
+
result.addAll(getDeletedDataSources(contextManager.getMetaDataContexts().getMetaData(schemaName),
dataSourceConfigurations).values());
+
result.addAll(getChangedDataSources(contextManager.getMetaDataContexts().getMetaData(schemaName),
dataSourceConfigurations).values());
+ return result;
+ }
+
+ private void closeDataSources(final String schemaName) {
+ if (null !=
contextManager.getMetaDataContexts().getMetaData(schemaName)
+ && null !=
contextManager.getMetaDataContexts().getMetaData(schemaName).getResource()) {
+ closeDataSources(schemaName,
contextManager.getMetaDataContexts().getMetaData(schemaName).getResource().getDataSources().values());
+ }
+ }
+
+ private void closeDataSources(final String schemaName, final
Collection<DataSource> dataSources) {
+ ShardingSphereResource resource =
contextManager.getMetaDataContexts().getMetaData(schemaName).getResource();
+ dataSources.forEach(each -> closeDataSource(resource, each));
+ }
+
+ private void closeDataSource(final ShardingSphereResource resource, final
DataSource dataSource) {
+ try {
+ resource.close(dataSource);
+ // CHECKSTYLE:OFF
+ } catch (final Exception ignore) {
+ // CHECKSTYLE:ON
+ }
+ }
+
+ /**
+ * Renew transaction manager engine contexts.
+ *
+ * @param event data source change completed event
+ * @throws Exception exception
+ */
+ @Subscribe
+ public synchronized void renewTransactionContext(final
DataSourceChangeCompletedEvent event) throws Exception {
+ closeStaleEngine(event.getSchemaName());
+ Map<String, ShardingSphereTransactionManagerEngine> existedEngines =
contextManager.getTransactionContexts().getEngines();
+ existedEngines.put(event.getSchemaName(),
createNewEngine(event.getDatabaseType(), event.getDataSources()));
+ renewContexts(existedEngines);
+ }
+
+ /**
+ * Renew transaction manager engine context.
+ *
+ * @param event data source deleted event.
+ * @throws Exception exception
+ */
+ @Subscribe
+ public synchronized void renewTransactionContext(final
DataSourceDeletedEvent event) throws Exception {
+ closeStaleEngine(event.getSchemaName());
+ renewContexts(contextManager.getTransactionContexts().getEngines());
+ }
+
+ private void closeStaleEngine(final String schemaName) throws Exception {
+ ShardingSphereTransactionManagerEngine staleEngine =
contextManager.getTransactionContexts().getEngines().remove(schemaName);
+ if (null != staleEngine) {
+ staleEngine.close();
+ }
+ }
+
+ private ShardingSphereTransactionManagerEngine createNewEngine(final
DatabaseType databaseType, final Map<String, DataSource> dataSources) {
+ ShardingSphereTransactionManagerEngine result = new
ShardingSphereTransactionManagerEngine();
+ result.init(databaseType, dataSources, getTransactionRule());
+ return result;
+ }
+
+ private TransactionRule getTransactionRule() {
+ Optional<TransactionRule> transactionRule =
contextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules().stream()
+ .filter(each -> each instanceof TransactionRule).map(each ->
(TransactionRule) each).findFirst();
+ return transactionRule.orElseGet(() -> new TransactionRule(new
TransactionRuleConfiguration(TransactionType.LOCAL.name(), null)));
+ }
+
private TransactionRule getTransactionRule(final MetaDataContexts
metaDataContexts) {
Optional<TransactionRule> transactionRule =
metaDataContexts.getGlobalRuleMetaData().getRules().stream().filter(
each -> each instanceof TransactionRule).map(each ->
(TransactionRule) each).findFirst();
return transactionRule.orElseGet(() -> new TransactionRule(new
TransactionRuleConfiguration(TransactionType.LOCAL.name(), null)));
}
+ private void renewContexts(final Map<String,
ShardingSphereTransactionManagerEngine> engines) {
+ contextManager.renewTransactionContexts(new
TransactionContexts(engines));
+ }
+
@Override
public String getType() {
return "Cluster";
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilderTest.java
similarity index 76%
copy from
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerTest.java
copy to
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilderTest.java
index f8398e2..141a856 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilderTest.java
@@ -17,27 +17,16 @@
package org.apache.shardingsphere.mode.manager.cluster;
+import lombok.SneakyThrows;
import
org.apache.shardingsphere.authority.api.config.AuthorityRuleConfiguration;
import org.apache.shardingsphere.authority.rule.AuthorityRule;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.authority.event.AuthorityChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangeCompletedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceDeletedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.props.PropertiesChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.RuleConfigurationsChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.schema.SchemaChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.metadata.event.SchemaAddedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.metadata.event.SchemaDeletedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.state.event.DisabledStateChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.schema.GovernanceSchema;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import
org.apache.shardingsphere.infra.config.mode.PersistRepositoryConfiguration;
import
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
import
org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
-import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
-import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
@@ -47,8 +36,23 @@ import
org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
import org.apache.shardingsphere.infra.optimize.context.OptimizeContextFactory;
import
org.apache.shardingsphere.infra.optimize.core.metadata.FederateSchemaMetadatas;
-import org.apache.shardingsphere.mode.persist.PersistService;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.authority.event.AuthorityChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangeCompletedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceDeletedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.props.PropertiesChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.RuleConfigurationsChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.schema.SchemaChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.metadata.event.SchemaAddedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.metadata.event.SchemaDeletedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.state.event.DisabledStateChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.schema.GovernanceSchema;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.mode.persist.PersistService;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.test.mock.MockedDataSource;
import
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
@@ -57,11 +61,13 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
+import org.mockito.internal.util.reflection.FieldSetter;
import org.mockito.junit.MockitoJUnitRunner;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
@@ -83,67 +89,38 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public final class ClusterContextManagerTest {
+public final class ClusterContextManagerBuilderTest {
- private final ConfigurationProperties props = new
ConfigurationProperties(new Properties());
+ private ClusterContextManagerBuilder builder;
- @Mock(answer = Answers.RETURNS_DEEP_STUBS)
- private PersistService persistService;
+ private ContextManager contextManager;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
- private RegistryCenter registryCenter;
+ private PersistService persistService;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ShardingSphereMetaData metaData;
- private ClusterContextManager clusterContextManager;
-
- @Mock
- private ShardingSphereRuleMetaData globalRuleMetaData;
-
@Mock
private ShardingSphereTransactionManagerEngine engine;
@Mock
private Map<String, ShardingSphereTransactionManagerEngine> engines;
+ @Mock
+ private ShardingSphereRuleMetaData globalRuleMetaData;
+
+ @SneakyThrows
@Before
public void setUp() {
- clusterContextManager = new ClusterContextManager(persistService,
registryCenter);
- clusterContextManager.init(
- new MetaDataContexts(mock(PersistService.class),
createMetaDataMap(), globalRuleMetaData, mock(ExecutorEngine.class), props,
mockOptimizeContextFactory()),
- mock(TransactionContexts.class, RETURNS_DEEP_STUBS));
- }
-
- private Map<String, ShardingSphereMetaData> createMetaDataMap() {
- when(metaData.getName()).thenReturn("schema");
- ShardingSphereResource resource = mock(ShardingSphereResource.class);
- when(metaData.getResource()).thenReturn(resource);
-
when(metaData.getSchema()).thenReturn(mock(ShardingSphereSchema.class));
-
when(metaData.getRuleMetaData().getRules()).thenReturn(Collections.emptyList());
-
when(metaData.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
- return Collections.singletonMap("schema", metaData);
- }
-
- private OptimizeContextFactory mockOptimizeContextFactory() {
- OptimizeContextFactory result = mock(OptimizeContextFactory.class);
- when(result.getSchemaMetadatas()).thenReturn(new
FederateSchemaMetadatas(Collections.emptyMap()));
- return result;
- }
-
- @Test
- public void assertGetMetaData() {
-
assertThat(clusterContextManager.getMetaDataContexts().getMetaData("schema"),
is(metaData));
- }
-
- @Test
- public void assertGetDefaultMetaData() {
-
assertNull(clusterContextManager.getMetaDataContexts().getMetaData(DefaultSchema.LOGIC_NAME));
- }
-
- @Test
- public void assertGetProps() {
- assertThat(clusterContextManager.getMetaDataContexts().getProps(),
is(props));
+ PersistRepositoryConfiguration persistRepositoryConfiguration = new
ClusterPersistRepositoryConfiguration("TEST", "", "", new Properties());
+ ModeConfiguration configuration = new ModeConfiguration("Cluster",
persistRepositoryConfiguration, false);
+ builder = new ClusterContextManagerBuilder();
+ contextManager = builder.build(configuration, new HashMap<>(), new
HashMap<>(), new LinkedList<>(), new Properties(), false);
+ FieldSetter.setField(builder,
builder.getClass().getDeclaredField("persistService"), persistService);
+ contextManager.renewMetaDataContexts(new
MetaDataContexts(mock(PersistService.class), createMetaDataMap(),
globalRuleMetaData, mock(ExecutorEngine.class),
+ new ConfigurationProperties(new Properties()),
mockOptimizeContextFactory()));
+
contextManager.renewTransactionContexts(mock(TransactionContexts.class,
RETURNS_DEEP_STUBS));
}
@Test
@@ -151,9 +128,9 @@ public final class ClusterContextManagerTest {
SchemaAddedEvent event = new SchemaAddedEvent("schema_add");
when(persistService.getDataSourceService().load("schema_add")).thenReturn(getDataSourceConfigurations());
when(persistService.getSchemaRuleService().load("schema_add")).thenReturn(Collections.emptyList());
- clusterContextManager.renew(event);
-
assertNotNull(clusterContextManager.getMetaDataContexts().getMetaData("schema_add"));
-
assertNotNull(clusterContextManager.getMetaDataContexts().getMetaData("schema_add").getResource().getDataSources());
+ builder.renew(event);
+
assertNotNull(contextManager.getMetaDataContexts().getMetaData("schema_add"));
+
assertNotNull(contextManager.getMetaDataContexts().getMetaData("schema_add").getResource().getDataSources());
}
private Map<String, DataSourceConfiguration> getDataSourceConfigurations()
{
@@ -168,8 +145,8 @@ public final class ClusterContextManagerTest {
@Test
public void assertSchemaDelete() {
SchemaDeletedEvent event = new SchemaDeletedEvent("schema");
- clusterContextManager.renew(event);
-
assertNull(clusterContextManager.getMetaDataContexts().getMetaData("schema"));
+ builder.renew(event);
+ assertNull(contextManager.getMetaDataContexts().getMetaData("schema"));
}
@Test
@@ -177,44 +154,44 @@ public final class ClusterContextManagerTest {
Properties properties = new Properties();
properties.setProperty(ConfigurationPropertyKey.SQL_SHOW.getKey(),
"true");
PropertiesChangedEvent event = new PropertiesChangedEvent(properties);
- clusterContextManager.renew(event);
-
assertThat(clusterContextManager.getMetaDataContexts().getProps().getProps().getProperty(ConfigurationPropertyKey.SQL_SHOW.getKey()),
is("true"));
+ builder.renew(event);
+
assertThat(contextManager.getMetaDataContexts().getProps().getProps().getProperty(ConfigurationPropertyKey.SQL_SHOW.getKey()),
is("true"));
}
@Test
public void assertSchemaChanged() {
SchemaChangedEvent event = new SchemaChangedEvent("schema_changed",
mock(ShardingSphereSchema.class));
- clusterContextManager.renew(event);
-
assertTrue(clusterContextManager.getMetaDataContexts().getAllSchemaNames().contains("schema"));
-
assertFalse(clusterContextManager.getMetaDataContexts().getAllSchemaNames().contains("schema_changed"));
+ builder.renew(event);
+
assertTrue(contextManager.getMetaDataContexts().getAllSchemaNames().contains("schema"));
+
assertFalse(contextManager.getMetaDataContexts().getAllSchemaNames().contains("schema_changed"));
}
@Test
public void assertSchemaChangedWithExistSchema() {
SchemaChangedEvent event = new SchemaChangedEvent("schema",
mock(ShardingSphereSchema.class));
- clusterContextManager.renew(event);
-
assertThat(clusterContextManager.getMetaDataContexts().getMetaData("schema"),
not(metaData));
+ builder.renew(event);
+ assertThat(contextManager.getMetaDataContexts().getMetaData("schema"),
not(metaData));
}
@Test
public void assertRuleConfigurationsChanged() throws SQLException {
-
assertThat(clusterContextManager.getMetaDataContexts().getMetaData("schema"),
is(metaData));
+ assertThat(contextManager.getMetaDataContexts().getMetaData("schema"),
is(metaData));
RuleConfigurationsChangedEvent event = new
RuleConfigurationsChangedEvent("schema", new LinkedList<>());
- clusterContextManager.renew(event);
-
assertThat(clusterContextManager.getMetaDataContexts().getMetaData("schema"),
not(metaData));
+ builder.renew(event);
+ assertThat(contextManager.getMetaDataContexts().getMetaData("schema"),
not(metaData));
}
@Test
public void assertDisableStateChanged() {
DisabledStateChangedEvent event = new DisabledStateChangedEvent(new
GovernanceSchema("schema.ds_0"), true);
- clusterContextManager.renew(event);
+ builder.renew(event);
}
@Test
public void assertDataSourceChanged() throws SQLException {
DataSourceChangedEvent event = new DataSourceChangedEvent("schema",
getChangedDataSourceConfigurations());
- clusterContextManager.renew(event);
-
assertTrue(clusterContextManager.getMetaDataContexts().getMetaData("schema").getResource().getDataSources().containsKey("ds_2"));
+ builder.renew(event);
+
assertTrue(contextManager.getMetaDataContexts().getMetaData("schema").getResource().getDataSources().containsKey("ds_2"));
}
private Map<String, DataSourceConfiguration>
getChangedDataSourceConfigurations() {
@@ -229,8 +206,8 @@ public final class ClusterContextManagerTest {
@Test
public void assertGlobalRuleConfigurationsChanged() {
GlobalRuleConfigurationsChangedEvent event = new
GlobalRuleConfigurationsChangedEvent(getChangedGlobalRuleConfigurations());
- clusterContextManager.renew(event);
-
assertThat(clusterContextManager.getMetaDataContexts().getGlobalRuleMetaData(),
not(globalRuleMetaData));
+ builder.renew(event);
+
assertThat(contextManager.getMetaDataContexts().getGlobalRuleMetaData(),
not(globalRuleMetaData));
}
private Collection<RuleConfiguration> getChangedGlobalRuleConfigurations()
{
@@ -247,10 +224,10 @@ public final class ClusterContextManagerTest {
@Test
public void assertAuthorityChanged() {
-
when(clusterContextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules()).thenReturn(createAuthorityRule());
+
when(contextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules()).thenReturn(createAuthorityRule());
AuthorityChangedEvent event = new
AuthorityChangedEvent(getShardingSphereUsers());
- clusterContextManager.renew(event);
- Optional<AuthorityRule> authorityRule =
clusterContextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules()
+ builder.renew(event);
+ Optional<AuthorityRule> authorityRule =
contextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules()
.stream().filter(each -> each instanceof
AuthorityRule).findAny().map(each -> (AuthorityRule) each);
assertTrue(authorityRule.isPresent());
assertNotNull(authorityRule.get().findUser(new
ShardingSphereUser("root", "root", "%").getGrantee()));
@@ -258,16 +235,16 @@ public final class ClusterContextManagerTest {
private Collection<ShardingSphereRule> createAuthorityRule() {
AuthorityRuleConfiguration ruleConfig = new
AuthorityRuleConfiguration(Collections.emptyList(), new
ShardingSphereAlgorithmConfiguration("ALL_PRIVILEGES_PERMITTED", new
Properties()));
- AuthorityRule authorityRule = new AuthorityRule(ruleConfig,
clusterContextManager.getMetaDataContexts().getMetaDataMap(),
Collections.emptyList());
+ AuthorityRule authorityRule = new AuthorityRule(ruleConfig,
contextManager.getMetaDataContexts().getMetaDataMap(), Collections.emptyList());
return Collections.singleton(authorityRule);
}
@Test
public void assertRenewWithDataSourceChangeCompletedEvent() throws
Exception {
DataSourceChangeCompletedEvent event = new
DataSourceChangeCompletedEvent("name", mock(DatabaseType.class),
Collections.emptyMap());
-
when(clusterContextManager.getTransactionContexts().getEngines()).thenReturn(engines);
+
when(contextManager.getTransactionContexts().getEngines()).thenReturn(engines);
when(engines.remove("name")).thenReturn(engine);
- clusterContextManager.renewTransactionContext(event);
+ builder.renewTransactionContext(event);
verify(engine).close();
verify(engines).put(eq("name"),
any(ShardingSphereTransactionManagerEngine.class));
}
@@ -275,9 +252,25 @@ public final class ClusterContextManagerTest {
@Test
public void assertRenewWithDataSourceDeletedEvent() throws Exception {
DataSourceDeletedEvent event = new DataSourceDeletedEvent("name");
-
when(clusterContextManager.getTransactionContexts().getEngines()).thenReturn(engines);
+
when(contextManager.getTransactionContexts().getEngines()).thenReturn(engines);
when(engines.remove("name")).thenReturn(engine);
- clusterContextManager.renewTransactionContext(event);
+ builder.renewTransactionContext(event);
verify(engine).close();
}
+
+ private Map<String, ShardingSphereMetaData> createMetaDataMap() {
+ when(metaData.getName()).thenReturn("schema");
+ ShardingSphereResource resource = mock(ShardingSphereResource.class);
+ when(metaData.getResource()).thenReturn(resource);
+
when(metaData.getSchema()).thenReturn(mock(ShardingSphereSchema.class));
+
when(metaData.getRuleMetaData().getRules()).thenReturn(Collections.emptyList());
+
when(metaData.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
+ return Collections.singletonMap("schema", metaData);
+ }
+
+ private OptimizeContextFactory mockOptimizeContextFactory() {
+ OptimizeContextFactory result = mock(OptimizeContextFactory.class);
+ when(result.getSchemaMetadatas()).thenReturn(new
FederateSchemaMetadatas(Collections.emptyMap()));
+ return result;
+ }
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerTest.java
index f8398e2..caba7be 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerTest.java
@@ -17,40 +17,15 @@
package org.apache.shardingsphere.mode.manager.cluster;
-import
org.apache.shardingsphere.authority.api.config.AuthorityRuleConfiguration;
-import org.apache.shardingsphere.authority.rule.AuthorityRule;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.authority.event.AuthorityChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangeCompletedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceDeletedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.props.PropertiesChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.RuleConfigurationsChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.schema.SchemaChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.metadata.event.SchemaAddedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.metadata.event.SchemaDeletedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.state.event.DisabledStateChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.schema.GovernanceSchema;
-import org.apache.shardingsphere.infra.config.RuleConfiguration;
-import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
-import
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
import
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
-import
org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
-import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.infra.database.DefaultSchema;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.infra.metadata.resource.ShardingSphereResource;
import
org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
-import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
import org.apache.shardingsphere.infra.optimize.context.OptimizeContextFactory;
-import
org.apache.shardingsphere.infra.optimize.core.metadata.FederateSchemaMetadatas;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.persist.PersistService;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.test.mock.MockedDataSource;
-import
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
import org.junit.Before;
import org.junit.Test;
@@ -59,27 +34,15 @@ import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import java.sql.SQLException;
-import java.util.Collection;
import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -101,12 +64,6 @@ public final class ClusterContextManagerTest {
@Mock
private ShardingSphereRuleMetaData globalRuleMetaData;
- @Mock
- private ShardingSphereTransactionManagerEngine engine;
-
- @Mock
- private Map<String, ShardingSphereTransactionManagerEngine> engines;
-
@Before
public void setUp() {
clusterContextManager = new ClusterContextManager(persistService,
registryCenter);
@@ -116,18 +73,13 @@ public final class ClusterContextManagerTest {
}
private Map<String, ShardingSphereMetaData> createMetaDataMap() {
- when(metaData.getName()).thenReturn("schema");
- ShardingSphereResource resource = mock(ShardingSphereResource.class);
- when(metaData.getResource()).thenReturn(resource);
when(metaData.getSchema()).thenReturn(mock(ShardingSphereSchema.class));
when(metaData.getRuleMetaData().getRules()).thenReturn(Collections.emptyList());
-
when(metaData.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
return Collections.singletonMap("schema", metaData);
}
private OptimizeContextFactory mockOptimizeContextFactory() {
OptimizeContextFactory result = mock(OptimizeContextFactory.class);
- when(result.getSchemaMetadatas()).thenReturn(new
FederateSchemaMetadatas(Collections.emptyMap()));
return result;
}
@@ -145,139 +97,4 @@ public final class ClusterContextManagerTest {
public void assertGetProps() {
assertThat(clusterContextManager.getMetaDataContexts().getProps(),
is(props));
}
-
- @Test
- public void assertSchemaAdd() throws SQLException {
- SchemaAddedEvent event = new SchemaAddedEvent("schema_add");
-
when(persistService.getDataSourceService().load("schema_add")).thenReturn(getDataSourceConfigurations());
-
when(persistService.getSchemaRuleService().load("schema_add")).thenReturn(Collections.emptyList());
- clusterContextManager.renew(event);
-
assertNotNull(clusterContextManager.getMetaDataContexts().getMetaData("schema_add"));
-
assertNotNull(clusterContextManager.getMetaDataContexts().getMetaData("schema_add").getResource().getDataSources());
- }
-
- private Map<String, DataSourceConfiguration> getDataSourceConfigurations()
{
- MockedDataSource dataSource = new MockedDataSource();
- Map<String, DataSourceConfiguration> result = new LinkedHashMap<>(3,
1);
- result.put("primary_ds",
DataSourceConfiguration.getDataSourceConfiguration(dataSource));
- result.put("ds_0",
DataSourceConfiguration.getDataSourceConfiguration(dataSource));
- result.put("ds_1",
DataSourceConfiguration.getDataSourceConfiguration(dataSource));
- return result;
- }
-
- @Test
- public void assertSchemaDelete() {
- SchemaDeletedEvent event = new SchemaDeletedEvent("schema");
- clusterContextManager.renew(event);
-
assertNull(clusterContextManager.getMetaDataContexts().getMetaData("schema"));
- }
-
- @Test
- public void assertPropertiesChanged() {
- Properties properties = new Properties();
- properties.setProperty(ConfigurationPropertyKey.SQL_SHOW.getKey(),
"true");
- PropertiesChangedEvent event = new PropertiesChangedEvent(properties);
- clusterContextManager.renew(event);
-
assertThat(clusterContextManager.getMetaDataContexts().getProps().getProps().getProperty(ConfigurationPropertyKey.SQL_SHOW.getKey()),
is("true"));
- }
-
- @Test
- public void assertSchemaChanged() {
- SchemaChangedEvent event = new SchemaChangedEvent("schema_changed",
mock(ShardingSphereSchema.class));
- clusterContextManager.renew(event);
-
assertTrue(clusterContextManager.getMetaDataContexts().getAllSchemaNames().contains("schema"));
-
assertFalse(clusterContextManager.getMetaDataContexts().getAllSchemaNames().contains("schema_changed"));
- }
-
- @Test
- public void assertSchemaChangedWithExistSchema() {
- SchemaChangedEvent event = new SchemaChangedEvent("schema",
mock(ShardingSphereSchema.class));
- clusterContextManager.renew(event);
-
assertThat(clusterContextManager.getMetaDataContexts().getMetaData("schema"),
not(metaData));
- }
-
- @Test
- public void assertRuleConfigurationsChanged() throws SQLException {
-
assertThat(clusterContextManager.getMetaDataContexts().getMetaData("schema"),
is(metaData));
- RuleConfigurationsChangedEvent event = new
RuleConfigurationsChangedEvent("schema", new LinkedList<>());
- clusterContextManager.renew(event);
-
assertThat(clusterContextManager.getMetaDataContexts().getMetaData("schema"),
not(metaData));
- }
-
- @Test
- public void assertDisableStateChanged() {
- DisabledStateChangedEvent event = new DisabledStateChangedEvent(new
GovernanceSchema("schema.ds_0"), true);
- clusterContextManager.renew(event);
- }
-
- @Test
- public void assertDataSourceChanged() throws SQLException {
- DataSourceChangedEvent event = new DataSourceChangedEvent("schema",
getChangedDataSourceConfigurations());
- clusterContextManager.renew(event);
-
assertTrue(clusterContextManager.getMetaDataContexts().getMetaData("schema").getResource().getDataSources().containsKey("ds_2"));
- }
-
- private Map<String, DataSourceConfiguration>
getChangedDataSourceConfigurations() {
- MockedDataSource dataSource = new MockedDataSource();
- Map<String, DataSourceConfiguration> result = new LinkedHashMap<>(3,
1);
- result.put("primary_ds",
DataSourceConfiguration.getDataSourceConfiguration(dataSource));
- result.put("ds_1",
DataSourceConfiguration.getDataSourceConfiguration(dataSource));
- result.put("ds_2",
DataSourceConfiguration.getDataSourceConfiguration(dataSource));
- return result;
- }
-
- @Test
- public void assertGlobalRuleConfigurationsChanged() {
- GlobalRuleConfigurationsChangedEvent event = new
GlobalRuleConfigurationsChangedEvent(getChangedGlobalRuleConfigurations());
- clusterContextManager.renew(event);
-
assertThat(clusterContextManager.getMetaDataContexts().getGlobalRuleMetaData(),
not(globalRuleMetaData));
- }
-
- private Collection<RuleConfiguration> getChangedGlobalRuleConfigurations()
{
- RuleConfiguration authorityRuleConfig = new
AuthorityRuleConfiguration(getShardingSphereUsers(), new
ShardingSphereAlgorithmConfiguration("NATIVE", new Properties()));
- return Collections.singleton(authorityRuleConfig);
- }
-
- private Collection<ShardingSphereUser> getShardingSphereUsers() {
- Collection<ShardingSphereUser> result = new LinkedList<>();
- result.add(new ShardingSphereUser("root", "root", "%"));
- result.add(new ShardingSphereUser("sharding", "sharding",
"localhost"));
- return result;
- }
-
- @Test
- public void assertAuthorityChanged() {
-
when(clusterContextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules()).thenReturn(createAuthorityRule());
- AuthorityChangedEvent event = new
AuthorityChangedEvent(getShardingSphereUsers());
- clusterContextManager.renew(event);
- Optional<AuthorityRule> authorityRule =
clusterContextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules()
- .stream().filter(each -> each instanceof
AuthorityRule).findAny().map(each -> (AuthorityRule) each);
- assertTrue(authorityRule.isPresent());
- assertNotNull(authorityRule.get().findUser(new
ShardingSphereUser("root", "root", "%").getGrantee()));
- }
-
- private Collection<ShardingSphereRule> createAuthorityRule() {
- AuthorityRuleConfiguration ruleConfig = new
AuthorityRuleConfiguration(Collections.emptyList(), new
ShardingSphereAlgorithmConfiguration("ALL_PRIVILEGES_PERMITTED", new
Properties()));
- AuthorityRule authorityRule = new AuthorityRule(ruleConfig,
clusterContextManager.getMetaDataContexts().getMetaDataMap(),
Collections.emptyList());
- return Collections.singleton(authorityRule);
- }
-
- @Test
- public void assertRenewWithDataSourceChangeCompletedEvent() throws
Exception {
- DataSourceChangeCompletedEvent event = new
DataSourceChangeCompletedEvent("name", mock(DatabaseType.class),
Collections.emptyMap());
-
when(clusterContextManager.getTransactionContexts().getEngines()).thenReturn(engines);
- when(engines.remove("name")).thenReturn(engine);
- clusterContextManager.renewTransactionContext(event);
- verify(engine).close();
- verify(engines).put(eq("name"),
any(ShardingSphereTransactionManagerEngine.class));
- }
-
- @Test
- public void assertRenewWithDataSourceDeletedEvent() throws Exception {
- DataSourceDeletedEvent event = new DataSourceDeletedEvent("name");
-
when(clusterContextManager.getTransactionContexts().getEngines()).thenReturn(engines);
- when(engines.remove("name")).thenReturn(engine);
- clusterContextManager.renewTransactionContext(event);
- verify(engine).close();
- }
}