This is an automated email from the ASF dual-hosted git repository.
jiangmaolin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0fd9c407c4a Fix `show processlist` not wait for all nodes (#35348)
0fd9c407c4a is described below
commit 0fd9c407c4a3a2e6a7c24ab45cbbb1d732c1ce1d
Author: Raigor <[email protected]>
AuthorDate: Fri May 9 12:53:16 2025 +0800
Fix `show processlist` not wait for all nodes (#35348)
* Fix processlist not wait for all nodes
* Update RELEASE-NOTES.md
---
RELEASE-NOTES.md | 1 +
.../sql/process/lock/ProcessOperationLock.java | 37 +++++-----------------
.../process/lock/ProcessOperationLockRegistry.java | 7 ++--
.../lock/ProcessOperationLockRegistryTest.java | 2 +-
.../service/ClusterProcessPersistService.java | 4 +--
.../service/ClusterProcessPersistServiceTest.java | 5 +--
6 files changed, 18 insertions(+), 38 deletions(-)
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index b0cd6b43c99..7f0fa0af36b 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -46,6 +46,7 @@
1. DistSQL: Fix duplicate result when show rules used storage unit with
readwrite-splitting rule -
[#35129](https://github.com/apache/shardingsphere/pull/35129)
1. Transaction: Fix conflicting dependencies of BASE transaction integration
module - [#35142](https://github.com/apache/shardingsphere/pull/35142)
1. SQL Federation: Fix Operation not allowed after ResultSet closed exception
when use sql federation -
[#35206](https://github.com/apache/shardingsphere/pull/35206)
+1. Proxy: Fix `show processlist` not wait for all nodes -
[#35348](https://github.com/apache/shardingsphere/pull/35348)
### Change Logs
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java
index ca4e0c7e005..9822205bc20 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java
@@ -19,10 +19,8 @@ package
org.apache.shardingsphere.infra.executor.sql.process.lock;
import lombok.SneakyThrows;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
/**
* Process operation lock.
@@ -31,22 +29,10 @@ public final class ProcessOperationLock {
private static final long TIMEOUT_MILLIS = 5000L;
- private final Lock lock = new ReentrantLock();
+ private final CountDownLatch latch;
- private final Condition condition = lock.newCondition();
-
- /**
- * Lock.
- */
- public void lock() {
- lock.lock();
- }
-
- /**
- * Unlock.
- */
- public void unlock() {
- lock.unlock();
+ public ProcessOperationLock(final int latchCount) {
+ latch = new CountDownLatch(latchCount);
}
/**
@@ -57,23 +43,16 @@ public final class ProcessOperationLock {
*/
@SneakyThrows(InterruptedException.class)
public boolean awaitDefaultTime(final ProcessOperationLockReleaseStrategy
releaseStrategy) {
- while (!releaseStrategy.isReadyToRelease()) {
- if (condition.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- return false;
- }
+ if (releaseStrategy.isReadyToRelease()) {
+ return true;
}
- return true;
+ return latch.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
/**
* Notify.
*/
public void doNotify() {
- lock.lock();
- try {
- condition.signalAll();
- } finally {
- lock.unlock();
- }
+ latch.countDown();
}
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
index ae23e2e50f8..83efe5c1a4f 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
@@ -46,17 +46,16 @@ public final class ProcessOperationLockRegistry {
* Wait until release ready.
*
* @param processId process ID
+ * @param latchCount count for latch
* @param releaseStrategy process operation lock release strategy
* @return release ready or not
*/
- public boolean waitUntilReleaseReady(final String processId, final
ProcessOperationLockReleaseStrategy releaseStrategy) {
- ProcessOperationLock lock = new ProcessOperationLock();
+ public boolean waitUntilReleaseReady(final String processId, final int
latchCount, final ProcessOperationLockReleaseStrategy releaseStrategy) {
+ ProcessOperationLock lock = new ProcessOperationLock(latchCount);
locks.put(processId, lock);
- lock.lock();
try {
return lock.awaitDefaultTime(releaseStrategy);
} finally {
- lock.unlock();
locks.remove(processId);
}
}
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistryTest.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistryTest.java
index 90139a1b43f..614e502b020 100644
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistryTest.java
+++
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistryTest.java
@@ -45,7 +45,7 @@ class ProcessOperationLockRegistryTest {
}
private void waitUntilReleaseReady(final String lockId) {
-
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(lockId, new
ProcessOperationLockReleaseStrategy() {
+
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(lockId, 1, new
ProcessOperationLockReleaseStrategy() {
private final AtomicBoolean firstTime = new AtomicBoolean(true);
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
index fcd794a00f5..cdf35ec1dd4 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
@@ -54,7 +54,7 @@ public final class ClusterProcessPersistService implements
ProcessPersistService
boolean isCompleted = false;
try {
triggerPaths.forEach(each -> repository.persist(each, ""));
- isCompleted =
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(taskId, () ->
isReady(triggerPaths));
+ isCompleted =
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(taskId,
triggerPaths.size(), () -> isReady(triggerPaths));
return getShowProcessListData(taskId);
} finally {
repository.delete(NodePathGenerator.toPath(new
ProcessNodePath(taskId, null)));
@@ -86,7 +86,7 @@ public final class ClusterProcessPersistService implements
ProcessPersistService
boolean isCompleted = false;
try {
triggerPaths.forEach(each -> repository.persist(each, ""));
- isCompleted =
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(processId, ()
-> isReady(triggerPaths));
+ isCompleted =
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(processId,
triggerPaths.size(), () -> isReady(triggerPaths));
} finally {
if (!isCompleted) {
triggerPaths.forEach(repository::delete);
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
index 0b224a770f2..147f801ae0f 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
@@ -37,6 +37,7 @@ import java.util.Collections;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
@@ -59,7 +60,7 @@ class ClusterProcessPersistServiceTest {
@Test
void assertGetCompletedProcessList() {
-
when(ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(any(),
any())).thenReturn(true);
+
when(ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(any(),
anyInt(), any())).thenReturn(true);
assertGetProcessList();
verify(repository,
times(0)).delete(contains("/nodes/compute_nodes/show_process_list_trigger/abc:"));
}
@@ -93,7 +94,7 @@ class ClusterProcessPersistServiceTest {
@Test
void assertKillCompletedProcess() {
-
when(ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(any(),
any())).thenReturn(true);
+
when(ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(any(),
anyInt(), any())).thenReturn(true);
assertKillProcess();
verify(repository,
times(0)).delete("/nodes/compute_nodes/kill_process_trigger/abc:foo_process_id");
}