This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3020ed1 Use new version configuration instead of cache for scaling
(#15492)
3020ed1 is described below
commit 3020ed156ad406a3959602c7217f1dfa49d16607
Author: Haoran Meng <[email protected]>
AuthorDate: Fri Feb 18 15:52:12 2022 +0800
Use new version configuration instead of cache for scaling (#15492)
---
.../service/SchemaVersionPersistService.java | 18 ++++++++++++
.../rdl/rule/RuleDefinitionBackendHandler.java | 33 ++++++++++++++--------
2 files changed, 39 insertions(+), 12 deletions(-)
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java
index eb99a0c..3f9e622 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/SchemaVersionPersistService.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.mode.metadata.persist.node.SchemaMetaDataNode;
import org.apache.shardingsphere.mode.persist.PersistRepository;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Schema version persist service.
@@ -52,4 +53,21 @@ public final class SchemaVersionPersistService {
Optional<String> actualVersion = getSchemaActiveVersion(schemaName);
return actualVersion.isPresent() &&
actualVersion.get().equals(version);
}
+
+    /**
+     * Create new schema version.
+     *
+     * <p>Reads the active version of the schema, derives the next version by adding one to its numeric value,
+     * and persists the values stored under the active version's rule path and data source path to the same paths of the new version.</p>
+     *
+     * @param schemaName schema name
+     * @return new version, or empty if the schema has no active version
+     * @throws NumberFormatException if the active version is not a parsable long
+     */
+    public Optional<String> createNewVersion(final String schemaName) {
+        Optional<String> activeVersion = getSchemaActiveVersion(schemaName);
+        if (!activeVersion.isPresent()) {
+            return Optional.empty();
+        }
+        String currentVersion = activeVersion.get();
+        // The counter is a method-local value, so plain long arithmetic is enough; an atomic wrapper would add nothing here.
+        String newVersion = String.valueOf(Long.parseLong(currentVersion) + 1);
+        // NOTE(review): the active version is read and the new version written without any locking visible here;
+        // two concurrent callers could derive the same new version - confirm that callers serialize this.
+        repository.persist(SchemaMetaDataNode.getRulePath(schemaName, newVersion), repository.get(SchemaMetaDataNode.getRulePath(schemaName, currentVersion)));
+        repository.persist(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName, newVersion),
+                repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(schemaName, currentVersion)));
+        return Optional.of(newVersion);
+    }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java
index 365f051..a3cfc2a 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/rule/RuleDefinitionBackendHandler.java
@@ -29,6 +29,7 @@ import
org.apache.shardingsphere.infra.distsql.update.RuleDefinitionCreateUpdate
import
org.apache.shardingsphere.infra.distsql.update.RuleDefinitionDropUpdater;
import org.apache.shardingsphere.infra.distsql.update.RuleDefinitionUpdater;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
@@ -83,7 +84,7 @@ public final class RuleDefinitionBackendHandler<T extends
RuleDefinitionStatemen
throw new RuntimeException("scaling is not enabled");
}
} else if (preprocessor.isPresent()) {
- processCache(shardingSphereMetaData, sqlStatement,
(RuleDefinitionAlterUpdater) ruleDefinitionUpdater, currentRuleConfig,
preprocessor.get());
+ prepareScaling(shardingSphereMetaData, sqlStatement,
(RuleDefinitionAlterUpdater) ruleDefinitionUpdater, currentRuleConfig,
preprocessor.get());
return new UpdateResponseHeader(sqlStatement);
}
processSQLStatement(shardingSphereMetaData, sqlStatement,
ruleDefinitionUpdater, currentRuleConfig);
@@ -137,15 +138,16 @@ public final class RuleDefinitionBackendHandler<T extends
RuleDefinitionStatemen
}
}
- private void processCache(final ShardingSphereMetaData
shardingSphereMetaData, final T sqlStatement, final RuleDefinitionAlterUpdater
updater, final RuleConfiguration currentRuleConfig,
+ /**
+ * Prepare scaling: create a new schema version, then build the altered rule configuration for it.
+ * Does nothing when no meta data persist service is present.
+ *
+ * @param shardingSphereMetaData meta data of the schema; its name selects the schema to version
+ * @param sqlStatement rule definition statement being handled
+ * @param updater alter updater, forwarded to the new version configuration step
+ * @param currentRuleConfig current rule configuration, forwarded to the new version configuration step
+ * @param preprocessor alter preprocessor, forwarded to the new version configuration step
+ * @throws RuntimeException if no new version could be created for the schema
+ */
+ private void prepareScaling(final ShardingSphereMetaData
shardingSphereMetaData, final T sqlStatement, final RuleDefinitionAlterUpdater
updater, final RuleConfiguration currentRuleConfig,
final RuleDefinitionAlterPreprocessor
preprocessor) {
- RuleConfiguration toBeAlteredRuleConfig =
updater.buildToBeAlteredRuleConfiguration(sqlStatement);
- RuleConfiguration alteredRuleConfig =
preprocessor.preprocess(currentRuleConfig, toBeAlteredRuleConfig);
- updater.updateCurrentRuleConfiguration(alteredRuleConfig,
toBeAlteredRuleConfig);
- Collection<RuleConfiguration> alteredConfigs = new
LinkedList<>(shardingSphereMetaData.getRuleMetaData().getConfigurations());
- alteredConfigs.remove(currentRuleConfig);
- alteredConfigs.add(alteredRuleConfig);
- cacheRuleConfigurationChange(shardingSphereMetaData.getName(),
alteredConfigs);
+ Optional<MetaDataPersistService> metaDataPersistService =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaDataPersistService();
+ if (metaDataPersistService.isPresent()) {
+ Optional<String> newVersion =
metaDataPersistService.get().getSchemaVersionPersistService().createNewVersion(shardingSphereMetaData.getName());
+ if (!newVersion.isPresent()) {
+ throw new RuntimeException(String.format("Unable to get a new
version for schema: %s", shardingSphereMetaData.getName()));
+ }
+ // NOTE(review): only the presence of the new version is checked; the version value itself is not passed on.
+ persistNewVersionConfiguration(shardingSphereMetaData,
sqlStatement, updater, currentRuleConfig, preprocessor);
+ }
}
private void persistRuleConfigurationChange(final ShardingSphereMetaData
shardingSphereMetaData) {
@@ -153,8 +155,15 @@ public final class RuleDefinitionBackendHandler<T extends
RuleDefinitionStatemen
shardingSphereMetaData.getName(),
shardingSphereMetaData.getRuleMetaData().getConfigurations()));
}
- private void cacheRuleConfigurationChange(final String schemaName, final
Collection<RuleConfiguration> ruleConfigurations) {
-
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaDataPersistService().ifPresent(optional
-> optional.getSchemaRuleService().cache(
- schemaName, ruleConfigurations));
+    /**
+     * Build the rule configurations intended for the new schema version.
+     *
+     * <p>Builds the to-be-altered configuration from the statement, preprocesses it against the current configuration,
+     * lets the updater apply the change, and assembles a copy of the schema's rule configurations in which the current
+     * configuration is replaced by the altered one. Persisting that collection is still a TODO, so it is currently discarded.</p>
+     *
+     * @param shardingSphereMetaData meta data of the schema whose rule configurations are copied
+     * @param sqlStatement rule definition statement being handled
+     * @param updater alter updater used to build and apply the change
+     * @param currentRuleConfig current rule configuration to be replaced
+     * @param preprocessor alter preprocessor producing the altered configuration
+     */
+    private void persistNewVersionConfiguration(final ShardingSphereMetaData shardingSphereMetaData, final T sqlStatement, final RuleDefinitionAlterUpdater updater,
+                                                final RuleConfiguration currentRuleConfig, final RuleDefinitionAlterPreprocessor preprocessor) {
+        RuleConfiguration requestedConfig = updater.buildToBeAlteredRuleConfiguration(sqlStatement);
+        RuleConfiguration mergedConfig = preprocessor.preprocess(currentRuleConfig, requestedConfig);
+        updater.updateCurrentRuleConfiguration(mergedConfig, requestedConfig);
+        Collection<RuleConfiguration> newVersionConfigs = new LinkedList<>(shardingSphereMetaData.getRuleMetaData().getConfigurations());
+        // Swap the current configuration for the altered one in the copy; the live collection is left untouched.
+        newVersionConfigs.remove(currentRuleConfig);
+        newVersionConfigs.add(mergedConfig);
+        // TODO persist altered configs to new schema version
+    }
}