This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e14fcb7 For #14722:Support only one scaling job could be executed at
the same time for one schema (#14991)
e14fcb7 is described below
commit e14fcb7b5bb88022068875e0a731652b2daa2367
Author: ReyYang <[email protected]>
AuthorDate: Wed Jan 26 18:54:09 2022 +0800
For #14722:Support only one scaling job could be executed at the same time
for one schema (#14991)
* For #14722:Support only one scaling job could be executed at the same
time for one schema
* For #14722:fix review
* For #14722:fix review,add ScalingJobReleaseSchemaNameLockEvent
* For #14722:fix review,delete ScalingJobReleaseSchemaNameLockEvent
---
.../subscriber/ScalingRegistrySubscriber.java | 22 ++++++++++++++++++++++
.../zookeeper/CuratorZookeeperRepository.java | 4 ++--
2 files changed, 24 insertions(+), 2 deletions(-)
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
index c23b570..d286517 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
@@ -17,11 +17,13 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.subscriber;
+import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.RegistryCacheManager;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ClusterSwitchConfigurationEvent;
@@ -56,11 +58,16 @@ public final class ScalingRegistrySubscriber {
private final RegistryCacheManager registryCacheManager;
+ private final LockRegistryService lockRegistryService;
+
+ private final Map<String, Boolean> schemaNameLockedMap =
Maps.newConcurrentMap();
+
public ScalingRegistrySubscriber(final ClusterPersistRepository
repository) {
this.repository = repository;
this.persistService = new SchemaRulePersistService(repository);
dataSourcePersistService = new DataSourcePersistService(repository);
registryCacheManager = new RegistryCacheManager(repository);
+ lockRegistryService = new LockRegistryService(repository);
ShardingSphereEventBus.getInstance().register(this);
}
@@ -71,10 +78,16 @@ public final class ScalingRegistrySubscriber {
*/
@Subscribe
public void ruleConfigurationCached(final RuleConfigurationCachedEvent
event) {
+ boolean locked =
lockRegistryService.tryLock(decorateLockName(event.getSchemaName()), 1000);
+ if (!locked) {
+ return;
+ }
+ schemaNameLockedMap.put(event.getSchemaName(), true);
String sourceDataSource =
repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(event.getSchemaName()));
String sourceRule =
repository.get(SchemaMetaDataNode.getRulePath(event.getSchemaName()));
String targetRule =
registryCacheManager.loadCache(SchemaMetaDataNode.getRulePath(event.getSchemaName()),
event.getCacheId());
String ruleCacheId = event.getCacheId();
+ log.info("start scaling job, locked the schema name, event={}", event);
StartScalingEvent startScalingEvent = new
StartScalingEvent(event.getSchemaName(), sourceDataSource, sourceRule,
targetRule, ruleCacheId);
ShardingSphereEventBus.getInstance().post(startScalingEvent);
}
@@ -98,6 +111,11 @@ public final class ScalingRegistrySubscriber {
log.info("start to delete cache, ruleCacheId={}", ruleCacheId);
registryCacheManager.deleteCache(SchemaMetaDataNode.getRulePath(event.getTargetSchemaName()),
ruleCacheId);
}
+ if (schemaNameLockedMap.getOrDefault(event.getTargetSchemaName(),
false)) {
+ log.info("scaling job finished, release schema name lock, event =
{}", event);
+ schemaNameLockedMap.remove(event.getTargetSchemaName());
+
lockRegistryService.releaseLock(decorateLockName(event.getTargetSchemaName()));
+ }
}
/**
@@ -112,4 +130,8 @@ public final class ScalingRegistrySubscriber {
dataSourcePersistService.persist(schemaName,
event.getTargetDataSourcePropertiesMap());
persistService.persist(schemaName, event.getTargetRuleConfigs());
}
+
+ private String decorateLockName(final String schemaName) {
+ return "Scaling-" + schemaName;
+ }
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-prov
[...]
index 8b11302..6a948ba 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
@@ -29,7 +29,7 @@ import
org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -323,7 +323,7 @@ public final class CuratorZookeeperRepository implements
ClusterPersistRepositor
if (availableLock(key)) {
return locks.get(key);
}
- InterProcessLock lock = new InterProcessMutex(client, key);
+ InterProcessLock lock = new InterProcessSemaphoreMutex(client, key);
locks.put(key, lock);
return lock;
}