This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e14fcb7  For #14722:Support only one scaling job could be executed at 
the same time for one schema (#14991)
e14fcb7 is described below

commit e14fcb7b5bb88022068875e0a731652b2daa2367
Author: ReyYang <[email protected]>
AuthorDate: Wed Jan 26 18:54:09 2022 +0800

    For #14722:Support only one scaling job could be executed at the same time 
for one schema (#14991)
    
    * For #14722:Support only one scaling job could be executed at the same 
time for one schema
    
    * For #14722:fix review
    
    * For #14722:fix review,add ScalingJobReleaseSchemaNameLockEvent
    
    * For #14722:fix review,delete ScalingJobReleaseSchemaNameLockEvent
---
 .../subscriber/ScalingRegistrySubscriber.java      | 22 ++++++++++++++++++++++
 .../zookeeper/CuratorZookeeperRepository.java      |  4 ++--
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
index c23b570..d286517 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/cache/subscriber/ScalingRegistrySubscriber.java
@@ -17,11 +17,13 @@
 
 package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.subscriber;
 
+import com.google.common.collect.Maps;
 import com.google.common.eventbus.Subscribe;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.RegistryCacheManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ClusterSwitchConfigurationEvent;
@@ -56,11 +58,16 @@ public final class ScalingRegistrySubscriber {
     
     private final RegistryCacheManager registryCacheManager;
     
+    private final LockRegistryService lockRegistryService;
+    
+    private final Map<String, Boolean> schemaNameLockedMap = 
Maps.newConcurrentMap();
+    
     public ScalingRegistrySubscriber(final ClusterPersistRepository 
repository) {
         this.repository = repository;
         this.persistService = new SchemaRulePersistService(repository);
         dataSourcePersistService = new DataSourcePersistService(repository);
         registryCacheManager = new RegistryCacheManager(repository);
+        lockRegistryService = new LockRegistryService(repository);
         ShardingSphereEventBus.getInstance().register(this);
     }
     
@@ -71,10 +78,16 @@ public final class ScalingRegistrySubscriber {
      */
     @Subscribe
     public void ruleConfigurationCached(final RuleConfigurationCachedEvent 
event) {
+        boolean locked = 
lockRegistryService.tryLock(decorateLockName(event.getSchemaName()), 1000);
+        if (!locked) {
+            return;
+        }
+        schemaNameLockedMap.put(event.getSchemaName(), true);
         String sourceDataSource = 
repository.get(SchemaMetaDataNode.getMetaDataDataSourcePath(event.getSchemaName()));
         String sourceRule = 
repository.get(SchemaMetaDataNode.getRulePath(event.getSchemaName()));
         String targetRule = 
registryCacheManager.loadCache(SchemaMetaDataNode.getRulePath(event.getSchemaName()),
 event.getCacheId());
         String ruleCacheId = event.getCacheId();
+        log.info("start scaling job, locked the schema name, event={}", event);
         StartScalingEvent startScalingEvent = new 
StartScalingEvent(event.getSchemaName(), sourceDataSource, sourceRule, 
targetRule, ruleCacheId);
         ShardingSphereEventBus.getInstance().post(startScalingEvent);
     }
@@ -98,6 +111,11 @@ public final class ScalingRegistrySubscriber {
             log.info("start to delete cache, ruleCacheId={}", ruleCacheId);
             
registryCacheManager.deleteCache(SchemaMetaDataNode.getRulePath(event.getTargetSchemaName()),
 ruleCacheId);
         }
+        if (schemaNameLockedMap.getOrDefault(event.getTargetSchemaName(), 
false)) {
+            log.info("scaling job finished, release schema name lock, event = 
{}", event);
+            schemaNameLockedMap.remove(event.getTargetSchemaName());
+            
lockRegistryService.releaseLock(decorateLockName(event.getTargetSchemaName()));
+        }
     }
     
     /**
@@ -112,4 +130,8 @@ public final class ScalingRegistrySubscriber {
         dataSourcePersistService.persist(schemaName, 
event.getTargetDataSourcePropertiesMap());
         persistService.persist(schemaName, event.getTargetRuleConfigs());
     }
+    
+    private String decorateLockName(final String schemaName) {
+        return "Scaling-" + schemaName;
+    }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
index 8b11302..6a948ba 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
@@ -29,7 +29,7 @@ import 
org.apache.curator.framework.recipes.cache.CuratorCache;
 import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -323,7 +323,7 @@ public final class CuratorZookeeperRepository implements 
ClusterPersistRepositor
         if (availableLock(key)) {
             return locks.get(key);
         }
-        InterProcessLock lock = new InterProcessMutex(client, key);
+        InterProcessLock lock = new InterProcessSemaphoreMutex(client, key);
         locks.put(key, lock);
         return lock;
     }

Reply via email to