This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 237a864468c Refactor ProcessListChangedSubscriber (#34203)
237a864468c is described below

commit 237a864468cc56b4a3d85fcc986571342c170ded
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 29 22:03:19 2024 +0800

    Refactor ProcessListChangedSubscriber (#34203)
    
    * Refactor ProcessListChangedSubscriber
    
    * Refactor ProcessListChangedSubscriber
    
    * Refactor ProcessListChangedSubscriber
    
    * Refactor ProcessListChangedSubscriber
    
    * Refactor ProcessListChangedSubscriber
    
    * Refactor ProcessListChangedSubscriber
    
    * Refactor ProcessListChangedSubscriber
    
    * Refactor ProcessListChangedSubscriber
    
    * Refactor ProcessListChangedSubscriber
---
 .../infra/executor/sql/process/Process.java        | 13 +++++++
 .../executor/sql/process/ProcessRegistry.java      | 16 +++++++-
 .../service/divided/ProcessPersistService.java     | 16 ++++++++
 .../type/ProcessListChangedSubscriber.java         | 45 ++++++----------------
 .../service/ClusterProcessPersistService.java      | 17 ++++++++
 .../type/ProcessListChangedSubscriberTest.java     | 42 ++++----------------
 .../service/ClusterProcessPersistServiceTest.java  | 27 ++++++++++++-
 .../service/StandaloneProcessPersistService.java   | 17 ++++----
 .../StandaloneProcessPersistServiceTest.java       | 18 +--------
 .../admin/executor/ShowProcessListExecutor.java    |  2 +-
 10 files changed, 118 insertions(+), 95 deletions(-)

diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
index f715b1af99b..a1eae30a7ec 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionU
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -151,4 +152,16 @@ public final class Process {
     public void removeProcessStatement(final ExecutionUnit executionUnit) {
         processStatements.remove(System.identityHashCode(executionUnit));
     }
+    
+    /**
+     * Kill process.
+     *
+     * @throws SQLException SQL exception
+     */
+    public void kill() throws SQLException {
+        setInterrupted(true);
+        for (Statement each : processStatements.values()) {
+            each.cancel();
+        }
+    }
 }
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
index 53a5c18f6c9..1198c4245de 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.exception.kernel.connection.SQLExecutionInterruptedException;
 
+import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -94,11 +95,24 @@ public final class ProcessRegistry {
     }
     
     /**
-     * List all process.
+     * List all processes.
      *
      * @return all processes
      */
     public Collection<Process> listAll() {
         return processes.values();
     }
+    
+    /**
+     * Kill process.
+     *
+     * @param processId process ID
+     * @throws SQLException SQL exception
+     */
+    public void kill(final String processId) throws SQLException {
+        Process process = ProcessRegistry.getInstance().get(processId);
+        if (null != process) {
+            process.kill();
+        }
+    }
 }
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java
index 0794364a6b8..894424fc4b9 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java
@@ -27,6 +27,14 @@ import java.util.Collection;
  */
 public interface ProcessPersistService {
     
+    /**
+     * Report local processes.
+     *
+     * @param instanceId instance ID
+     * @param taskId task ID
+     */
+    void reportLocalProcesses(String instanceId, String taskId);
+    
     /**
      * Get process list.
      *
@@ -41,4 +49,12 @@ public interface ProcessPersistService {
      * @throws  SQLException SQL exception
      */
     void killProcess(String processId) throws SQLException;
+    
+    /**
+     * Clean process.
+     *
+     * @param instanceId instance ID
+     * @param processId process ID
+     */
+    void cleanProcess(String instanceId, String processId);
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
index 4e9c64cd2c8..32f22434919 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
@@ -18,57 +18,42 @@
 package 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
 
 import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.executor.sql.process.Process;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
 import 
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
-import 
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
-import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
+import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesEvent;
-import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
-import org.apache.shardingsphere.mode.spi.PersistRepository;
+import 
org.apache.shardingsphere.mode.persist.service.divided.ProcessPersistService;
 
 import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
 
 /**
  * Process list changed subscriber.
  */
 public final class ProcessListChangedSubscriber implements 
DispatchEventSubscriber {
     
-    private final ContextManager contextManager;
-    
-    private final PersistRepository repository;
+    private final String instanceId;
     
-    private final YamlProcessListSwapper swapper;
+    private final ProcessPersistService processPersistService;
     
     public ProcessListChangedSubscriber(final ContextManager contextManager) {
-        this.contextManager = contextManager;
-        repository = contextManager.getPersistServiceFacade().getRepository();
-        swapper = new YamlProcessListSwapper();
+        instanceId = 
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId();
+        processPersistService = 
contextManager.getPersistServiceFacade().getProcessPersistService();
     }
     
     /**
      * Report local processes.
      *
-     * @param event show process list trigger event
+     * @param event report local processes event
      */
     @Subscribe
     public void reportLocalProcesses(final ReportLocalProcessesEvent event) {
-        if 
(!event.getInstanceId().equals(contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId()))
 {
-            return;
-        }
-        Collection<Process> processes = 
ProcessRegistry.getInstance().listAll();
-        if (!processes.isEmpty()) {
-            
repository.persist(ProcessNode.getProcessListInstancePath(event.getTaskId(), 
event.getInstanceId()), 
YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
+        if (event.getInstanceId().equals(instanceId)) {
+            processPersistService.reportLocalProcesses(instanceId, 
event.getTaskId());
         }
-        
repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(),
 event.getTaskId()));
     }
     
     /**
@@ -89,17 +74,11 @@ public final class ProcessListChangedSubscriber implements 
DispatchEventSubscrib
      */
     @Subscribe
     public synchronized void killLocalProcess(final KillLocalProcessEvent 
event) throws SQLException {
-        if 
(!event.getInstanceId().equals(contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId()))
 {
+        if (!event.getInstanceId().equals(instanceId)) {
             return;
         }
-        Process process = 
ProcessRegistry.getInstance().get(event.getProcessId());
-        if (null != process) {
-            process.setInterrupted(true);
-            for (Statement each : process.getProcessStatements().values()) {
-                each.cancel();
-            }
-        }
-        
repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(),
 event.getProcessId()));
+        ProcessRegistry.getInstance().kill(event.getProcessId());
+        processPersistService.cleanProcess(instanceId, event.getProcessId());
     }
     
     /**
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
index 3bc48b500c1..9bcc9470062 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.mode.manager.cluster.persist.service;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.executor.sql.process.Process;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
 import 
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
 import 
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
 import 
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
@@ -43,6 +44,17 @@ public final class ClusterProcessPersistService implements 
ProcessPersistService
     
     private final PersistRepository repository;
     
+    private final YamlProcessListSwapper swapper = new 
YamlProcessListSwapper();
+    
+    @Override
+    public void reportLocalProcesses(final String instanceId, final String 
taskId) {
+        Collection<Process> processes = 
ProcessRegistry.getInstance().listAll();
+        if (!processes.isEmpty()) {
+            repository.persist(ProcessNode.getProcessListInstancePath(taskId, 
instanceId), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
+        }
+        
repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(instanceId, 
taskId));
+    }
+    
     @Override
     public Collection<Process> getProcessList() {
         String taskId = new UUID(ThreadLocalRandom.current().nextLong(), 
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
@@ -98,4 +110,9 @@ public final class ClusterProcessPersistService implements 
ProcessPersistService
     private boolean isReady(final Collection<String> paths) {
         return paths.stream().noneMatch(each -> null != 
repository.query(each));
     }
+    
+    @Override
+    public void cleanProcess(final String instanceId, final String processId) {
+        
repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(instanceId, 
processId));
+    }
 }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriberTest.java
index 2dab71dc3d4..5cd48c2be7b 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriberTest.java
@@ -17,14 +17,13 @@
 
 package 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
 
-import org.apache.shardingsphere.infra.executor.sql.process.Process;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
 import 
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
+import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesEvent;
-import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.test.mock.AutoMockExtension;
 import org.apache.shardingsphere.test.mock.StaticMockSettings;
@@ -37,12 +36,9 @@ import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Collections;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -72,19 +68,10 @@ class ProcessListChangedSubscriberTest {
     }
     
     @Test
-    void assertReportEmptyLocalProcesses() {
+    void assertReportLocalProcesses() {
         
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.emptyList());
         subscriber.reportLocalProcesses(new 
ReportLocalProcessesEvent("foo_instance_id", "foo_task_id"));
-        verify(contextManager.getPersistServiceFacade().getRepository(), 
times(0)).persist(any(), any());
-        
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
-    }
-    
-    @Test
-    void assertReportNotEmptyLocalProcesses() {
-        
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.singleton(mock(Process.class,
 RETURNS_DEEP_STUBS)));
-        subscriber.reportLocalProcesses(new 
ReportLocalProcessesEvent("foo_instance_id", "foo_task_id"));
-        
verify(contextManager.getPersistServiceFacade().getRepository()).persist(eq("/execution_nodes/foo_task_id/foo_instance_id"),
 any());
-        
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
+        
verify(contextManager.getPersistServiceFacade().getProcessPersistService()).reportLocalProcesses("foo_instance_id",
 "foo_task_id");
     }
     
     @Test
@@ -94,28 +81,15 @@ class ProcessListChangedSubscriberTest {
     }
     
     @Test
-    void assertKillLocalProcessWithNotCurrentInstance() throws SQLException {
-        subscriber.killLocalProcess(new 
KillLocalProcessEvent("bar_instance_id", "foo_pid"));
-        verify(contextManager.getPersistServiceFacade().getRepository(), 
times(0)).delete(any());
-    }
-    
-    @Test
-    void assertKillLocalProcessWithoutExistedProcess() throws SQLException {
-        when(ProcessRegistry.getInstance().get("foo_pid")).thenReturn(null);
+    void assertKillLocalProcessWithCurrentInstance() throws SQLException {
         subscriber.killLocalProcess(new 
KillLocalProcessEvent("foo_instance_id", "foo_pid"));
-        
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid");
+        
verify(contextManager.getPersistServiceFacade().getProcessPersistService()).cleanProcess("foo_instance_id",
 "foo_pid");
     }
     
     @Test
-    void assertKillLocalProcessWithExistedProcess() throws SQLException {
-        Process process = mock(Process.class, RETURNS_DEEP_STUBS);
-        Statement statement = mock(Statement.class);
-        
when(process.getProcessStatements()).thenReturn(Collections.singletonMap(1, 
statement));
-        when(ProcessRegistry.getInstance().get("foo_pid")).thenReturn(process);
-        subscriber.killLocalProcess(new 
KillLocalProcessEvent("foo_instance_id", "foo_pid"));
-        verify(process).setInterrupted(true);
-        verify(statement).cancel();
-        
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid");
+    void assertKillLocalProcessWithNotCurrentInstance() throws SQLException {
+        subscriber.killLocalProcess(new 
KillLocalProcessEvent("bar_instance_id", "foo_pid"));
+        
verify(contextManager.getPersistServiceFacade().getProcessPersistService(), 
times(0)).cleanProcess("bar_instance_id", "foo_pid");
     }
     
     @Test
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
index efebb308c67..5a888875f8d 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.mode.manager.cluster.persist.service;
 
 import org.apache.shardingsphere.infra.executor.sql.process.Process;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
 import 
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
 import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcess;
 import 
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
@@ -40,12 +41,14 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.contains;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(AutoMockExtension.class)
-@StaticMockSettings(ProcessOperationLockRegistry.class)
+@StaticMockSettings({ProcessRegistry.class, 
ProcessOperationLockRegistry.class})
 class ClusterProcessPersistServiceTest {
     
     @Mock
@@ -58,6 +61,22 @@ class ClusterProcessPersistServiceTest {
         processPersistService = new ClusterProcessPersistService(repository);
     }
     
+    @Test
+    void assertReportEmptyLocalProcesses() {
+        
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.emptyList());
+        processPersistService.reportLocalProcesses("foo_instance_id", 
"foo_task_id");
+        verify(repository, times(0)).persist(any(), any());
+        
verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
+    }
+    
+    @Test
+    void assertReportNotEmptyLocalProcesses() {
+        
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.singleton(mock(Process.class,
 RETURNS_DEEP_STUBS)));
+        processPersistService.reportLocalProcesses("foo_instance_id", 
"foo_task_id");
+        
verify(repository).persist(eq("/execution_nodes/foo_task_id/foo_instance_id"), 
any());
+        
verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
+    }
+    
     @Test
     void assertGetCompletedProcessList() {
         
when(ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(any(), 
any())).thenReturn(true);
@@ -111,4 +130,10 @@ class ClusterProcessPersistServiceTest {
         processPersistService.killProcess("foo_process_id");
         
verify(repository).persist("/nodes/compute_nodes/kill_process_trigger/abc:foo_process_id",
 "");
     }
+    
+    @Test
+    void assertCleanProcess() {
+        processPersistService.cleanProcess("foo_instance_id", "foo_pid");
+        
verify(repository).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid");
+    }
 }
diff --git 
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java
 
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java
index 135bc0a1a55..d7c0e6e5057 100644
--- 
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java
+++ 
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java
@@ -22,7 +22,6 @@ import 
org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
 import 
org.apache.shardingsphere.mode.persist.service.divided.ProcessPersistService;
 
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Collection;
 
 /**
@@ -30,6 +29,10 @@ import java.util.Collection;
  */
 public final class StandaloneProcessPersistService implements 
ProcessPersistService {
     
+    @Override
+    public void reportLocalProcesses(final String instanceId, final String 
taskId) {
+    }
+    
     @Override
     public Collection<Process> getProcessList() {
         return ProcessRegistry.getInstance().listAll();
@@ -37,12 +40,10 @@ public final class StandaloneProcessPersistService 
implements ProcessPersistServ
     
     @Override
     public void killProcess(final String processId) throws SQLException {
-        Process process = ProcessRegistry.getInstance().get(processId);
-        if (null == process) {
-            return;
-        }
-        for (Statement each : process.getProcessStatements().values()) {
-            each.cancel();
-        }
+        ProcessRegistry.getInstance().kill(processId);
+    }
+    
+    @Override
+    public void cleanProcess(final String instanceId, final String processId) {
     }
 }
diff --git 
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistServiceTest.java
 
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistServiceTest.java
index 45c6a36b44c..156636d56a2 100644
--- 
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistServiceTest.java
+++ 
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistServiceTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.mode.manager.standalone.persist.service;
 
-import org.apache.shardingsphere.infra.executor.sql.process.Process;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
 import org.apache.shardingsphere.test.mock.AutoMockExtension;
 import org.apache.shardingsphere.test.mock.StaticMockSettings;
@@ -25,10 +24,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collections;
 
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -51,19 +47,7 @@ class StandaloneProcessPersistServiceTest {
     void assertKillProcess() throws SQLException {
         ProcessRegistry processRegistry = mock(ProcessRegistry.class);
         when(ProcessRegistry.getInstance()).thenReturn(processRegistry);
-        Process process = mock(Process.class);
-        Statement statement = mock(Statement.class);
-        
when(process.getProcessStatements()).thenReturn(Collections.singletonMap(1, 
statement));
-        when(processRegistry.get("foo_id")).thenReturn(process);
         processPersistService.killProcess("foo_id");
-        verify(statement).cancel();
-    }
-    
-    @Test
-    void assertKillProcessWithNotExistedProcessId() {
-        ProcessRegistry processRegistry = mock(ProcessRegistry.class);
-        when(ProcessRegistry.getInstance()).thenReturn(processRegistry);
-        when(processRegistry.get("foo_id")).thenReturn(null);
-        assertDoesNotThrow(() -> processPersistService.killProcess("foo_id"));
+        verify(ProcessRegistry.getInstance()).kill("foo_id");
     }
 }
diff --git 
a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
 
b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
index 57dd3db0bdb..3254b62ee3f 100644
--- 
a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
+++ 
b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
@@ -62,7 +62,7 @@ public final class ShowProcessListExecutor implements 
DatabaseAdminQueryExecutor
     
     private QueryResult getQueryResult() {
         Collection<Process> processes = 
ProxyContext.getInstance().getContextManager().getPersistServiceFacade().getProcessPersistService().getProcessList();
-        if (null == processes || processes.isEmpty()) {
+        if (processes.isEmpty()) {
             return new RawMemoryQueryResult(queryResultMetaData, 
Collections.emptyList());
         }
         List<MemoryQueryResultDataRow> rows = 
processes.stream().map(this::getMemoryQueryResultDataRow).collect(Collectors.toList());

Reply via email to