This is an automated email from the ASF dual-hosted git repository.

jiangmaolin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fd9c407c4a Fix `show processlist` not wait for all nodes (#35348)
0fd9c407c4a is described below

commit 0fd9c407c4a3a2e6a7c24ab45cbbb1d732c1ce1d
Author: Raigor <[email protected]>
AuthorDate: Fri May 9 12:53:16 2025 +0800

    Fix `show processlist` not wait for all nodes (#35348)
    
    * Fix processlist not wait for all nodes
    
    * Update RELEASE-NOTES.md
---
 RELEASE-NOTES.md                                   |  1 +
 .../sql/process/lock/ProcessOperationLock.java     | 37 +++++-----------------
 .../process/lock/ProcessOperationLockRegistry.java |  7 ++--
 .../lock/ProcessOperationLockRegistryTest.java     |  2 +-
 .../service/ClusterProcessPersistService.java      |  4 +--
 .../service/ClusterProcessPersistServiceTest.java  |  5 +--
 6 files changed, 18 insertions(+), 38 deletions(-)

diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index b0cd6b43c99..7f0fa0af36b 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -46,6 +46,7 @@
 1. DistSQL: Fix duplicate result when show rules used storage unit with 
readwrite-splitting rule - 
[#35129](https://github.com/apache/shardingsphere/pull/35129)
 1. Transaction: Fix conflicting dependencies of BASE transaction integration 
module - [#35142](https://github.com/apache/shardingsphere/pull/35142)
 1. SQL Federation: Fix Operation not allowed after ResultSet closed exception 
when use sql federation - 
[#35206](https://github.com/apache/shardingsphere/pull/35206)
+1. Proxy: Fix `show processlist` not waiting for all nodes - 
[#35348](https://github.com/apache/shardingsphere/pull/35348)
 
 ### Change Logs
 
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java
index ca4e0c7e005..9822205bc20 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLock.java
@@ -19,10 +19,8 @@ package 
org.apache.shardingsphere.infra.executor.sql.process.lock;
 
 import lombok.SneakyThrows;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Process operation lock.
@@ -31,22 +29,10 @@ public final class ProcessOperationLock {
     
     private static final long TIMEOUT_MILLIS = 5000L;
     
-    private final Lock lock = new ReentrantLock();
+    private final CountDownLatch latch;
     
-    private final Condition condition = lock.newCondition();
-    
-    /**
-     * Lock.
-     */
-    public void lock() {
-        lock.lock();
-    }
-    
-    /**
-     * Unlock.
-     */
-    public void unlock() {
-        lock.unlock();
+    public ProcessOperationLock(final int latchCount) {
+        latch = new CountDownLatch(latchCount);
     }
     
     /**
@@ -57,23 +43,16 @@ public final class ProcessOperationLock {
      */
     @SneakyThrows(InterruptedException.class)
     public boolean awaitDefaultTime(final ProcessOperationLockReleaseStrategy 
releaseStrategy) {
-        while (!releaseStrategy.isReadyToRelease()) {
-            if (condition.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                return false;
-            }
+        if (releaseStrategy.isReadyToRelease()) {
+            return true;
         }
-        return true;
+        return latch.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
     }
     
     /**
      * Notify.
      */
     public void doNotify() {
-        lock.lock();
-        try {
-            condition.signalAll();
-        } finally {
-            lock.unlock();
-        }
+        latch.countDown();
     }
 }
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
index ae23e2e50f8..83efe5c1a4f 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
@@ -46,17 +46,16 @@ public final class ProcessOperationLockRegistry {
      * Wait until release ready.
      *
      * @param processId process ID
+     * @param latchCount count for latch
      * @param releaseStrategy process operation lock release strategy
      * @return release ready or not
      */
-    public boolean waitUntilReleaseReady(final String processId, final 
ProcessOperationLockReleaseStrategy releaseStrategy) {
-        ProcessOperationLock lock = new ProcessOperationLock();
+    public boolean waitUntilReleaseReady(final String processId, final int 
latchCount, final ProcessOperationLockReleaseStrategy releaseStrategy) {
+        ProcessOperationLock lock = new ProcessOperationLock(latchCount);
         locks.put(processId, lock);
-        lock.lock();
         try {
             return lock.awaitDefaultTime(releaseStrategy);
         } finally {
-            lock.unlock();
             locks.remove(processId);
         }
     }
diff --git 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistryTest.java
 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistryTest.java
index 90139a1b43f..614e502b020 100644
--- 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistryTest.java
+++ 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistryTest.java
@@ -45,7 +45,7 @@ class ProcessOperationLockRegistryTest {
     }
     
     private void waitUntilReleaseReady(final String lockId) {
-        
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(lockId, new 
ProcessOperationLockReleaseStrategy() {
+        
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(lockId, 1, new 
ProcessOperationLockReleaseStrategy() {
             
             private final AtomicBoolean firstTime = new AtomicBoolean(true);
             
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
index fcd794a00f5..cdf35ec1dd4 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
@@ -54,7 +54,7 @@ public final class ClusterProcessPersistService implements 
ProcessPersistService
         boolean isCompleted = false;
         try {
             triggerPaths.forEach(each -> repository.persist(each, ""));
-            isCompleted = 
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(taskId, () -> 
isReady(triggerPaths));
+            isCompleted = 
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(taskId, 
triggerPaths.size(), () -> isReady(triggerPaths));
             return getShowProcessListData(taskId);
         } finally {
             repository.delete(NodePathGenerator.toPath(new 
ProcessNodePath(taskId, null)));
@@ -86,7 +86,7 @@ public final class ClusterProcessPersistService implements 
ProcessPersistService
         boolean isCompleted = false;
         try {
             triggerPaths.forEach(each -> repository.persist(each, ""));
-            isCompleted = 
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(processId, () 
-> isReady(triggerPaths));
+            isCompleted = 
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(processId, 
triggerPaths.size(), () -> isReady(triggerPaths));
         } finally {
             if (!isCompleted) {
                 triggerPaths.forEach(repository::delete);
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
index 0b224a770f2..147f801ae0f 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
@@ -37,6 +37,7 @@ import java.util.Collections;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.contains;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
@@ -59,7 +60,7 @@ class ClusterProcessPersistServiceTest {
     
     @Test
     void assertGetCompletedProcessList() {
-        
when(ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(any(), 
any())).thenReturn(true);
+        
when(ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(any(), 
anyInt(), any())).thenReturn(true);
         assertGetProcessList();
         verify(repository, 
times(0)).delete(contains("/nodes/compute_nodes/show_process_list_trigger/abc:"));
     }
@@ -93,7 +94,7 @@ class ClusterProcessPersistServiceTest {
     
     @Test
     void assertKillCompletedProcess() {
-        
when(ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(any(), 
any())).thenReturn(true);
+        
when(ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(any(), 
anyInt(), any())).thenReturn(true);
         assertKillProcess();
         verify(repository, 
times(0)).delete("/nodes/compute_nodes/kill_process_trigger/abc:foo_process_id");
     }

Reply via email to