This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bdd51f  Add lock for memory and standalone (#16374)
7bdd51f is described below

commit 7bdd51f31db248e7d474a3729d689b08ee86d491
Author: gin <[email protected]>
AuthorDate: Fri Mar 25 20:43:06 2022 +0800

    Add lock for memory and standalone (#16374)
---
 .../db/protocol/error/CommonErrorCode.java         |  4 +-
 .../future/lock/DistributeLockContext.java         | 24 ++++---
 .../lock/ShardingSphereDistributeGlobalLock.java   |  2 +-
 .../lock/service/GlobalLockRegistryService.java    |  2 +-
 .../manager/memory/lock/MemoryLockContext.java     | 19 ++++--
 .../lock/ShardingSphereNonReentrantLock.java       | 74 ++++++++++++++++++++++
 .../lock/ShardingSphereNonReentrantLock.java       | 74 ++++++++++++++++++++++
 .../standalone/lock/StandaloneLockContext.java     | 17 ++++-
 .../frontend/mysql/err/MySQLErrPacketFactory.java  |  5 ++
 9 files changed, 201 insertions(+), 20 deletions(-)

diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
index 7d356a1..b20bc4b 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
@@ -33,12 +33,12 @@ public enum CommonErrorCode implements SQLErrorCode {
     
     SCALING_OPERATE_FAILED(1209, "C1209", "Scaling Operate Failed: [%s]"),
     
+    SCHEMA_WRITE_LOCKED(1300, "C1300", "The schema %s is read-only"),
+    
     TABLE_LOCK_WAIT_TIMEOUT(1301, "C1301", "The table %s of schema %s lock 
wait timeout of %s ms exceeded"),
     
     TABLE_LOCKED(1302, "C1302", "The table %s of schema %s is locked"),
     
-    SCHEMA_LOCKED(1302, "C1303", "The schema %s is locked"),
-
     TOO_MANY_CONNECTIONS_EXCEPTION(1040, "08004", "Too many connections"),
 
     RUNTIME_EXCEPTION(1997, "C1997", "Runtime exception: [%s]"),
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/DistributeLockContext.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/DistributeLockContext.java
index 2084b81..fdbc3e7 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/DistributeLockContext.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/DistributeLockContext.java
@@ -56,19 +56,19 @@ public final class DistributeLockContext implements 
LockContext {
     public synchronized Optional<ShardingSphereLock> getSchemaLock(final 
String schemaName) {
         ShardingSphereGlobalLock result = globalLocks.get(schemaName);
         if (null == result) {
-            result = createGlobalLock(schemaName);
+            result = 
crateGlobalLock(instanceContext.getInstance().getInstanceDefinition().getInstanceId().getId());
             globalLocks.put(schemaName, result);
         }
         return Optional.of(result);
     }
     
-    private ShardingSphereGlobalLock createGlobalLock(final String schemaName) 
{
-        return new ShardingSphereDistributeGlobalLock(instanceContext, 
instanceContext.getInstance().getInstanceDefinition().getInstanceId().getId(), 
globalLockService);
+    private ShardingSphereGlobalLock crateGlobalLock(final String 
ownerInstanceId) {
+        return new ShardingSphereDistributeGlobalLock(instanceContext, 
ownerInstanceId, globalLockService);
     }
     
     @Override
     public synchronized boolean isLockedSchema(final String schemaName) {
-        return getSchemaLock(schemaName).map(shardingSphereGlobalLock -> 
shardingSphereGlobalLock.isLocked(schemaName)).orElse(false);
+        return getGlobalLock(schemaName).map(shardingSphereGlobalLock -> 
shardingSphereGlobalLock.isLocked(schemaName)).orElse(false);
     }
     
     /**
@@ -80,10 +80,10 @@ public final class DistributeLockContext implements 
LockContext {
             globalLockService.initGlobalLockRoot();
             return;
         }
-        ShardingSphereDistributeGlobalLock lock;
+        ShardingSphereGlobalLock lock;
         for (String each : allGlobalLock) {
             String[] schemaInstanceId = LockNodeUtil.parseLockName(each);
-            lock = new ShardingSphereDistributeGlobalLock(instanceContext, 
schemaInstanceId[1], globalLockService);
+            lock = crateGlobalLock(schemaInstanceId[1]);
             lock.ackLock(schemaInstanceId[0], getCurrentInstanceId());
             globalLocks.put(schemaInstanceId[0], lock);
         }
@@ -109,9 +109,15 @@ public final class DistributeLockContext implements 
LockContext {
         if (isSameInstanceId(ownerInstanceId)) {
             return;
         }
-        ShardingSphereGlobalLock globalLock = createGlobalLock(schema);
-        globalLocks.put(schema, globalLock);
-        globalLock.ackLock(schema, getCurrentInstanceId());
+        Optional<ShardingSphereGlobalLock> globalLock = getGlobalLock(schema);
+        ShardingSphereGlobalLock lock;
+        if (globalLock.isPresent()) {
+            lock = globalLock.get();
+        } else {
+            lock = crateGlobalLock(ownerInstanceId);
+            globalLocks.put(schema, lock);
+        }
+        lock.ackLock(schema, getCurrentInstanceId());
     }
     
     /**
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/ShardingSphereDistributeGlobalLock.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/ShardingSphereDistributeGlobalLock.java
index 0af3793..3b4d136 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/ShardingSphereDistributeGlobalLock.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/ShardingSphereDistributeGlobalLock.java
@@ -55,7 +55,7 @@ public final class ShardingSphereDistributeGlobalLock 
implements ShardingSphereG
         this.instanceContext = instanceContext;
         this.ownerInstanceId = ownerInstanceId;
         this.lockService = lockService;
-        synchronizedLockState = new 
AtomicReference<>(isOwnerInstanceId(getCurrentInstanceId()) ? 
LockState.INITIALIZATION : LockState.UNLOCKED);
+        synchronizedLockState = new 
AtomicReference<>(isOwnerInstanceId(getCurrentInstanceId()) ? 
LockState.INITIALIZATION : LockState.LOCKED);
         initLockedInstances(instanceContext);
     }
     
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/service/GlobalLockRegistryService.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/service/GlobalLockRegistryService.java
index 7bfbe94..7a963c2 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/service/GlobalLockRegistryService.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/service/GlobalLockRegistryService.java
@@ -60,7 +60,7 @@ public final class GlobalLockRegistryService {
         try {
             repository.persistEphemeral(lockName, LockState.LOCKED.name());
             return true;
-        } catch (final ClusterPersistRepositoryException isIgnored) {
+        } catch (final ClusterPersistRepositoryException ignored) {
             return false;
         }
     }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java
index 1d56593..e8be190 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java
@@ -20,20 +20,31 @@ package org.apache.shardingsphere.mode.manager.memory.lock;
 import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
 import org.apache.shardingsphere.mode.lock.LockContext;
 
+import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Memory lock context.
  */
-public class MemoryLockContext implements LockContext {
+public final class MemoryLockContext implements LockContext {
+    
+    private final Map<String, ShardingSphereLock> locks = new 
ConcurrentHashMap<>();
     
     @Override
-    public Optional<ShardingSphereLock> getSchemaLock(final String schemaName) 
{
-        return Optional.empty();
+    public synchronized Optional<ShardingSphereLock> getSchemaLock(final 
String schemaName) {
+        ShardingSphereLock result = locks.get(schemaName);
+        if (null == result) {
+            result = new ShardingSphereNonReentrantLock(new ReentrantLock());
+            locks.put(schemaName, result);
+        }
+        return Optional.of(result);
     }
     
     @Override
     public boolean isLockedSchema(final String schemaName) {
-        return false;
+        ShardingSphereLock shardingSphereLock = locks.get(schemaName);
+        return null != shardingSphereLock && 
shardingSphereLock.isLocked(schemaName);
     }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/ShardingSphereNonReentrantLock.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/ShardingSphereNonReentrantLock.java
new file mode 100644
index 0000000..fc2f97a
--- /dev/null
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/ShardingSphereNonReentrantLock.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.memory.lock;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Non reentrant lock implemented ShardingSphereLock.
+ */
+@RequiredArgsConstructor
+public final class ShardingSphereNonReentrantLock implements 
ShardingSphereLock {
+    
+    private static final long DEFAULT_TRY_LOCK_TIMEOUT_MILLISECONDS = 3 * 60 * 
1000;
+    
+    private final Lock innerLock;
+    
+    private volatile boolean locked;
+    
+    @Override
+    public boolean tryLock(final String lockName) {
+        return innerTryLock(DEFAULT_TRY_LOCK_TIMEOUT_MILLISECONDS);
+    }
+    
+    @Override
+    public boolean tryLock(final String lockName, final long timeout) {
+        return innerTryLock(timeout);
+    }
+    
+    private synchronized boolean innerTryLock(final long timeout) {
+        try {
+            if (innerLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
+                locked = true;
+            }
+            return false;
+        } catch (final InterruptedException ignored) {
+            return false;
+        }
+    }
+    
+    @Override
+    public void releaseLock(final String lockName) {
+        innerLock.unlock();
+        locked = false;
+    }
+    
+    @Override
+    public boolean isLocked(final String lockName) {
+        return locked;
+    }
+    
+    @Override
+    public long getDefaultTimeOut() {
+        return DEFAULT_TRY_LOCK_TIMEOUT_MILLISECONDS;
+    }
+}
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/ShardingSphereNonReentrantLock.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/ShardingSphereNonReentrantLock.java
new file mode 100644
index 0000000..a1dd88c
--- /dev/null
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/ShardingSphereNonReentrantLock.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.standalone.lock;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Non reentrant lock implemented ShardingSphereLock.
+ */
+@RequiredArgsConstructor
+public final class ShardingSphereNonReentrantLock implements 
ShardingSphereLock {
+    
+    private static final long DEFAULT_TRY_LOCK_TIMEOUT_MILLISECONDS = 3 * 60 * 
1000;
+    
+    private final Lock innerLock;
+    
+    private volatile boolean locked;
+    
+    @Override
+    public boolean tryLock(final String lockName) {
+        return innerTryLock(DEFAULT_TRY_LOCK_TIMEOUT_MILLISECONDS);
+    }
+    
+    @Override
+    public boolean tryLock(final String lockName, final long timeout) {
+        return innerTryLock(timeout);
+    }
+    
+    private synchronized boolean innerTryLock(final long timeout) {
+        try {
+            if (innerLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
+                locked = true;
+            }
+            return false;
+        } catch (final InterruptedException ignored) {
+            return false;
+        }
+    }
+    
+    @Override
+    public void releaseLock(final String lockName) {
+        innerLock.unlock();
+        locked = false;
+    }
+    
+    @Override
+    public boolean isLocked(final String lockName) {
+        return locked;
+    }
+    
+    @Override
+    public long getDefaultTimeOut() {
+        return DEFAULT_TRY_LOCK_TIMEOUT_MILLISECONDS;
+    }
+}
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java
index 0af6f8a..e045384 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java
@@ -20,20 +20,31 @@ package 
org.apache.shardingsphere.mode.manager.standalone.lock;
 import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
 import org.apache.shardingsphere.mode.lock.LockContext;
 
+import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Standalone lock context.
  */
 public final class StandaloneLockContext implements LockContext {
     
+    private final Map<String, ShardingSphereLock> locks = new 
ConcurrentHashMap<>();
+    
     @Override
-    public Optional<ShardingSphereLock> getSchemaLock(final String schemaName) 
{
-        return Optional.empty();
+    public synchronized Optional<ShardingSphereLock> getSchemaLock(final 
String schemaName) {
+        ShardingSphereLock result = locks.get(schemaName);
+        if (null == result) {
+            result = new ShardingSphereNonReentrantLock(new ReentrantLock());
+            locks.put(schemaName, result);
+        }
+        return Optional.of(result);
     }
     
     @Override
     public boolean isLockedSchema(final String schemaName) {
-        return false;
+        ShardingSphereLock shardingSphereLock = locks.get(schemaName);
+        return null != shardingSphereLock && 
shardingSphereLock.isLocked(schemaName);
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
index 5a3d825..778ff28 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
@@ -32,6 +32,7 @@ import 
org.apache.shardingsphere.proxy.backend.exception.DBDropNotExistsExceptio
 import 
org.apache.shardingsphere.proxy.backend.exception.DatabaseNotExistedException;
 import 
org.apache.shardingsphere.proxy.backend.exception.NoDatabaseSelectedException;
 import 
org.apache.shardingsphere.proxy.backend.exception.RuleNotExistedException;
+import org.apache.shardingsphere.proxy.backend.exception.SchemaLockedException;
 import 
org.apache.shardingsphere.proxy.backend.exception.TableLockWaitTimeoutException;
 import org.apache.shardingsphere.proxy.backend.exception.TableLockedException;
 import 
org.apache.shardingsphere.proxy.backend.exception.TableModifyInTransactionException;
@@ -111,6 +112,10 @@ public final class MySQLErrPacketFactory {
         if (cause instanceof RuleNotExistedException || cause instanceof 
DatabaseNotExistedException) {
             return new MySQLErrPacket(1, 
MySQLServerErrorCode.ER_SP_DOES_NOT_EXIST);
         }
+        if (cause instanceof SchemaLockedException) {
+            SchemaLockedException exception = (SchemaLockedException) cause;
+            return new MySQLErrPacket(1, CommonErrorCode.SCHEMA_WRITE_LOCKED, 
exception.getSchemaName());
+        }
         if (cause instanceof TableLockWaitTimeoutException) {
             TableLockWaitTimeoutException exception = 
(TableLockWaitTimeoutException) cause;
             return new MySQLErrPacket(1, 
CommonErrorCode.TABLE_LOCK_WAIT_TIMEOUT, exception.getTableName(),

Reply via email to