This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c3701bd8e2d Refactor ShowProcessListResponseEvent (#25458)
c3701bd8e2d is described below
commit c3701bd8e2d89c22eff53a609e4e80624e509f26
Author: Liang Zhang <[email protected]>
AuthorDate: Fri May 5 10:29:59 2023 +0800
Refactor ShowProcessListResponseEvent (#25458)
* Refactor ComputeNodeStateChangedWatcher
* Refactor ShowProcessListResponseEvent
* Refactor YamlProcessListSwapper
* Refactor ShowProcessListResponseEvent
* Rename trigger node name
---
...42\200\231s_Show_processlist_&_Kill_Work.en.md" | 4 +--
.../content/reference/management/_index.cn.md | 2 +-
.../content/reference/management/_index.en.md | 2 +-
.../infra/executor/sql/process/Process.java | 2 ++
.../yaml/swapper/YamlProcessListSwapper.java | 2 +-
.../process/yaml/swapper/YamlProcessSwapper.java | 6 +++-
.../yaml/swapper/YamlProcessListSwapperTest.java | 37 ++++++++++++++++++++--
.../yaml/swapper/YamlProcessSwapperTest.java | 27 ++++++++++++++--
.../metadata/persist/node/ComputeNode.java | 4 +--
.../metadata/persist/node/ComputeNodeTest.java | 6 ++--
.../event/ShowProcessListResponseEvent.java | 3 +-
.../subscriber/ClusterProcessSubscriber.java | 17 +++++++---
.../ProcessListChangedSubscriberTest.java | 4 +--
.../subscriber/StandaloneProcessSubscriber.java | 11 ++-----
.../admin/executor/ShowProcessListExecutor.java | 33 ++++++++-----------
.../executor/ShowProcessListExecutorTest.java | 27 +++++++---------
16 files changed, 119 insertions(+), 68 deletions(-)
diff --git
"a/docs/blog/content/material/2022_09_22_How_does_ShardingSphere\342\200\231s_Show_processlist_&_Kill_Work.en.md"
"b/docs/blog/content/material/2022_09_22_How_does_ShardingSphere\342\200\231s_Show_processlist_&_Kill_Work.en.md"
index e58ddd8f2f1..b5f27d9cca7 100644
---
"a/docs/blog/content/material/2022_09_22_How_does_ShardingSphere\342\200\231s_Show_processlist_&_Kill_Work.en.md"
+++
"b/docs/blog/content/material/2022_09_22_How_does_ShardingSphere\342\200\231s_Show_processlist_&_Kill_Work.en.md"
@@ -243,7 +243,7 @@ It contains five steps and steps 2 & 3 are the focus.
### 2.2.1 Step 2: the cluster obtains the data implementation
-In this step, an empty string will be written to the node `/nodes/compute_nodes/process_trigger/<instanceId>:<processId>`, which will trigger ShardingSphere's monitoring logic.
+In this step, an empty string will be written to the node `/nodes/compute_nodes/show_process_list_trigger/<instanceId>:<processId>`, which will trigger ShardingSphere's monitoring logic.
When ShardingSphere is started, the persistence layer will `watch` to monitor a series of path changes, such as the addition, deletion, and modification operations of the path `/nodes/compute_nodes`.
@@ -321,7 +321,7 @@ public final class ClusterContextManagerCoordinator {
@Subscribe
}
```
-`ClusterContextManagerCoordinator#triggerShowProcessList` will subscribe to `ShowProcessListTriggerEvent`, in which `process` data is processed by itself. `ShowProcessListManager.getInstance().getAllProcessContext()` retrieves the `process` that is currently running (here the data refers to the SQL information that ShardingSphere stores in the Map before each SQL execution, which is described at the beginning of the article) and transfers it to the persistence layer. If the `/nodes/compu [...]
+`ClusterContextManagerCoordinator#triggerShowProcessList` will subscribe to `ShowProcessListTriggerEvent`, in which `process` data is processed by itself. `ShowProcessListManager.getInstance().getAllProcessContext()` retrieves the `process` that is currently running (here the data refers to the SQL information that ShardingSphere stores in the Map before each SQL execution, which is described at the beginning of the article) and transfers it to the persistence layer. If the `/nodes/compu [...]
When you delete the node, monitoring will also be triggered and `ShowProcessListUnitCompleteEvent` will be posted. This event will finally awake the pending lock.
diff --git a/docs/document/content/reference/management/_index.cn.md
b/docs/document/content/reference/management/_index.cn.md
index bd626b99516..37d430aa309 100644
--- a/docs/document/content/reference/management/_index.cn.md
+++ b/docs/document/content/reference/management/_index.cn.md
@@ -47,7 +47,7 @@ namespace
├ ├ ├──worker_id
├ ├ ├ ├──UUID
├ ├ ├ ├──....
- ├ ├ ├──process_trigger
+ ├ ├ ├──show_process_list_trigger
├ ├ ├ ├──process_id:UUID
├ ├ ├ ├──....
├ ├ ├──labels
diff --git a/docs/document/content/reference/management/_index.en.md
b/docs/document/content/reference/management/_index.en.md
index ac6c452894d..a9c1e8ddb2b 100644
--- a/docs/document/content/reference/management/_index.en.md
+++ b/docs/document/content/reference/management/_index.en.md
@@ -47,7 +47,7 @@ namespace
├ ├ ├──worker_id
├ ├ ├ ├──UUID
├ ├ ├ ├──....
- ├ ├ ├──process_trigger
+ ├ ├ ├──show_process_list_trigger
├ ├ ├ ├──process_id:UUID
├ ├ ├ ├──....
├ ├ ├──labels
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
index a7432c56815..50b34d8bf9d 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.infra.executor.sql.process;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* Process.
*/
+@RequiredArgsConstructor
@Getter
public final class Process {
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapper.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapper.java
index a4c10da1b46..245afa9789b 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapper.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapper.java
@@ -40,6 +40,6 @@ public final class YamlProcessListSwapper implements YamlConfigurationSwapper<Ya
@Override
public Collection<Process> swapToObject(final YamlProcessList yamlConfig) {
- throw new UnsupportedOperationException("YamlProcessListSwapper.swapToObject");
+ return yamlConfig.getProcesses().stream().map(yamlProcessSwapper::swapToObject).collect(Collectors.toList());
}
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
index f9800611dae..fd98124fd53 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
@@ -21,6 +21,9 @@ import
org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcess;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* YAML process swapper.
*/
@@ -43,6 +46,7 @@ public final class YamlProcessSwapper implements YamlConfigurationSwapper<YamlPr
@Override
public Process swapToObject(final YamlProcess yamlConfig) {
- throw new UnsupportedOperationException("YamlProcessSwapper.swapToObject");
+ return new Process(yamlConfig.getId(), yamlConfig.getStartMillis(), yamlConfig.getSql(), yamlConfig.getDatabaseName(), yamlConfig.getUsername(), yamlConfig.getHostname(),
+ yamlConfig.getTotalUnitCount(), Collections.emptyList(), new AtomicInteger(yamlConfig.getCompletedUnitCount()), yamlConfig.isIdle());
}
}
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
index e510cbfdd81..c1ab8e8ed06 100644
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
+++
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
@@ -26,6 +26,7 @@ import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.junit.jupiter.api.Test;
+import java.util.Collection;
import java.util.Collections;
import static org.hamcrest.CoreMatchers.is;
@@ -33,7 +34,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class YamlProcessListSwapperTest {
@@ -47,7 +48,7 @@ class YamlProcessListSwapperTest {
assertYamlProcessContext(actual.getProcesses().iterator().next());
}
- private static void assertYamlProcessContext(final YamlProcess actual) {
+ private void assertYamlProcessContext(final YamlProcess actual) {
assertNotNull(actual.getId());
assertThat(actual.getStartMillis(), lessThanOrEqualTo(System.currentTimeMillis()));
assertThat(actual.getSql(), is("SELECT 1"));
@@ -61,6 +62,36 @@ class YamlProcessListSwapperTest {
@Test
void assertSwapToObject() {
- assertThrows(UnsupportedOperationException.class, () -> new YamlProcessListSwapper().swapToObject(new YamlProcessList()));
+ YamlProcessList yamlProcessList = new YamlProcessList();
+ yamlProcessList.setProcesses(Collections.singleton(createYamlProcess()));
+ Collection<Process> actual = new YamlProcessListSwapper().swapToObject(yamlProcessList);
+ assertThat(actual.size(), is(1));
+ assertProcess(actual.iterator().next());
+ }
+
+ private YamlProcess createYamlProcess() {
+ YamlProcess result = new YamlProcess();
+ result.setId("foo_id");
+ result.setStartMillis(1000L);
+ result.setSql("SELECT 1");
+ result.setDatabaseName("foo_db");
+ result.setUsername("root");
+ result.setHostname("localhost");
+ result.setTotalUnitCount(10);
+ result.setCompletedUnitCount(5);
+ result.setIdle(true);
+ return result;
+ }
+
+ private void assertProcess(final Process actual) {
+ assertThat(actual.getId(), is("foo_id"));
+ assertThat(actual.getStartMillis(), is(1000L));
+ assertThat(actual.getSql(), is("SELECT 1"));
+ assertThat(actual.getDatabaseName(), is("foo_db"));
+ assertThat(actual.getUsername(), is("root"));
+ assertThat(actual.getHostname(), is("localhost"));
+ assertThat(actual.getTotalUnitCount(), is(10));
+ assertThat(actual.getCompletedUnitCount(), is(5));
+ assertTrue(actual.isIdle());
}
}
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
index 871ede91e3e..6d6fe1361e6 100644
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
+++
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
@@ -32,7 +32,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class YamlProcessSwapperTest {
@@ -55,6 +55,29 @@ class YamlProcessSwapperTest {
@Test
void assertSwapToObject() {
- assertThrows(UnsupportedOperationException.class, () -> new YamlProcessSwapper().swapToObject(new YamlProcess()));
+ Process actual = new YamlProcessSwapper().swapToObject(createYamlProcess());
+ assertThat(actual.getId(), is("foo_id"));
+ assertThat(actual.getStartMillis(), is(1000L));
+ assertThat(actual.getSql(), is("SELECT 1"));
+ assertThat(actual.getDatabaseName(), is("foo_db"));
+ assertThat(actual.getUsername(), is("root"));
+ assertThat(actual.getHostname(), is("localhost"));
+ assertThat(actual.getTotalUnitCount(), is(10));
+ assertThat(actual.getCompletedUnitCount(), is(5));
+ assertTrue(actual.isIdle());
+ }
+
+ private YamlProcess createYamlProcess() {
+ YamlProcess result = new YamlProcess();
+ result.setId("foo_id");
+ result.setStartMillis(1000L);
+ result.setSql("SELECT 1");
+ result.setDatabaseName("foo_db");
+ result.setUsername("root");
+ result.setHostname("localhost");
+ result.setTotalUnitCount(10);
+ result.setCompletedUnitCount(5);
+ result.setIdle(true);
+ return result;
}
}
diff --git
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
index c21c08cc872..5749223a6ed 100644
---
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
+++
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
@@ -38,9 +38,9 @@ public final class ComputeNode {
private static final String LABELS_NODE = "labels";
- private static final String SHOW_PROCESS_LIST_TRIGGER = "process_trigger";
+ private static final String SHOW_PROCESS_LIST_TRIGGER = "show_process_list_trigger";
- private static final String KILL_PROCESS_TRIGGER = "process_kill";
+ private static final String KILL_PROCESS_TRIGGER = "kill_process_trigger";
private static final String STATUS_NODE = "status";
diff --git
a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
index 42b282e2580..f0338c4b8f2 100644
---
a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
+++
b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
@@ -39,15 +39,15 @@ class ComputeNodeTest {
@Test
void assertGetProcessTriggerNodePatch() {
- assertThat(ComputeNode.getShowProcessListTriggerNodePath(), is("/nodes/compute_nodes/process_trigger"));
+ assertThat(ComputeNode.getShowProcessListTriggerNodePath(), is("/nodes/compute_nodes/show_process_list_trigger"));
}
@Test
void assertGetProcessTriggerInstanceIdNodePath() {
assertThat(ComputeNode.getProcessTriggerInstanceNodePath("foo_instance", "foo_process_id"),
- is("/nodes/compute_nodes/process_trigger/foo_instance:foo_process_id"));
+ is("/nodes/compute_nodes/show_process_list_trigger/foo_instance:foo_process_id"));
assertThat(ComputeNode.getProcessTriggerInstanceNodePath("foo_instance", "foo_process_id"),
- is("/nodes/compute_nodes/process_trigger/foo_instance:foo_process_id"));
+ is("/nodes/compute_nodes/show_process_list_trigger/foo_instance:foo_process_id"));
}
@Test
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java
index f3b44baf636..4b83a572516 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.mode.process.event;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
import java.util.Collection;
@@ -29,5 +30,5 @@ import java.util.Collection;
@Getter
public final class ShowProcessListResponseEvent {
- private final Collection<String> processes;
+ private final Collection<Process> processes;
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
index 8a354e05f51..ae546387a8b 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
@@ -19,14 +19,17 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.proc
import com.google.common.eventbus.Subscribe;
import
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
+import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
+import
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
+import org.apache.shardingsphere.mode.process.ProcessSubscriber;
import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent;
import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
import
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
-import org.apache.shardingsphere.mode.process.ProcessSubscriber;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import java.util.Collection;
@@ -45,9 +48,12 @@ public final class ClusterProcessSubscriber implements ProcessSubscriber {
private final EventBusContext eventBusContext;
+ private final YamlProcessListSwapper swapper;
+
public ClusterProcessSubscriber(final PersistRepository repository, final EventBusContext eventBusContext) {
this.repository = repository;
this.eventBusContext = eventBusContext;
+ swapper = new YamlProcessListSwapper();
eventBusContext.register(this);
}
@@ -70,9 +76,12 @@ public final class ClusterProcessSubscriber implements ProcessSubscriber {
}
private void postShowProcessListData(final String taskId) {
- Collection<String> yamlProcessListContexts = repository.getChildrenKeys(ProcessNode.getProcessIdPath(taskId)).stream()
- .map(each -> repository.getDirectly(ProcessNode.getProcessListInstancePath(taskId, each))).collect(Collectors.toList());
- eventBusContext.post(new ShowProcessListResponseEvent(yamlProcessListContexts));
+ YamlProcessList yamlProcessList = new YamlProcessList();
+ for (String each : repository.getChildrenKeys(ProcessNode.getProcessIdPath(taskId)).stream()
+ .map(each -> repository.getDirectly(ProcessNode.getProcessListInstancePath(taskId, each))).collect(Collectors.toList())) {
+ yamlProcessList.getProcesses().addAll(YamlEngine.unmarshal(each, YamlProcessList.class).getProcesses());
+ }
+ eventBusContext.post(new ShowProcessListResponseEvent(swapper.swapToObject(yamlProcessList)));
}
private Collection<String> getShowProcessListTriggerPaths(final String taskId) {
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index 31011afc267..801a050d120 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -115,7 +115,7 @@ class ProcessListChangedSubscriberTest {
ClusterPersistRepository repository = registryCenter.getRepository();
verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
"processes:" + System.lineSeparator() + "- completedUnitCount: 0\n id: foo_id\n idle: false\n startMillis: 0\n totalUnitCount: 0" + System.lineSeparator());
- verify(repository).delete("/nodes/compute_nodes/process_trigger/" + instanceId + ":foo_id");
+ verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/" + instanceId + ":foo_id");
}
@Test
@@ -141,7 +141,7 @@ class ProcessListChangedSubscriberTest {
String processId = "foo_id";
subscriber.killLocalProcess(new KillLocalProcessEvent(instanceId, processId));
ClusterPersistRepository repository = registryCenter.getRepository();
- verify(repository).delete("/nodes/compute_nodes/process_kill/" + instanceId + ":foo_id");
+ verify(repository).delete("/nodes/compute_nodes/kill_process_trigger/" + instanceId + ":foo_id");
}
@Test
diff --git
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
index 47e71a3daea..2063cfb6bea 100644
---
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
+++
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
@@ -20,18 +20,14 @@ package
org.apache.shardingsphere.mode.manager.standalone.subscriber;
import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.mode.process.ProcessSubscriber;
import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent;
import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
import
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
-import org.apache.shardingsphere.mode.process.ProcessSubscriber;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Collections;
/**
* Standalone process subscriber.
@@ -41,8 +37,6 @@ public final class StandaloneProcessSubscriber implements
ProcessSubscriber {
private final EventBusContext eventBusContext;
- private final YamlProcessListSwapper swapper = new
YamlProcessListSwapper();
-
public StandaloneProcessSubscriber(final EventBusContext eventBusContext) {
this.eventBusContext = eventBusContext;
eventBusContext.register(this);
@@ -51,8 +45,7 @@ public final class StandaloneProcessSubscriber implements ProcessSubscriber {
@Override
@Subscribe
public void postShowProcessListData(final ShowProcessListRequestEvent event) {
- YamlProcessList yamlProcessList = swapper.swapToYamlConfiguration(ProcessRegistry.getInstance().listAll());
- eventBusContext.post(new ShowProcessListResponseEvent(Collections.singleton(YamlEngine.marshal(yamlProcessList))));
+ eventBusContext.post(new ShowProcessListResponseEvent(ProcessRegistry.getInstance().listAll()));
}
@Override
diff --git
a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
index 84c955bce7a..f8e2d3e88c5 100644
---
a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
+++
b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
@@ -25,11 +25,9 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.ra
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.raw.metadata.RawQueryResultMetaData;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.raw.type.RawMemoryQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.type.memory.row.MemoryQueryResultDataRow;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
-import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcess;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import
org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
import
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -40,7 +38,6 @@ import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -51,7 +48,7 @@ import java.util.stream.Collectors;
@SuppressWarnings("UnstableApiUsage")
public final class ShowProcessListExecutor implements
DatabaseAdminQueryExecutor {
- private Collection<String> processes;
+ private Collection<Process> processes;
@Getter
private QueryResultMetaData queryResultMetaData;
@@ -84,30 +81,26 @@ public final class ShowProcessListExecutor implements DatabaseAdminQueryExecutor
if (null == processes || processes.isEmpty()) {
return new RawMemoryQueryResult(queryResultMetaData, Collections.emptyList());
}
- Collection<YamlProcess> processes = new LinkedList<>();
- for (String each : this.processes) {
- processes.addAll(YamlEngine.unmarshal(each, YamlProcessList.class).getProcesses());
- }
List<MemoryQueryResultDataRow> rows = processes.stream().map(ShowProcessListExecutor::getMemoryQueryResultDataRow).collect(Collectors.toList());
return new RawMemoryQueryResult(queryResultMetaData, rows);
}
- private static MemoryQueryResultDataRow getMemoryQueryResultDataRow(final YamlProcess yamlProcess) {
+ private static MemoryQueryResultDataRow getMemoryQueryResultDataRow(final Process process) {
List<Object> rowValues = new ArrayList<>(8);
- rowValues.add(yamlProcess.getId());
- rowValues.add(yamlProcess.getUsername());
- rowValues.add(yamlProcess.getHostname());
- rowValues.add(yamlProcess.getDatabaseName());
- rowValues.add(yamlProcess.isIdle() ? "Sleep" : "Execute");
- rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - yamlProcess.getStartMillis()));
+ rowValues.add(process.getId());
+ rowValues.add(process.getUsername());
+ rowValues.add(process.getHostname());
+ rowValues.add(process.getDatabaseName());
+ rowValues.add(process.isIdle() ? "Sleep" : "Execute");
+ rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - process.getStartMillis()));
String sql = null;
- if (yamlProcess.isIdle()) {
+ if (process.isIdle()) {
rowValues.add("");
} else {
- int processDoneCount = yamlProcess.getCompletedUnitCount();
+ int processDoneCount = process.getCompletedUnitCount();
String statePrefix = "Executing ";
- rowValues.add(statePrefix + processDoneCount + "/" + yamlProcess.getTotalUnitCount());
- sql = yamlProcess.getSql();
+ rowValues.add(statePrefix + processDoneCount + "/" + process.getTotalUnitCount());
+ sql = process.getSql();
}
if (null != sql && sql.length() > 100) {
sql = sql.substring(0, 100);
diff --git
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
index 420f528a734..729bab7c86e 100644
---
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
+++
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.proxy.backend.mysql.handler.admin.executor;
import io.netty.util.DefaultAttributeMap;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -32,6 +33,7 @@ import org.mockito.internal.configuration.plugins.Plugins;
import java.sql.SQLException;
import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -48,31 +50,24 @@ class ShowProcessListExecutorTest {
ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS);
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
ShowProcessListExecutor showProcessListExecutor = new ShowProcessListExecutor();
- setupBatchProcessContexts(showProcessListExecutor);
+ setupProcesses(showProcessListExecutor);
showProcessListExecutor.execute(new ConnectionSession(mock(MySQLDatabaseType.class), TransactionType.LOCAL, new DefaultAttributeMap()));
assertThat(showProcessListExecutor.getQueryResultMetaData().getColumnCount(), is(8));
MergedResult mergedResult = showProcessListExecutor.getMergedResult();
while (mergedResult.next()) {
assertThat(mergedResult.getValue(1, String.class), is("f6c2336a-63ba-41bf-941e-2e3504eb2c80"));
- assertThat(mergedResult.getValue(2, String.class), is("sharding"));
+ assertThat(mergedResult.getValue(2, String.class), is("root"));
assertThat(mergedResult.getValue(3, String.class), is("127.0.0.1"));
- assertThat(mergedResult.getValue(4, String.class), is("sharding_db"));
+ assertThat(mergedResult.getValue(4, String.class), is("foo_db"));
assertThat(mergedResult.getValue(7, String.class), is("Executing 1/2"));
- assertThat(mergedResult.getValue(8, String.class), is("alter table t_order add column a varchar(64) after order_id"));
+ assertThat(mergedResult.getValue(8, String.class), is("ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id"));
}
}
- private void setupBatchProcessContexts(final ShowProcessListExecutor showProcessListExecutor) throws ReflectiveOperationException {
- String executionNodeValue = "processes:\n"
- + "- id: f6c2336a-63ba-41bf-941e-2e3504eb2c80\n"
- + " sql: alter table t_order add column a varchar(64) after order_id\n"
- + " startMillis: 1617939785160\n"
- + " databaseName: sharding_db\n"
- + " username: sharding\n"
- + " hostname: 127.0.0.1\n"
- + " totalUnitCount: 2\n"
- + " completedUnitCount: 1\n"
- + " idle: false\n";
- Plugins.getMemberAccessor().set(showProcessListExecutor.getClass().getDeclaredField("processes"), showProcessListExecutor, Collections.singleton(executionNodeValue));
+ private void setupProcesses(final ShowProcessListExecutor showProcessListExecutor) throws ReflectiveOperationException {
+ Process process = new Process("f6c2336a-63ba-41bf-941e-2e3504eb2c80", 1617939785160L,
+ "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id", "foo_db", "root", "127.0.0.1", 2, Collections.emptyList(), new AtomicInteger(1), false);
+ Plugins.getMemberAccessor().set(
+ showProcessListExecutor.getClass().getDeclaredField("processes"), showProcessListExecutor, Collections.singleton(process));
}
}
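
Note on the change above: the swappers now convert YAML holders back into domain objects, so `ShowProcessListResponseEvent` can carry `Process` instances instead of raw YAML strings. The following is only a minimal sketch of that mapping pattern, not the ShardingSphere implementation. `ProcessSwapSketch`, `YamlProcessStub` and `ProcessStub` are hypothetical stand-ins with a reduced field set, assumed here purely for illustration.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public final class ProcessSwapSketch {

    /** Hypothetical stand-in for YamlProcess: a mutable, serialization-friendly holder. */
    public static final class YamlProcessStub {
        public String id;
        public String sql;
        public int totalUnitCount;
        public int completedUnitCount;
        public boolean idle;
    }

    /** Hypothetical stand-in for Process: final fields plus an atomic progress counter. */
    public static final class ProcessStub {
        public final String id;
        public final String sql;
        public final int totalUnitCount;
        public final AtomicInteger completedUnitCount;
        public final boolean idle;

        public ProcessStub(final String id, final String sql, final int totalUnitCount,
                           final AtomicInteger completedUnitCount, final boolean idle) {
            this.id = id;
            this.sql = sql;
            this.totalUnitCount = totalUnitCount;
            this.completedUnitCount = completedUnitCount;
            this.idle = idle;
        }
    }

    /** Maps one YAML holder to a domain object, wrapping the plain counter in an AtomicInteger. */
    public static ProcessStub swapToObject(final YamlProcessStub yaml) {
        return new ProcessStub(yaml.id, yaml.sql, yaml.totalUnitCount, new AtomicInteger(yaml.completedUnitCount), yaml.idle);
    }

    /** Maps a whole collection by delegating to the single-element mapping. */
    public static List<ProcessStub> swapToObjects(final Collection<YamlProcessStub> yamlProcesses) {
        return yamlProcesses.stream().map(ProcessSwapSketch::swapToObject).collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        YamlProcessStub yaml = new YamlProcessStub();
        yaml.id = "foo_id";
        yaml.sql = "SELECT 1";
        yaml.totalUnitCount = 10;
        yaml.completedUnitCount = 5;
        yaml.idle = true;
        ProcessStub actual = swapToObjects(Collections.singletonList(yaml)).get(0);
        System.out.println(actual.id + " " + actual.completedUnitCount.get() + "/" + actual.totalUnitCount);
    }
}
```

With the conversion done once on the producing side, a consumer such as the executor can read typed getters directly and no longer needs to unmarshal YAML itself.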