This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c3701bd8e2d Refactor ShowProcessListResponseEvent (#25458)
c3701bd8e2d is described below

commit c3701bd8e2d89c22eff53a609e4e80624e509f26
Author: Liang Zhang <[email protected]>
AuthorDate: Fri May 5 10:29:59 2023 +0800

    Refactor ShowProcessListResponseEvent (#25458)
    
    * Refactor ComputeNodeStateChangedWatcher
    
    * Refactor ShowProcessListResponseEvent
    
    * Refactor YamlProcessListSwapper
    
    * Refactor ShowProcessListResponseEvent
    
    * Rename trigger node name
---
 ...42\200\231s_Show_processlist_&_Kill_Work.en.md" |  4 +--
 .../content/reference/management/_index.cn.md      |  2 +-
 .../content/reference/management/_index.en.md      |  2 +-
 .../infra/executor/sql/process/Process.java        |  2 ++
 .../yaml/swapper/YamlProcessListSwapper.java       |  2 +-
 .../process/yaml/swapper/YamlProcessSwapper.java   |  6 +++-
 .../yaml/swapper/YamlProcessListSwapperTest.java   | 37 ++++++++++++++++++++--
 .../yaml/swapper/YamlProcessSwapperTest.java       | 27 ++++++++++++++--
 .../metadata/persist/node/ComputeNode.java         |  4 +--
 .../metadata/persist/node/ComputeNodeTest.java     |  6 ++--
 .../event/ShowProcessListResponseEvent.java        |  3 +-
 .../subscriber/ClusterProcessSubscriber.java       | 17 +++++++---
 .../ProcessListChangedSubscriberTest.java          |  4 +--
 .../subscriber/StandaloneProcessSubscriber.java    | 11 ++-----
 .../admin/executor/ShowProcessListExecutor.java    | 33 ++++++++-----------
 .../executor/ShowProcessListExecutorTest.java      | 27 +++++++---------
 16 files changed, 119 insertions(+), 68 deletions(-)
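
The trigger node rename in this commit changes the registry paths under /nodes/compute_nodes. The following is a minimal sketch of how the renamed paths are composed, based only on the values asserted in the tests below. TriggerPaths and its method names are hypothetical and are not part of ShardingSphere; the real logic lives in ComputeNode.

```java
// Hypothetical helper for illustration only; not the real ComputeNode class.
final class TriggerPaths {

    private static final String ROOT = "/nodes/compute_nodes";

    // Node names after the rename in this commit.
    private static final String SHOW_PROCESS_LIST_TRIGGER = "show_process_list_trigger";

    private static final String KILL_PROCESS_TRIGGER = "kill_process_trigger";

    // Builds <root>/show_process_list_trigger/<instanceId>:<processId>.
    static String showProcessListTrigger(final String instanceId, final String processId) {
        return String.join("/", ROOT, SHOW_PROCESS_LIST_TRIGGER, instanceId + ":" + processId);
    }

    // Builds <root>/kill_process_trigger/<instanceId>:<processId>.
    static String killProcessTrigger(final String instanceId, final String processId) {
        return String.join("/", ROOT, KILL_PROCESS_TRIGGER, instanceId + ":" + processId);
    }

    public static void main(final String[] args) {
        System.out.println(showProcessListTrigger("foo_instance", "foo_process_id"));
        System.out.println(killProcessTrigger("foo_instance", "foo_process_id"));
    }
}
```

Anything that watches the old process_trigger or process_kill node names would presumably need to follow the rename.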

diff --git 
"a/docs/blog/content/material/2022_09_22_How_does_ShardingSphere\342\200\231s_Show_processlist_&_Kill_Work.en.md"
 
"b/docs/blog/content/material/2022_09_22_How_does_ShardingSphere\342\200\231s_Show_processlist_&_Kill_Work.en.md"
index e58ddd8f2f1..b5f27d9cca7 100644
--- 
"a/docs/blog/content/material/2022_09_22_How_does_ShardingSphere\342\200\231s_Show_processlist_&_Kill_Work.en.md"
+++ 
"b/docs/blog/content/material/2022_09_22_How_does_ShardingSphere\342\200\231s_Show_processlist_&_Kill_Work.en.md"
@@ -243,7 +243,7 @@ It contains five steps and steps 2 & 3 are the focus.
 
 ### 2.2.1 Step 2: the cluster obtains the data implementation
 
-In this step, an empty string will be written to the node `/nodes/compute_nodes/process_trigger/<instanceId>:<processId>`, which will trigger ShardingSphere's monitoring logic.
+In this step, an empty string will be written to the node `/nodes/compute_nodes/show_process_list_trigger/<instanceId>:<processId>`, which will trigger ShardingSphere's monitoring logic.
 
 When ShardingSphere is started, the persistence layer will `watch` to monitor 
a series of path changes, such as the addition, deletion, and modification 
operations of the path `/nodes/compute_nodes`.
 
@@ -321,7 +321,7 @@ public final class ClusterContextManagerCoordinator {    
@Subscribe
 }
 ```
 
-`ClusterContextManagerCoordinator#triggerShowProcessList` will subscribe to 
`ShowProcessListTriggerEvent`, in which `process` data is processed by itself. 
`ShowProcessListManager.getInstance().getAllProcessContext()` retrieves the 
`process` that is currently running (here the data refers to the SQL 
information that ShardingSphere stores in the Map before each SQL execution, 
which is described at the beginning of the article) and transfers it to the 
persistence layer. If the `/nodes/compu [...]
+`ClusterContextManagerCoordinator#triggerShowProcessList` will subscribe to 
`ShowProcessListTriggerEvent`, in which `process` data is processed by itself. 
`ShowProcessListManager.getInstance().getAllProcessContext()` retrieves the 
`process` that is currently running (here the data refers to the SQL 
information that ShardingSphere stores in the Map before each SQL execution, 
which is described at the beginning of the article) and transfers it to the 
persistence layer. If the `/nodes/compu [...]
 
 When you delete the node, monitoring will also be triggered and 
`ShowProcessListUnitCompleteEvent` will be posted. This event will finally 
awake the pending lock.
 
diff --git a/docs/document/content/reference/management/_index.cn.md 
b/docs/document/content/reference/management/_index.cn.md
index bd626b99516..37d430aa309 100644
--- a/docs/document/content/reference/management/_index.cn.md
+++ b/docs/document/content/reference/management/_index.cn.md
@@ -47,7 +47,7 @@ namespace
    ├    ├     ├──worker_id
    ├    ├     ├     ├──UUID
    ├    ├     ├     ├──....
-   ├    ├     ├──process_trigger
+   ├    ├     ├──show_process_list_trigger
    ├    ├     ├     ├──process_id:UUID
    ├    ├     ├     ├──....
    ├    ├     ├──labels                      
diff --git a/docs/document/content/reference/management/_index.en.md 
b/docs/document/content/reference/management/_index.en.md
index ac6c452894d..a9c1e8ddb2b 100644
--- a/docs/document/content/reference/management/_index.en.md
+++ b/docs/document/content/reference/management/_index.en.md
@@ -47,7 +47,7 @@ namespace
    ├    ├     ├──worker_id
    ├    ├     ├     ├──UUID
    ├    ├     ├     ├──....
-   ├    ├     ├──process_trigger
+   ├    ├     ├──show_process_list_trigger
    ├    ├     ├     ├──process_id:UUID
    ├    ├     ├     ├──....
    ├    ├     ├──labels                      
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
index a7432c56815..50b34d8bf9d 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.infra.executor.sql.process;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 /**
  * Process.
  */
+@RequiredArgsConstructor
 @Getter
 public final class Process {
     
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapper.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapper.java
index a4c10da1b46..245afa9789b 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapper.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapper.java
@@ -40,6 +40,6 @@ public final class YamlProcessListSwapper implements 
YamlConfigurationSwapper<Ya
     
     @Override
     public Collection<Process> swapToObject(final YamlProcessList yamlConfig) {
-        throw new UnsupportedOperationException("YamlProcessListSwapper.swapToObject");
+        return yamlConfig.getProcesses().stream().map(yamlProcessSwapper::swapToObject).collect(Collectors.toList());
     }
 }
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
index f9800611dae..fd98124fd53 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
@@ -21,6 +21,9 @@ import 
org.apache.shardingsphere.infra.executor.sql.process.Process;
 import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcess;
 import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * YAML process swapper.
  */
@@ -43,6 +46,7 @@ public final class YamlProcessSwapper implements 
YamlConfigurationSwapper<YamlPr
     
     @Override
     public Process swapToObject(final YamlProcess yamlConfig) {
-        throw new UnsupportedOperationException("YamlProcessSwapper.swapToObject");
+        return new Process(yamlConfig.getId(), yamlConfig.getStartMillis(), yamlConfig.getSql(), yamlConfig.getDatabaseName(), yamlConfig.getUsername(), yamlConfig.getHostname(),
+                yamlConfig.getTotalUnitCount(), Collections.emptyList(), new AtomicInteger(yamlConfig.getCompletedUnitCount()), yamlConfig.isIdle());
     }
 }
diff --git 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
index e510cbfdd81..c1ab8e8ed06 100644
--- 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
+++ 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
@@ -26,6 +26,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collection;
 import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -33,7 +34,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class YamlProcessListSwapperTest {
     
@@ -47,7 +48,7 @@ class YamlProcessListSwapperTest {
         assertYamlProcessContext(actual.getProcesses().iterator().next());
     }
     
-    private static void assertYamlProcessContext(final YamlProcess actual) {
+    private void assertYamlProcessContext(final YamlProcess actual) {
         assertNotNull(actual.getId());
         assertThat(actual.getStartMillis(), 
lessThanOrEqualTo(System.currentTimeMillis()));
         assertThat(actual.getSql(), is("SELECT 1"));
@@ -61,6 +62,36 @@ class YamlProcessListSwapperTest {
     
     @Test
     void assertSwapToObject() {
-        assertThrows(UnsupportedOperationException.class, () -> new 
YamlProcessListSwapper().swapToObject(new YamlProcessList()));
+        YamlProcessList yamlProcessList = new YamlProcessList();
+        
yamlProcessList.setProcesses(Collections.singleton(createYamlProcess()));
+        Collection<Process> actual = new 
YamlProcessListSwapper().swapToObject(yamlProcessList);
+        assertThat(actual.size(), is(1));
+        assertProcess(actual.iterator().next());
+    }
+    
+    private YamlProcess createYamlProcess() {
+        YamlProcess result = new YamlProcess();
+        result.setId("foo_id");
+        result.setStartMillis(1000L);
+        result.setSql("SELECT 1");
+        result.setDatabaseName("foo_db");
+        result.setUsername("root");
+        result.setHostname("localhost");
+        result.setTotalUnitCount(10);
+        result.setCompletedUnitCount(5);
+        result.setIdle(true);
+        return result;
+    }
+    
+    private void assertProcess(final Process actual) {
+        assertThat(actual.getId(), is("foo_id"));
+        assertThat(actual.getStartMillis(), is(1000L));
+        assertThat(actual.getSql(), is("SELECT 1"));
+        assertThat(actual.getDatabaseName(), is("foo_db"));
+        assertThat(actual.getUsername(), is("root"));
+        assertThat(actual.getHostname(), is("localhost"));
+        assertThat(actual.getTotalUnitCount(), is(10));
+        assertThat(actual.getCompletedUnitCount(), is(5));
+        assertTrue(actual.isIdle());
     }
 }
diff --git 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
index 871ede91e3e..6d6fe1361e6 100644
--- 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
+++ 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
@@ -32,7 +32,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class YamlProcessSwapperTest {
     
@@ -55,6 +55,29 @@ class YamlProcessSwapperTest {
     
     @Test
     void assertSwapToObject() {
-        assertThrows(UnsupportedOperationException.class, () -> new 
YamlProcessSwapper().swapToObject(new YamlProcess()));
+        Process actual = new 
YamlProcessSwapper().swapToObject(createYamlProcess());
+        assertThat(actual.getId(), is("foo_id"));
+        assertThat(actual.getStartMillis(), is(1000L));
+        assertThat(actual.getSql(), is("SELECT 1"));
+        assertThat(actual.getDatabaseName(), is("foo_db"));
+        assertThat(actual.getUsername(), is("root"));
+        assertThat(actual.getHostname(), is("localhost"));
+        assertThat(actual.getTotalUnitCount(), is(10));
+        assertThat(actual.getCompletedUnitCount(), is(5));
+        assertTrue(actual.isIdle());
+    }
+    
+    private YamlProcess createYamlProcess() {
+        YamlProcess result = new YamlProcess();
+        result.setId("foo_id");
+        result.setStartMillis(1000L);
+        result.setSql("SELECT 1");
+        result.setDatabaseName("foo_db");
+        result.setUsername("root");
+        result.setHostname("localhost");
+        result.setTotalUnitCount(10);
+        result.setCompletedUnitCount(5);
+        result.setIdle(true);
+        return result;
     }
 }
diff --git 
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
index c21c08cc872..5749223a6ed 100644
--- 
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
+++ 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
@@ -38,9 +38,9 @@ public final class ComputeNode {
     
     private static final String LABELS_NODE = "labels";
     
-    private static final String SHOW_PROCESS_LIST_TRIGGER = "process_trigger";
+    private static final String SHOW_PROCESS_LIST_TRIGGER = "show_process_list_trigger";
     
-    private static final String KILL_PROCESS_TRIGGER = "process_kill";
+    private static final String KILL_PROCESS_TRIGGER = "kill_process_trigger";
     
     private static final String STATUS_NODE = "status";
     
diff --git 
a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
 
b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
index 42b282e2580..f0338c4b8f2 100644
--- 
a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
+++ 
b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
@@ -39,15 +39,15 @@ class ComputeNodeTest {
     
     @Test
     void assertGetProcessTriggerNodePatch() {
-        assertThat(ComputeNode.getShowProcessListTriggerNodePath(), 
is("/nodes/compute_nodes/process_trigger"));
+        assertThat(ComputeNode.getShowProcessListTriggerNodePath(), 
is("/nodes/compute_nodes/show_process_list_trigger"));
     }
     
     @Test
     void assertGetProcessTriggerInstanceIdNodePath() {
         
assertThat(ComputeNode.getProcessTriggerInstanceNodePath("foo_instance", 
"foo_process_id"),
-                
is("/nodes/compute_nodes/process_trigger/foo_instance:foo_process_id"));
+                
is("/nodes/compute_nodes/show_process_list_trigger/foo_instance:foo_process_id"));
         
assertThat(ComputeNode.getProcessTriggerInstanceNodePath("foo_instance", 
"foo_process_id"),
-                
is("/nodes/compute_nodes/process_trigger/foo_instance:foo_process_id"));
+                
is("/nodes/compute_nodes/show_process_list_trigger/foo_instance:foo_process_id"));
     }
     
     @Test
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java
index f3b44baf636..4b83a572516 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.mode.process.event;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
 
 import java.util.Collection;
 
@@ -29,5 +30,5 @@ import java.util.Collection;
 @Getter
 public final class ShowProcessListResponseEvent {
     
-    private final Collection<String> processes;
+    private final Collection<Process> processes;
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
index 8a354e05f51..ae546387a8b 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
@@ -19,14 +19,17 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.proc
 
 import com.google.common.eventbus.Subscribe;
 import 
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
+import 
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
+import 
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
+import org.apache.shardingsphere.mode.process.ProcessSubscriber;
 import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent;
 import 
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
 import 
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
-import org.apache.shardingsphere.mode.process.ProcessSubscriber;
 import org.apache.shardingsphere.mode.spi.PersistRepository;
 
 import java.util.Collection;
@@ -45,9 +48,12 @@ public final class ClusterProcessSubscriber implements 
ProcessSubscriber {
     
     private final EventBusContext eventBusContext;
     
+    private final YamlProcessListSwapper swapper;
+    
     public ClusterProcessSubscriber(final PersistRepository repository, final 
EventBusContext eventBusContext) {
         this.repository = repository;
         this.eventBusContext = eventBusContext;
+        swapper = new YamlProcessListSwapper();
         eventBusContext.register(this);
     }
     
@@ -70,9 +76,12 @@ public final class ClusterProcessSubscriber implements 
ProcessSubscriber {
     }
     
     private void postShowProcessListData(final String taskId) {
-        Collection<String> yamlProcessListContexts = repository.getChildrenKeys(ProcessNode.getProcessIdPath(taskId)).stream()
-                .map(each -> repository.getDirectly(ProcessNode.getProcessListInstancePath(taskId, each))).collect(Collectors.toList());
-        eventBusContext.post(new ShowProcessListResponseEvent(yamlProcessListContexts));
+        YamlProcessList yamlProcessList = new YamlProcessList();
+        for (String each : repository.getChildrenKeys(ProcessNode.getProcessIdPath(taskId)).stream()
+                .map(each -> repository.getDirectly(ProcessNode.getProcessListInstancePath(taskId, each))).collect(Collectors.toList())) {
+            yamlProcessList.getProcesses().addAll(YamlEngine.unmarshal(each, YamlProcessList.class).getProcesses());
+        }
+        eventBusContext.post(new ShowProcessListResponseEvent(swapper.swapToObject(yamlProcessList)));
     }
     
     private Collection<String> getShowProcessListTriggerPaths(final String 
taskId) {
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index 31011afc267..801a050d120 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -115,7 +115,7 @@ class ProcessListChangedSubscriberTest {
         ClusterPersistRepository repository = registryCenter.getRepository();
         verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
                 "processes:" + System.lineSeparator() + "- completedUnitCount: 
0\n  id: foo_id\n  idle: false\n  startMillis: 0\n  totalUnitCount: 0" + 
System.lineSeparator());
-        verify(repository).delete("/nodes/compute_nodes/process_trigger/" + 
instanceId + ":foo_id");
+        
verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/" + 
instanceId + ":foo_id");
     }
     
     @Test
@@ -141,7 +141,7 @@ class ProcessListChangedSubscriberTest {
         String processId = "foo_id";
         subscriber.killLocalProcess(new KillLocalProcessEvent(instanceId, 
processId));
         ClusterPersistRepository repository = registryCenter.getRepository();
-        verify(repository).delete("/nodes/compute_nodes/process_kill/" + 
instanceId + ":foo_id");
+        verify(repository).delete("/nodes/compute_nodes/kill_process_trigger/" 
+ instanceId + ":foo_id");
     }
     
     @Test
diff --git 
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
 
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
index 47e71a3daea..2063cfb6bea 100644
--- 
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
+++ 
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
@@ -20,18 +20,14 @@ package 
org.apache.shardingsphere.mode.manager.standalone.subscriber;
 import com.google.common.eventbus.Subscribe;
 import org.apache.shardingsphere.infra.executor.sql.process.Process;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
-import 
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
-import 
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.mode.process.ProcessSubscriber;
 import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent;
 import 
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
 import 
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
-import org.apache.shardingsphere.mode.process.ProcessSubscriber;
 
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Collections;
 
 /**
  * Standalone process subscriber.
@@ -41,8 +37,6 @@ public final class StandaloneProcessSubscriber implements 
ProcessSubscriber {
     
     private final EventBusContext eventBusContext;
     
-    private final YamlProcessListSwapper swapper = new 
YamlProcessListSwapper();
-    
     public StandaloneProcessSubscriber(final EventBusContext eventBusContext) {
         this.eventBusContext = eventBusContext;
         eventBusContext.register(this);
@@ -51,8 +45,7 @@ public final class StandaloneProcessSubscriber implements 
ProcessSubscriber {
     @Override
     @Subscribe
     public void postShowProcessListData(final ShowProcessListRequestEvent 
event) {
-        YamlProcessList yamlProcessList = swapper.swapToYamlConfiguration(ProcessRegistry.getInstance().listAll());
-        eventBusContext.post(new ShowProcessListResponseEvent(Collections.singleton(YamlEngine.marshal(yamlProcessList))));
+        eventBusContext.post(new ShowProcessListResponseEvent(ProcessRegistry.getInstance().listAll()));
     }
     
     @Override
diff --git 
a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
 
b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
index 84c955bce7a..f8e2d3e88c5 100644
--- 
a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
+++ 
b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
@@ -25,11 +25,9 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.ra
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.raw.metadata.RawQueryResultMetaData;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.raw.type.RawMemoryQueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.type.memory.row.MemoryQueryResultDataRow;
-import 
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
-import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcess;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import 
org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
 import 
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -40,7 +38,6 @@ import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -51,7 +48,7 @@ import java.util.stream.Collectors;
 @SuppressWarnings("UnstableApiUsage")
 public final class ShowProcessListExecutor implements 
DatabaseAdminQueryExecutor {
     
-    private Collection<String> processes;
+    private Collection<Process> processes;
     
     @Getter
     private QueryResultMetaData queryResultMetaData;
@@ -84,30 +81,26 @@ public final class ShowProcessListExecutor implements 
DatabaseAdminQueryExecutor
         if (null == processes || processes.isEmpty()) {
             return new RawMemoryQueryResult(queryResultMetaData, 
Collections.emptyList());
         }
-        Collection<YamlProcess> processes = new LinkedList<>();
-        for (String each : this.processes) {
-            processes.addAll(YamlEngine.unmarshal(each, YamlProcessList.class).getProcesses());
-        }
         List<MemoryQueryResultDataRow> rows = processes.stream().map(ShowProcessListExecutor::getMemoryQueryResultDataRow).collect(Collectors.toList());
         return new RawMemoryQueryResult(queryResultMetaData, rows);
     }
     
-    private static MemoryQueryResultDataRow getMemoryQueryResultDataRow(final YamlProcess yamlProcess) {
+    private static MemoryQueryResultDataRow getMemoryQueryResultDataRow(final Process process) {
         List<Object> rowValues = new ArrayList<>(8);
-        rowValues.add(yamlProcess.getId());
-        rowValues.add(yamlProcess.getUsername());
-        rowValues.add(yamlProcess.getHostname());
-        rowValues.add(yamlProcess.getDatabaseName());
-        rowValues.add(yamlProcess.isIdle() ? "Sleep" : "Execute");
-        rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - yamlProcess.getStartMillis()));
+        rowValues.add(process.getId());
+        rowValues.add(process.getUsername());
+        rowValues.add(process.getHostname());
+        rowValues.add(process.getDatabaseName());
+        rowValues.add(process.isIdle() ? "Sleep" : "Execute");
+        rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - process.getStartMillis()));
         String sql = null;
-        if (yamlProcess.isIdle()) {
+        if (process.isIdle()) {
             rowValues.add("");
         } else {
-            int processDoneCount = yamlProcess.getCompletedUnitCount();
+            int processDoneCount = process.getCompletedUnitCount();
             String statePrefix = "Executing ";
-            rowValues.add(statePrefix + processDoneCount + "/" + yamlProcess.getTotalUnitCount());
-            sql = yamlProcess.getSql();
+            rowValues.add(statePrefix + processDoneCount + "/" + process.getTotalUnitCount());
+            sql = process.getSql();
         }
         if (null != sql && sql.length() > 100) {
             sql = sql.substring(0, 100);
diff --git 
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
 
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
index 420f528a734..729bab7c86e 100644
--- 
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
+++ 
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.proxy.backend.mysql.handler.admin.executor;
 
 import io.netty.util.DefaultAttributeMap;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -32,6 +33,7 @@ import org.mockito.internal.configuration.plugins.Plugins;
 
 import java.sql.SQLException;
 import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -48,31 +50,24 @@ class ShowProcessListExecutorTest {
         ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
         
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
         ShowProcessListExecutor showProcessListExecutor = new 
ShowProcessListExecutor();
-        setupBatchProcessContexts(showProcessListExecutor);
+        setupProcesses(showProcessListExecutor);
         showProcessListExecutor.execute(new 
ConnectionSession(mock(MySQLDatabaseType.class), TransactionType.LOCAL, new 
DefaultAttributeMap()));
         
assertThat(showProcessListExecutor.getQueryResultMetaData().getColumnCount(), 
is(8));
         MergedResult mergedResult = showProcessListExecutor.getMergedResult();
         while (mergedResult.next()) {
             assertThat(mergedResult.getValue(1, String.class), 
is("f6c2336a-63ba-41bf-941e-2e3504eb2c80"));
-            assertThat(mergedResult.getValue(2, String.class), is("sharding"));
+            assertThat(mergedResult.getValue(2, String.class), is("root"));
             assertThat(mergedResult.getValue(3, String.class), 
is("127.0.0.1"));
-            assertThat(mergedResult.getValue(4, String.class), 
is("sharding_db"));
+            assertThat(mergedResult.getValue(4, String.class), is("foo_db"));
             assertThat(mergedResult.getValue(7, String.class), is("Executing 
1/2"));
-            assertThat(mergedResult.getValue(8, String.class), is("alter table 
t_order add column a varchar(64) after order_id"));
+            assertThat(mergedResult.getValue(8, String.class), is("ALTER TABLE 
t_order ADD COLUMN a varchar(64) AFTER order_id"));
         }
     }
     
-    private void setupBatchProcessContexts(final ShowProcessListExecutor 
showProcessListExecutor) throws ReflectiveOperationException {
-        String executionNodeValue = "processes:\n"
-                + "- id: f6c2336a-63ba-41bf-941e-2e3504eb2c80\n"
-                + "  sql: alter table t_order add column a varchar(64) after 
order_id\n"
-                + "  startMillis: 1617939785160\n"
-                + "  databaseName: sharding_db\n"
-                + "  username: sharding\n"
-                + "  hostname: 127.0.0.1\n"
-                + "  totalUnitCount: 2\n"
-                + "  completedUnitCount: 1\n"
-                + "  idle: false\n";
-        
Plugins.getMemberAccessor().set(showProcessListExecutor.getClass().getDeclaredField("processes"),
 showProcessListExecutor, Collections.singleton(executionNodeValue));
+    private void setupProcesses(final ShowProcessListExecutor 
showProcessListExecutor) throws ReflectiveOperationException {
+        Process process = new Process("f6c2336a-63ba-41bf-941e-2e3504eb2c80", 
1617939785160L,
+                "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id", 
"foo_db", "root", "127.0.0.1", 2, Collections.emptyList(), new 
AtomicInteger(1), false);
+        Plugins.getMemberAccessor().set(
+                
showProcessListExecutor.getClass().getDeclaredField("processes"), 
showProcessListExecutor, Collections.singleton(process));
     }
 }
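
The core of this change is that the YAML form of a process is now converted back into a Process object before the response event is posted, so the executor no longer unmarshals YAML itself. The following is a minimal, self-contained sketch of that conversion. YamlProc, Proc and ProcessSwapSketch are hypothetical, simplified stand-ins with fewer fields than the real YamlProcess and Process, and the method names are assumptions made for illustration.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-ins; not the real ShardingSphere classes.
final class ProcessSwapSketch {

    // Stand-in for YamlProcess: a mutable bean holding plain values.
    static final class YamlProc {
        String id;
        String sql;
        int totalUnitCount;
        int completedUnitCount;
        boolean idle;
    }

    // Stand-in for Process: final fields, completed count kept in an AtomicInteger.
    static final class Proc {
        final String id;
        final String sql;
        final int totalUnitCount;
        final AtomicInteger completedUnitCount;
        final boolean idle;

        Proc(final String id, final String sql, final int totalUnitCount,
             final AtomicInteger completedUnitCount, final boolean idle) {
            this.id = id;
            this.sql = sql;
            this.totalUnitCount = totalUnitCount;
            this.completedUnitCount = completedUnitCount;
            this.idle = idle;
        }
    }

    // Converts one YAML bean, wrapping the plain counter in a fresh AtomicInteger.
    static Proc toProcess(final YamlProc yaml) {
        return new Proc(yaml.id, yaml.sql, yaml.totalUnitCount, new AtomicInteger(yaml.completedUnitCount), yaml.idle);
    }

    // Converts a whole list by delegating to the single-item conversion.
    static List<Proc> toProcesses(final Collection<YamlProc> yamls) {
        return yamls.stream().map(ProcessSwapSketch::toProcess).collect(Collectors.toList());
    }

    // Sample input mirroring the values used in the tests of this commit.
    static YamlProc sample() {
        YamlProc result = new YamlProc();
        result.id = "foo_id";
        result.sql = "SELECT 1";
        result.totalUnitCount = 10;
        result.completedUnitCount = 5;
        result.idle = true;
        return result;
    }

    public static void main(final String[] args) {
        Proc actual = toProcesses(Collections.singleton(sample())).get(0);
        System.out.println(actual.id + " " + actual.completedUnitCount.get() + "/" + actual.totalUnitCount);
    }
}
```

Keeping the list conversion as a thin wrapper over the single-item conversion mirrors how YamlProcessListSwapper delegates to YamlProcessSwapper in the diff above.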

