This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 36d859e Add active version changed listener (#15504)
36d859e is described below
commit 36d859e73b30d915f1c487f8f624cc1b73b745da
Author: Haoran Meng <[email protected]>
AuthorDate: Fri Feb 18 20:21:14 2022 +0800
Add active version changed listener (#15504)
---
.../mode/manager/ContextManager.java | 47 ++++++++++++++++++----
.../persist/service/SchemaBasedPersistService.java | 9 +++++
.../service/impl/DataSourcePersistService.java | 6 +++
.../service/impl/SchemaRulePersistService.java | 7 ++++
.../ClusterContextManagerCoordinator.java | 16 ++++++++
.../event/version/SchemaVersionChangedEvent.java | 34 ++++++++++++++++
.../metadata/watcher/MetaDataChangedWatcher.java | 4 ++
7 files changed, 116 insertions(+), 7 deletions(-)
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 482ce4a..4807cd9 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -275,19 +275,29 @@ public final class ContextManager implements
AutoCloseable {
public void alterDataSourceConfiguration(final String schemaName, final
Map<String, DataSourceProperties> dataSourcePropsMap) {
try {
MetaDataContexts changedMetaDataContext =
buildChangedMetaDataContextWithChangedDataSource(metaDataContexts.getMetaDataMap().get(schemaName),
dataSourcePropsMap);
-
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getFederationMetaData().getSchemas());
- Map<String, ShardingSphereMetaData> metaDataMap = new
HashMap<>(metaDataContexts.getMetaDataMap());
- metaDataMap.putAll(changedMetaDataContext.getMetaDataMap());
- Collection<DataSource> pendingClosedDataSources =
getPendingClosedDataSources(schemaName, dataSourcePropsMap);
- renewMetaDataContexts(rebuildMetaDataContexts(metaDataMap));
- renewTransactionContext(schemaName,
metaDataContexts.getMetaData(schemaName).getResource());
- closeDataSources(schemaName, pendingClosedDataSources);
+ refreshMetaDataContext(schemaName, changedMetaDataContext,
dataSourcePropsMap);
} catch (final SQLException ex) {
log.error("Alter schema:{} data source configuration failed",
schemaName, ex);
}
}
/**
+ * Alter data source and rule configuration.
+ *
+ * @param schemaName schema name
+ * @param dataSourcePropsMap data source props map
+ * @param ruleConfigs rule configurations
+ */
+ public void alterDataSourceAndRuleConfiguration(final String schemaName,
final Map<String, DataSourceProperties> dataSourcePropsMap, final
Collection<RuleConfiguration> ruleConfigs) {
+ try {
+ MetaDataContexts changedMetaDataContext =
buildChangedMetaDataContextWithChangedDataSourceAndRule(metaDataContexts.getMetaDataMap().get(schemaName),
dataSourcePropsMap, ruleConfigs);
+ refreshMetaDataContext(schemaName, changedMetaDataContext,
dataSourcePropsMap);
+ } catch (SQLException ex) {
+ log.error("Alter schema:{} data source and rule configuration
failed", schemaName, ex);
+ }
+ }
+
+ /**
* Alter global rule configuration.
*
* @param ruleConfigurations global rule configuration
@@ -428,6 +438,16 @@ public final class ContextManager implements AutoCloseable
{
renewTransactionContext(schemaName,
metaDataContexts.getMetaData(schemaName).getResource());
}
+ private void refreshMetaDataContext(final String schemaName, final
MetaDataContexts changedMetaDataContext, final Map<String,
DataSourceProperties> dataSourcePropsMap) {
+
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getFederationMetaData().getSchemas());
+ Map<String, ShardingSphereMetaData> metaDataMap = new
HashMap<>(metaDataContexts.getMetaDataMap());
+ metaDataMap.putAll(changedMetaDataContext.getMetaDataMap());
+ Collection<DataSource> pendingClosedDataSources =
getPendingClosedDataSources(schemaName, dataSourcePropsMap);
+ renewMetaDataContexts(rebuildMetaDataContexts(metaDataMap));
+ renewTransactionContext(schemaName,
metaDataContexts.getMetaData(schemaName).getResource());
+ closeDataSources(schemaName, pendingClosedDataSources);
+ }
+
private MetaDataContexts
buildChangedMetaDataContextWithAddedDataSource(final ShardingSphereMetaData
originalMetaData,
final Map<String, DataSourceProperties> addedDataSourceProps) throws
SQLException {
Map<String, DataSource> dataSourceMap = new
HashMap<>(originalMetaData.getResource().getDataSources());
@@ -463,6 +483,19 @@ public final class ContextManager implements AutoCloseable
{
return
metaDataContextsBuilder.build(metaDataContexts.getMetaDataPersistService().orElse(null));
}
+ private MetaDataContexts
buildChangedMetaDataContextWithChangedDataSourceAndRule(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceProperties> newDataSourceProps,
+
final Collection<RuleConfiguration> ruleConfigs) throws SQLException {
+ Collection<String> deletedDataSources =
getDeletedDataSources(originalMetaData, newDataSourceProps).keySet();
+ Map<String, DataSource> changedDataSources =
buildChangedDataSources(originalMetaData, newDataSourceProps);
+ Properties props = metaDataContexts.getProps().getProps();
+ MetaDataContextsBuilder metaDataContextsBuilder = new
MetaDataContextsBuilder(metaDataContexts.getGlobalRuleMetaData().getConfigurations(),
props);
+ metaDataContextsBuilder.addSchema(originalMetaData.getName(), new
DataSourceProvidedSchemaConfiguration(getNewDataSources(originalMetaData.getResource().getDataSources(),
+ getAddedDataSources(originalMetaData, newDataSourceProps),
changedDataSources, deletedDataSources), ruleConfigs), props);
+ metaDataContexts.getMetaDataPersistService().ifPresent(
+ optional ->
optional.getSchemaMetaDataService().persist(originalMetaData.getName(),
metaDataContextsBuilder.getSchemaMap().get(originalMetaData.getName())));
+ return
metaDataContextsBuilder.build(metaDataContexts.getMetaDataPersistService().orElse(null));
+ }
+
private Map<String, DataSource> getNewDataSources(final Map<String,
DataSource> originalDataSources,
final Map<String,
DataSource> addedDataSources, final Map<String, DataSource> changedDataSources,
final Collection<String> deletedDataSources) {
Map<String, DataSource> result = new
LinkedHashMap<>(originalDataSources);
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java
index 9fc8617..cb9e852 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java
@@ -59,6 +59,15 @@ public interface SchemaBasedPersistService<T> {
T load(String schemaName);
/**
+ * Load configurations based on version.
+ *
+ * @param schemaName schema name
+ * @param version version
+ * @return configurations
+ */
+ T load(String schemaName, String version);
+
+ /**
* Judge whether schema configuration existed.
*
* @param schemaName schema name
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java
index 7d229c1..0e191db 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java
@@ -72,6 +72,12 @@ public final class DataSourcePersistService implements
SchemaBasedPersistService
return isExisted(schemaName) ?
getDataSourceProperties(repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName,
getSchemaActiveVersion(schemaName)))) : new LinkedHashMap<>();
}
+ @Override
+ public Map<String, DataSourceProperties> load(final String schemaName,
final String version) {
+ String yamlContent =
repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName,
version));
+ return Strings.isNullOrEmpty(yamlContent) ? new LinkedHashMap<>() :
getDataSourceProperties(yamlContent);
+ }
+
@SuppressWarnings("unchecked")
private Map<String, DataSourceProperties> getDataSourceProperties(final
String yamlContent) {
Map<String, Map<String, Object>> yamlDataSources =
YamlEngine.unmarshal(yamlContent, Map.class);
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
index ae862c6..e8ae771 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
@@ -75,6 +75,13 @@ public final class SchemaRulePersistService implements
SchemaBasedPersistService
}
@Override
+ public Collection<RuleConfiguration> load(final String schemaName, final String version) {
+ // Parse the YAML already fetched for the requested version; do not re-read the active version.
+ String yamlContent = repository.get(SchemaMetaDataNode.getRulePath(schemaName, version));
+ return Strings.isNullOrEmpty(yamlContent) ? new LinkedList<>() : new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(yamlContent, Collection.class, true));
+ }
+
+ @Override
public boolean isExisted(final String schemaName) {
return !Strings.isNullOrEmpty(getSchemaActiveVersion(schemaName)) &&
!Strings.isNullOrEmpty(repository.get(SchemaMetaDataNode.getRulePath(schemaName,
getSchemaActiveVersion(schemaName))));
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index e7f4b83..c403d1b 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -19,6 +19,8 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator;
import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.authority.rule.AuthorityRule;
+import org.apache.shardingsphere.infra.config.RuleConfiguration;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.metadata.schema.QualifiedSchema;
import
org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
@@ -32,6 +34,7 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.confi
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.SchemaChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.SchemaVersionChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
@@ -46,6 +49,7 @@ import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
+import java.util.Map;
import java.util.Optional;
/**
@@ -223,6 +227,18 @@ public final class ClusterContextManagerCoordinator {
}
}
+ /**
+ * Renew with new schema version.
+ *
+ * @param event schema version changed event
+ */
+ @Subscribe
+ public synchronized void renew(final SchemaVersionChangedEvent event) {
+ Map<String, DataSourceProperties> dataSourcePropertiesMap =
metaDataPersistService.getDataSourceService().load(event.getSchemaName(),
event.getActiveVersion());
+ Collection<RuleConfiguration> ruleConfigs =
metaDataPersistService.getSchemaRuleService().load(event.getSchemaName(),
event.getActiveVersion());
+
contextManager.alterDataSourceAndRuleConfiguration(event.getSchemaName(),
dataSourcePropertiesMap, ruleConfigs);
+ }
+
private void persistSchema(final String schemaName) {
if
(!metaDataPersistService.getDataSourceService().isExisted(schemaName)) {
metaDataPersistService.getDataSourceService().persist(schemaName,
new LinkedHashMap<>());
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/version/SchemaVersionChangedEvent.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/version/SchemaVersionChangedEvent.java
new file mode 100644
index 0000000..cb728ac
--- /dev/null
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/version/SchemaVersionChangedEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+
+/**
+ * Schema version changed event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class SchemaVersionChangedEvent implements GovernanceEvent {
+
+ private final String schemaName;
+
+ private final String activeVersion;
+}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
index e06993f..5580d74 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
@@ -31,6 +31,7 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.SchemaChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.SchemaVersionChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
import org.apache.shardingsphere.mode.metadata.persist.node.SchemaMetaDataNode;
@@ -99,6 +100,9 @@ public final class MetaDataChangedWatcher implements
GovernanceWatcher<Governanc
if (!schemaName.isPresent() ||
Strings.isNullOrEmpty(event.getValue())) {
return Optional.empty();
}
+ if
(event.getKey().equals(SchemaMetaDataNode.getActiveVersionPath(schemaName.get())))
{
+ return Optional.of(new SchemaVersionChangedEvent(schemaName.get(),
event.getValue()));
+ }
Optional<String> schemaVersion =
SchemaMetaDataNode.getVersionByDataSourcesPath(event.getKey());
if (schemaVersion.isPresent()) {
return Optional.of(createDataSourceChangedEvent(schemaName.get(),
schemaVersion.get(), event));