This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7bdd51f Add lock for memory and standalone (#16374)
7bdd51f is described below
commit 7bdd51f31db248e7d474a3729d689b08ee86d491
Author: gin <[email protected]>
AuthorDate: Fri Mar 25 20:43:06 2022 +0800
Add lock for memory and standalone (#16374)
---
.../db/protocol/error/CommonErrorCode.java | 4 +-
.../future/lock/DistributeLockContext.java | 24 ++++---
.../lock/ShardingSphereDistributeGlobalLock.java | 2 +-
.../lock/service/GlobalLockRegistryService.java | 2 +-
.../manager/memory/lock/MemoryLockContext.java | 19 ++++--
.../lock/ShardingSphereNonReentrantLock.java | 74 ++++++++++++++++++++++
.../lock/ShardingSphereNonReentrantLock.java | 74 ++++++++++++++++++++++
.../standalone/lock/StandaloneLockContext.java | 17 ++++-
.../frontend/mysql/err/MySQLErrPacketFactory.java | 5 ++
9 files changed, 201 insertions(+), 20 deletions(-)
diff --git
a/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
b/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
index 7d356a1..b20bc4b 100644
---
a/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
+++
b/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
@@ -33,12 +33,12 @@ public enum CommonErrorCode implements SQLErrorCode {
SCALING_OPERATE_FAILED(1209, "C1209", "Scaling Operate Failed: [%s]"),
+ SCHEMA_WRITE_LOCKED(1300, "C1300", "The schema %s is read-only"),
+
TABLE_LOCK_WAIT_TIMEOUT(1301, "C1301", "The table %s of schema %s lock
wait timeout of %s ms exceeded"),
TABLE_LOCKED(1302, "C1302", "The table %s of schema %s is locked"),
- SCHEMA_LOCKED(1302, "C1303", "The schema %s is locked"),
-
TOO_MANY_CONNECTIONS_EXCEPTION(1040, "08004", "Too many connections"),
RUNTIME_EXCEPTION(1997, "C1997", "Runtime exception: [%s]"),
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/DistributeLockContext.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/DistributeLockContext.java
index 2084b81..fdbc3e7 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/DistributeLockContext.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/DistributeLockContext.java
@@ -56,19 +56,19 @@ public final class DistributeLockContext implements
LockContext {
public synchronized Optional<ShardingSphereLock> getSchemaLock(final
String schemaName) {
ShardingSphereGlobalLock result = globalLocks.get(schemaName);
if (null == result) {
- result = createGlobalLock(schemaName);
+ result =
crateGlobalLock(instanceContext.getInstance().getInstanceDefinition().getInstanceId().getId());
globalLocks.put(schemaName, result);
}
return Optional.of(result);
}
- private ShardingSphereGlobalLock createGlobalLock(final String schemaName)
{
- return new ShardingSphereDistributeGlobalLock(instanceContext,
instanceContext.getInstance().getInstanceDefinition().getInstanceId().getId(),
globalLockService);
+ private ShardingSphereGlobalLock crateGlobalLock(final String
ownerInstanceId) {
+ return new ShardingSphereDistributeGlobalLock(instanceContext,
ownerInstanceId, globalLockService);
}
@Override
public synchronized boolean isLockedSchema(final String schemaName) {
- return getSchemaLock(schemaName).map(shardingSphereGlobalLock ->
shardingSphereGlobalLock.isLocked(schemaName)).orElse(false);
+ return getGlobalLock(schemaName).map(shardingSphereGlobalLock ->
shardingSphereGlobalLock.isLocked(schemaName)).orElse(false);
}
/**
@@ -80,10 +80,10 @@ public final class DistributeLockContext implements
LockContext {
globalLockService.initGlobalLockRoot();
return;
}
- ShardingSphereDistributeGlobalLock lock;
+ ShardingSphereGlobalLock lock;
for (String each : allGlobalLock) {
String[] schemaInstanceId = LockNodeUtil.parseLockName(each);
- lock = new ShardingSphereDistributeGlobalLock(instanceContext,
schemaInstanceId[1], globalLockService);
+ lock = crateGlobalLock(schemaInstanceId[1]);
lock.ackLock(schemaInstanceId[0], getCurrentInstanceId());
globalLocks.put(schemaInstanceId[0], lock);
}
@@ -109,9 +109,15 @@ public final class DistributeLockContext implements
LockContext {
if (isSameInstanceId(ownerInstanceId)) {
return;
}
- ShardingSphereGlobalLock globalLock = createGlobalLock(schema);
- globalLocks.put(schema, globalLock);
- globalLock.ackLock(schema, getCurrentInstanceId());
+ Optional<ShardingSphereGlobalLock> globalLock = getGlobalLock(schema);
+ ShardingSphereGlobalLock lock;
+ if (globalLock.isPresent()) {
+ lock = globalLock.get();
+ } else {
+ lock = crateGlobalLock(ownerInstanceId);
+ globalLocks.put(schema, lock);
+ }
+ lock.ackLock(schema, getCurrentInstanceId());
}
/**
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/ShardingSphereDistributeGlobalLock.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/ShardingSphereDistributeGlobalLock.java
index 0af3793..3b4d136 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/ShardingSphereDistributeGlobalLock.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/ShardingSphereDistributeGlobalLock.java
@@ -55,7 +55,7 @@ public final class ShardingSphereDistributeGlobalLock
implements ShardingSphereG
this.instanceContext = instanceContext;
this.ownerInstanceId = ownerInstanceId;
this.lockService = lockService;
- synchronizedLockState = new
AtomicReference<>(isOwnerInstanceId(getCurrentInstanceId()) ?
LockState.INITIALIZATION : LockState.UNLOCKED);
+ synchronizedLockState = new
AtomicReference<>(isOwnerInstanceId(getCurrentInstanceId()) ?
LockState.INITIALIZATION : LockState.LOCKED);
initLockedInstances(instanceContext);
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/service/GlobalLockRegistryService.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/service/GlobalLockRegistryService.java
index 7bfbe94..7a963c2 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/service/GlobalLockRegistryService.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/future/lock/service/GlobalLockRegistryService.java
@@ -60,7 +60,7 @@ public final class GlobalLockRegistryService {
try {
repository.persistEphemeral(lockName, LockState.LOCKED.name());
return true;
- } catch (final ClusterPersistRepositoryException isIgnored) {
+ } catch (final ClusterPersistRepositoryException ignored) {
return false;
}
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java
index 1d56593..e8be190 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/MemoryLockContext.java
@@ -20,20 +20,31 @@ package org.apache.shardingsphere.mode.manager.memory.lock;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
import org.apache.shardingsphere.mode.lock.LockContext;
+import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
/**
* Memory lock context.
*/
-public class MemoryLockContext implements LockContext {
+public final class MemoryLockContext implements LockContext {
+
+ private final Map<String, ShardingSphereLock> locks = new
ConcurrentHashMap<>();
@Override
- public Optional<ShardingSphereLock> getSchemaLock(final String schemaName)
{
- return Optional.empty();
+ public synchronized Optional<ShardingSphereLock> getSchemaLock(final
String schemaName) {
+ ShardingSphereLock result = locks.get(schemaName);
+ if (null == result) {
+ result = new ShardingSphereNonReentrantLock(new ReentrantLock());
+ locks.put(schemaName, result);
+ }
+ return Optional.of(result);
}
@Override
public boolean isLockedSchema(final String schemaName) {
- return false;
+ ShardingSphereLock shardingSphereLock = locks.get(schemaName);
+ return null != shardingSphereLock &&
shardingSphereLock.isLocked(schemaName);
}
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/ShardingSphereNonReentrantLock.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/ShardingSphereNonReentrantLock.java
new file mode 100644
index 0000000..fc2f97a
--- /dev/null
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/lock/ShardingSphereNonReentrantLock.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.memory.lock;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Non reentrant lock implemented ShardingSphereLock.
+ */
+@RequiredArgsConstructor
+public final class ShardingSphereNonReentrantLock implements
ShardingSphereLock {
+
+ private static final long DEFAULT_TRY_LOCK_TIMEOUT_MILLISECONDS = 3 * 60 *
1000;
+
+ private final Lock innerLock;
+
+ private volatile boolean locked;
+
+ @Override
+ public boolean tryLock(final String lockName) {
+ return innerTryLock(DEFAULT_TRY_LOCK_TIMEOUT_MILLISECONDS);
+ }
+
+ @Override
+ public boolean tryLock(final String lockName, final long timeout) {
+ return innerTryLock(timeout);
+ }
+
+ private synchronized boolean innerTryLock(final long timeout) {
+ try {
+ if (innerLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
+ locked = true;
+ }
+ return false;
+ } catch (final InterruptedException ignored) {
+ return false;
+ }
+ }
+
+ @Override
+ public void releaseLock(final String lockName) {
+ innerLock.unlock();
+ locked = false;
+ }
+
+ @Override
+ public boolean isLocked(final String lockName) {
+ return locked;
+ }
+
+ @Override
+ public long getDefaultTimeOut() {
+ return DEFAULT_TRY_LOCK_TIMEOUT_MILLISECONDS;
+ }
+}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/ShardingSphereNonReentrantLock.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/ShardingSphereNonReentrantLock.java
new file mode 100644
index 0000000..a1dd88c
--- /dev/null
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/ShardingSphereNonReentrantLock.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.standalone.lock;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Non reentrant lock implemented ShardingSphereLock.
+ */
+@RequiredArgsConstructor
+public final class ShardingSphereNonReentrantLock implements
ShardingSphereLock {
+
+ private static final long DEFAULT_TRY_LOCK_TIMEOUT_MILLISECONDS = 3 * 60 *
1000;
+
+ private final Lock innerLock;
+
+ private volatile boolean locked;
+
+ @Override
+ public boolean tryLock(final String lockName) {
+ return innerTryLock(DEFAULT_TRY_LOCK_TIMEOUT_MILLISECONDS);
+ }
+
+ @Override
+ public boolean tryLock(final String lockName, final long timeout) {
+ return innerTryLock(timeout);
+ }
+
+ private synchronized boolean innerTryLock(final long timeout) {
+ try {
+ if (innerLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
+ locked = true;
+ }
+ return false;
+ } catch (final InterruptedException ignored) {
+ return false;
+ }
+ }
+
+ @Override
+ public void releaseLock(final String lockName) {
+ innerLock.unlock();
+ locked = false;
+ }
+
+ @Override
+ public boolean isLocked(final String lockName) {
+ return locked;
+ }
+
+ @Override
+ public long getDefaultTimeOut() {
+ return DEFAULT_TRY_LOCK_TIMEOUT_MILLISECONDS;
+ }
+}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java
index 0af6f8a..e045384 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/lock/StandaloneLockContext.java
@@ -20,20 +20,31 @@ package
org.apache.shardingsphere.mode.manager.standalone.lock;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
import org.apache.shardingsphere.mode.lock.LockContext;
+import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
/**
* Standalone lock context.
*/
public final class StandaloneLockContext implements LockContext {
+ private final Map<String, ShardingSphereLock> locks = new
ConcurrentHashMap<>();
+
@Override
- public Optional<ShardingSphereLock> getSchemaLock(final String schemaName)
{
- return Optional.empty();
+ public synchronized Optional<ShardingSphereLock> getSchemaLock(final
String schemaName) {
+ ShardingSphereLock result = locks.get(schemaName);
+ if (null == result) {
+ result = new ShardingSphereNonReentrantLock(new ReentrantLock());
+ locks.put(schemaName, result);
+ }
+ return Optional.of(result);
}
@Override
public boolean isLockedSchema(final String schemaName) {
- return false;
+ ShardingSphereLock shardingSphereLock = locks.get(schemaName);
+ return null != shardingSphereLock &&
shardingSphereLock.isLocked(schemaName);
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
index 5a3d825..778ff28 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.proxy.backend.exception.DBDropNotExistsExceptio
import
org.apache.shardingsphere.proxy.backend.exception.DatabaseNotExistedException;
import
org.apache.shardingsphere.proxy.backend.exception.NoDatabaseSelectedException;
import
org.apache.shardingsphere.proxy.backend.exception.RuleNotExistedException;
+import org.apache.shardingsphere.proxy.backend.exception.SchemaLockedException;
import
org.apache.shardingsphere.proxy.backend.exception.TableLockWaitTimeoutException;
import org.apache.shardingsphere.proxy.backend.exception.TableLockedException;
import
org.apache.shardingsphere.proxy.backend.exception.TableModifyInTransactionException;
@@ -111,6 +112,10 @@ public final class MySQLErrPacketFactory {
if (cause instanceof RuleNotExistedException || cause instanceof
DatabaseNotExistedException) {
return new MySQLErrPacket(1,
MySQLServerErrorCode.ER_SP_DOES_NOT_EXIST);
}
+ if (cause instanceof SchemaLockedException) {
+ SchemaLockedException exception = (SchemaLockedException) cause;
+ return new MySQLErrPacket(1, CommonErrorCode.SCHEMA_WRITE_LOCKED,
exception.getSchemaName());
+ }
if (cause instanceof TableLockWaitTimeoutException) {
TableLockWaitTimeoutException exception =
(TableLockWaitTimeoutException) cause;
return new MySQLErrPacket(1,
CommonErrorCode.TABLE_LOCK_WAIT_TIMEOUT, exception.getTableName(),