This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 36d859e  Add active version changed listener (#15504)
36d859e is described below

commit 36d859e73b30d915f1c487f8f624cc1b73b745da
Author: Haoran Meng <[email protected]>
AuthorDate: Fri Feb 18 20:21:14 2022 +0800

    Add active version changed listener (#15504)
---
 .../mode/manager/ContextManager.java               | 47 ++++++++++++++++++----
 .../persist/service/SchemaBasedPersistService.java |  9 +++++
 .../service/impl/DataSourcePersistService.java     |  6 +++
 .../service/impl/SchemaRulePersistService.java     |  7 ++++
 .../ClusterContextManagerCoordinator.java          | 16 ++++++++
 .../event/version/SchemaVersionChangedEvent.java   | 34 ++++++++++++++++
 .../metadata/watcher/MetaDataChangedWatcher.java   |  4 ++
 7 files changed, 116 insertions(+), 7 deletions(-)

diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 482ce4a..4807cd9 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -275,19 +275,29 @@ public final class ContextManager implements 
AutoCloseable {
     public void alterDataSourceConfiguration(final String schemaName, final 
Map<String, DataSourceProperties> dataSourcePropsMap) {
         try {
             MetaDataContexts changedMetaDataContext = 
buildChangedMetaDataContextWithChangedDataSource(metaDataContexts.getMetaDataMap().get(schemaName),
 dataSourcePropsMap);
-            
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getFederationMetaData().getSchemas());
-            Map<String, ShardingSphereMetaData> metaDataMap = new 
HashMap<>(metaDataContexts.getMetaDataMap());
-            metaDataMap.putAll(changedMetaDataContext.getMetaDataMap());
-            Collection<DataSource> pendingClosedDataSources = 
getPendingClosedDataSources(schemaName, dataSourcePropsMap);
-            renewMetaDataContexts(rebuildMetaDataContexts(metaDataMap));
-            renewTransactionContext(schemaName, 
metaDataContexts.getMetaData(schemaName).getResource());
-            closeDataSources(schemaName, pendingClosedDataSources);
+            refreshMetaDataContext(schemaName, changedMetaDataContext, 
dataSourcePropsMap);
         } catch (final SQLException ex) {
             log.error("Alter schema:{} data source configuration failed", 
schemaName, ex);
         }
     }
     
     /**
+     * Alter data source and rule configuration.
+     * 
+     * @param schemaName schema name
+     * @param dataSourcePropsMap data source props map
+     * @param ruleConfigs rule configurations
+     */
+    public void alterDataSourceAndRuleConfiguration(final String schemaName, 
final Map<String, DataSourceProperties> dataSourcePropsMap, final 
Collection<RuleConfiguration> ruleConfigs) {
+        try {
+            MetaDataContexts changedMetaDataContext = 
buildChangedMetaDataContextWithChangedDataSourceAndRule(metaDataContexts.getMetaDataMap().get(schemaName),
 dataSourcePropsMap, ruleConfigs);
+            refreshMetaDataContext(schemaName, changedMetaDataContext, 
dataSourcePropsMap);
+        } catch (SQLException ex) {
+            log.error("Alter schema:{} data source and rule configuration 
failed", schemaName, ex);
+        }
+    }
+    
+    /**
      * Alter global rule configuration.
      * 
      * @param ruleConfigurations global rule configuration
@@ -428,6 +438,16 @@ public final class ContextManager implements AutoCloseable 
{
         renewTransactionContext(schemaName, 
metaDataContexts.getMetaData(schemaName).getResource());
     }
     
+    private void refreshMetaDataContext(final String schemaName, final 
MetaDataContexts changedMetaDataContext, final Map<String, 
DataSourceProperties> dataSourcePropsMap) {
+        
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getFederationMetaData().getSchemas());
+        Map<String, ShardingSphereMetaData> metaDataMap = new 
HashMap<>(metaDataContexts.getMetaDataMap());
+        metaDataMap.putAll(changedMetaDataContext.getMetaDataMap());
+        Collection<DataSource> pendingClosedDataSources = 
getPendingClosedDataSources(schemaName, dataSourcePropsMap);
+        renewMetaDataContexts(rebuildMetaDataContexts(metaDataMap));
+        renewTransactionContext(schemaName, 
metaDataContexts.getMetaData(schemaName).getResource());
+        closeDataSources(schemaName, pendingClosedDataSources);
+    }
+    
     private MetaDataContexts 
buildChangedMetaDataContextWithAddedDataSource(final ShardingSphereMetaData 
originalMetaData, 
                                                                             
final Map<String, DataSourceProperties> addedDataSourceProps) throws 
SQLException {
         Map<String, DataSource> dataSourceMap = new 
HashMap<>(originalMetaData.getResource().getDataSources());
@@ -463,6 +483,19 @@ public final class ContextManager implements AutoCloseable 
{
         return 
metaDataContextsBuilder.build(metaDataContexts.getMetaDataPersistService().orElse(null));
     }
     
+    private MetaDataContexts 
buildChangedMetaDataContextWithChangedDataSourceAndRule(final 
ShardingSphereMetaData originalMetaData, final Map<String, 
DataSourceProperties> newDataSourceProps, 
+                                                                               
      final Collection<RuleConfiguration> ruleConfigs) throws SQLException {
+        Collection<String> deletedDataSources = 
getDeletedDataSources(originalMetaData, newDataSourceProps).keySet();
+        Map<String, DataSource> changedDataSources = 
buildChangedDataSources(originalMetaData, newDataSourceProps);
+        Properties props = metaDataContexts.getProps().getProps();
+        MetaDataContextsBuilder metaDataContextsBuilder = new 
MetaDataContextsBuilder(metaDataContexts.getGlobalRuleMetaData().getConfigurations(),
 props);
+        metaDataContextsBuilder.addSchema(originalMetaData.getName(), new 
DataSourceProvidedSchemaConfiguration(getNewDataSources(originalMetaData.getResource().getDataSources(),
 
+                getAddedDataSources(originalMetaData, newDataSourceProps), 
changedDataSources, deletedDataSources), ruleConfigs), props);
+        metaDataContexts.getMetaDataPersistService().ifPresent(
+            optional -> 
optional.getSchemaMetaDataService().persist(originalMetaData.getName(), 
metaDataContextsBuilder.getSchemaMap().get(originalMetaData.getName())));
+        return 
metaDataContextsBuilder.build(metaDataContexts.getMetaDataPersistService().orElse(null));
+    }
+    
     private Map<String, DataSource> getNewDataSources(final Map<String, 
DataSource> originalDataSources,
                                                       final Map<String, 
DataSource> addedDataSources, final Map<String, DataSource> changedDataSources, 
final Collection<String> deletedDataSources) {
         Map<String, DataSource> result = new 
LinkedHashMap<>(originalDataSources);
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java
index 9fc8617..cb9e852 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaBasedPersistService.java
@@ -59,6 +59,15 @@ public interface SchemaBasedPersistService<T> {
     T load(String schemaName);
     
     /**
+     * Load configurations based on version.
+     * 
+     * @param schemaName schema name
+     * @param version version
+     * @return configurations
+     */
+    T load(String schemaName, String version);
+    
+    /**
      * Judge whether schema configuration existed.
      *
      * @param schemaName schema name
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java
index 7d229c1..0e191db 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/DataSourcePersistService.java
@@ -72,6 +72,12 @@ public final class DataSourcePersistService implements 
SchemaBasedPersistService
         return isExisted(schemaName) ? 
getDataSourceProperties(repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName,
 getSchemaActiveVersion(schemaName)))) : new LinkedHashMap<>();
     }
     
+    @Override
+    public Map<String, DataSourceProperties> load(final String schemaName, 
final String version) {
+        String yamlContent = 
repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName, 
version));
+        return Strings.isNullOrEmpty(yamlContent) ? new LinkedHashMap<>() : 
getDataSourceProperties(yamlContent);
+    }
+    
     @SuppressWarnings("unchecked")
     private Map<String, DataSourceProperties> getDataSourceProperties(final 
String yamlContent) {
         Map<String, Map<String, Object>> yamlDataSources = 
YamlEngine.unmarshal(yamlContent, Map.class);
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
index ae862c6..e8ae771 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/impl/SchemaRulePersistService.java
@@ -75,6 +75,13 @@ public final class SchemaRulePersistService implements 
SchemaBasedPersistService
     }
     
     @Override
+    public Collection<RuleConfiguration> load(final String schemaName, final 
String version) {
+        String yamlContent = 
repository.get(SchemaMetaDataNode.getRulePath(schemaName, version));
+        return Strings.isNullOrEmpty(yamlContent) ? new LinkedList<>() : new 
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(YamlEngine.unmarshal(repository.get(SchemaMetaDataNode
+                .getRulePath(schemaName, getSchemaActiveVersion(schemaName))), 
Collection.class, true));
+    }
+    
+    @Override
     public boolean isExisted(final String schemaName) {
         return !Strings.isNullOrEmpty(getSchemaActiveVersion(schemaName)) && 
!Strings.isNullOrEmpty(repository.get(SchemaMetaDataNode.getRulePath(schemaName,
 getSchemaActiveVersion(schemaName))));
     }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index e7f4b83..c403d1b 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -19,6 +19,8 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator;
 
 import com.google.common.eventbus.Subscribe;
 import org.apache.shardingsphere.authority.rule.AuthorityRule;
+import org.apache.shardingsphere.infra.config.RuleConfiguration;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
 import org.apache.shardingsphere.infra.metadata.schema.QualifiedSchema;
 import 
org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
@@ -32,6 +34,7 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.confi
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.SchemaChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.SchemaVersionChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
@@ -46,6 +49,7 @@ import java.sql.SQLException;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -223,6 +227,18 @@ public final class ClusterContextManagerCoordinator {
         }
     }
     
+    /**
+     * Renew with new schema version.
+     * 
+     * @param event schema version changed event
+     */
+    @Subscribe
+    public synchronized void renew(final SchemaVersionChangedEvent event) {
+        Map<String, DataSourceProperties> dataSourcePropertiesMap = 
metaDataPersistService.getDataSourceService().load(event.getSchemaName(), 
event.getActiveVersion());
+        Collection<RuleConfiguration> ruleConfigs = 
metaDataPersistService.getSchemaRuleService().load(event.getSchemaName(), 
event.getActiveVersion());
+        
contextManager.alterDataSourceAndRuleConfiguration(event.getSchemaName(), 
dataSourcePropertiesMap, ruleConfigs);
+    }
+    
     private void persistSchema(final String schemaName) {
         if 
(!metaDataPersistService.getDataSourceService().isExisted(schemaName)) {
             metaDataPersistService.getDataSourceService().persist(schemaName, 
new LinkedHashMap<>());
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/version/SchemaVersionChangedEvent.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/version/SchemaVersionChangedEvent.java
new file mode 100644
index 0000000..cb728ac
--- /dev/null
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/version/SchemaVersionChangedEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+
+/**
+ * Schema version changed event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class SchemaVersionChangedEvent implements GovernanceEvent {
+    
+    private final String schemaName;
+    
+    private final String activeVersion;
+}
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
index e06993f..5580d74 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
@@ -31,6 +31,7 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.SchemaChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.SchemaVersionChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
 import org.apache.shardingsphere.mode.metadata.persist.node.SchemaMetaDataNode;
@@ -99,6 +100,9 @@ public final class MetaDataChangedWatcher implements 
GovernanceWatcher<Governanc
         if (!schemaName.isPresent() || 
Strings.isNullOrEmpty(event.getValue())) {
             return Optional.empty();
         }
+        if 
(event.getKey().equals(SchemaMetaDataNode.getActiveVersionPath(schemaName.get())))
 {
+            return Optional.of(new SchemaVersionChangedEvent(schemaName.get(), 
event.getValue()));
+        }
         Optional<String> schemaVersion = 
SchemaMetaDataNode.getVersionByDataSourcesPath(event.getKey());
         if (schemaVersion.isPresent()) {
             return Optional.of(createDataSourceChangedEvent(schemaName.get(), 
schemaVersion.get(), event));

Reply via email to