This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3020ed1  Use new version configuration instead of cache for scaling 
(#15492)
3020ed1 is described below

commit 3020ed156ad406a3959602c7217f1dfa49d16607
Author: Haoran Meng <[email protected]>
AuthorDate: Fri Feb 18 15:52:12 2022 +0800

    Use new version configuration instead of cache for scaling (#15492)
---
 .../service/SchemaVersionPersistService.java       | 18 ++++++++++++
 .../rdl/rule/RuleDefinitionBackendHandler.java     | 33 ++++++++++++++--------
 2 files changed, 39 insertions(+), 12 deletions(-)

diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java
index eb99a0c..3f9e622 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.mode.metadata.persist.node.SchemaMetaDataNode;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Schema version persist service.
@@ -52,4 +53,21 @@ public final class SchemaVersionPersistService {
         Optional<String> actualVersion = getSchemaActiveVersion(schemaName);
         return actualVersion.isPresent() && 
actualVersion.get().equals(version);
     }
+    
+    /**
+     * Create new schema version.
+     * 
+     * @param schemaName schema name
+     * @return new version
+     */
+    public Optional<String> createNewVersion(final String schemaName) {
+        Optional<String> activeVersion = getSchemaActiveVersion(schemaName);
+        if (activeVersion.isPresent()) {
+            String newVersion = String.valueOf(new 
AtomicLong(Long.valueOf(activeVersion.get())).incrementAndGet());
+            repository.persist(SchemaMetaDataNode.getRulePath(schemaName, 
newVersion), repository.get(SchemaMetaDataNode.getRulePath(schemaName, 
activeVersion.get())));
+            
repository.persist(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName, 
newVersion), 
repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName, 
activeVersion.get())));
+            return Optional.of(newVersion);
+        }
+        return Optional.empty();
+    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java
index 365f051..a3cfc2a 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java
@@ -29,6 +29,7 @@ import 
org.apache.shardingsphere.infra.distsql.update.RuleDefinitionCreateUpdate
 import 
org.apache.shardingsphere.infra.distsql.update.RuleDefinitionDropUpdater;
 import org.apache.shardingsphere.infra.distsql.update.RuleDefinitionUpdater;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
@@ -83,7 +84,7 @@ public final class RuleDefinitionBackendHandler<T extends 
RuleDefinitionStatemen
                 throw new RuntimeException("scaling is not enabled");
             }
         } else if (preprocessor.isPresent()) {
-            processCache(shardingSphereMetaData, sqlStatement, 
(RuleDefinitionAlterUpdater) ruleDefinitionUpdater, currentRuleConfig, 
preprocessor.get());
+            prepareScaling(shardingSphereMetaData, sqlStatement, 
(RuleDefinitionAlterUpdater) ruleDefinitionUpdater, currentRuleConfig, 
preprocessor.get());
             return new UpdateResponseHeader(sqlStatement);
         }
         processSQLStatement(shardingSphereMetaData, sqlStatement, 
ruleDefinitionUpdater, currentRuleConfig);
@@ -137,15 +138,16 @@ public final class RuleDefinitionBackendHandler<T extends 
RuleDefinitionStatemen
         }
     }
     
-    private void processCache(final ShardingSphereMetaData 
shardingSphereMetaData, final T sqlStatement, final RuleDefinitionAlterUpdater 
updater, final RuleConfiguration currentRuleConfig,
+    private void prepareScaling(final ShardingSphereMetaData 
shardingSphereMetaData, final T sqlStatement, final RuleDefinitionAlterUpdater 
updater, final RuleConfiguration currentRuleConfig,
                               final RuleDefinitionAlterPreprocessor 
preprocessor) {
-        RuleConfiguration toBeAlteredRuleConfig = 
updater.buildToBeAlteredRuleConfiguration(sqlStatement);
-        RuleConfiguration alteredRuleConfig = 
preprocessor.preprocess(currentRuleConfig, toBeAlteredRuleConfig);
-        updater.updateCurrentRuleConfiguration(alteredRuleConfig, 
toBeAlteredRuleConfig);
-        Collection<RuleConfiguration> alteredConfigs = new 
LinkedList<>(shardingSphereMetaData.getRuleMetaData().getConfigurations());
-        alteredConfigs.remove(currentRuleConfig);
-        alteredConfigs.add(alteredRuleConfig);
-        cacheRuleConfigurationChange(shardingSphereMetaData.getName(), 
alteredConfigs);
+        Optional<MetaDataPersistService> metaDataPersistService = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaDataPersistService();
+        if (metaDataPersistService.isPresent()) {
+            Optional<String> newVersion = 
metaDataPersistService.get().getSchemaVersionPersistService().createNewVersion(shardingSphereMetaData.getName());
+            if (!newVersion.isPresent()) {
+                throw new RuntimeException(String.format("Unable to get a new 
version for schema: %s", shardingSphereMetaData.getName()));
+            }
+            persistNewVersionConfiguration(shardingSphereMetaData, 
sqlStatement, updater, currentRuleConfig, preprocessor);
+        }
     }
     
     private void persistRuleConfigurationChange(final ShardingSphereMetaData 
shardingSphereMetaData) {
@@ -153,8 +155,15 @@ public final class RuleDefinitionBackendHandler<T extends 
RuleDefinitionStatemen
                 shardingSphereMetaData.getName(), 
shardingSphereMetaData.getRuleMetaData().getConfigurations()));
     }
     
-    private void cacheRuleConfigurationChange(final String schemaName, final 
Collection<RuleConfiguration> ruleConfigurations) {
-        
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaDataPersistService().ifPresent(optional
 -> optional.getSchemaRuleService().cache(
-                schemaName, ruleConfigurations));
+    private void persistNewVersionConfiguration(final ShardingSphereMetaData 
shardingSphereMetaData, final T sqlStatement, final RuleDefinitionAlterUpdater 
updater, 
+                                                final RuleConfiguration 
currentRuleConfig, 
+                                                final 
RuleDefinitionAlterPreprocessor preprocessor) {
+        RuleConfiguration toBeAlteredRuleConfig = 
updater.buildToBeAlteredRuleConfiguration(sqlStatement);
+        RuleConfiguration alteredRuleConfig = 
preprocessor.preprocess(currentRuleConfig, toBeAlteredRuleConfig);
+        updater.updateCurrentRuleConfiguration(alteredRuleConfig, 
toBeAlteredRuleConfig);
+        Collection<RuleConfiguration> alteredConfigs = new 
LinkedList<>(shardingSphereMetaData.getRuleMetaData().getConfigurations());
+        alteredConfigs.remove(currentRuleConfig);
+        alteredConfigs.add(alteredRuleConfig);
+        // TODO persist altered configs to new schema version
     }
 }

Reply via email to