This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 237a864468c Refactor ProcessListChangedSubscriber (#34203)
237a864468c is described below
commit 237a864468cc56b4a3d85fcc986571342c170ded
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 29 22:03:19 2024 +0800
Refactor ProcessListChangedSubscriber (#34203)
* Refactor ProcessListChangedSubscriber
* Refactor ProcessListChangedSubscriber
* Refactor ProcessListChangedSubscriber
* Refactor ProcessListChangedSubscriber
* Refactor ProcessListChangedSubscriber
* Refactor ProcessListChangedSubscriber
* Refactor ProcessListChangedSubscriber
* Refactor ProcessListChangedSubscriber
* Refactor ProcessListChangedSubscriber
---
.../infra/executor/sql/process/Process.java | 13 +++++++
.../executor/sql/process/ProcessRegistry.java | 16 +++++++-
.../service/divided/ProcessPersistService.java | 16 ++++++++
.../type/ProcessListChangedSubscriber.java | 45 ++++++----------------
.../service/ClusterProcessPersistService.java | 17 ++++++++
.../type/ProcessListChangedSubscriberTest.java | 42 ++++----------------
.../service/ClusterProcessPersistServiceTest.java | 27 ++++++++++++-
.../service/StandaloneProcessPersistService.java | 17 ++++----
.../StandaloneProcessPersistServiceTest.java | 18 +--------
.../admin/executor/ShowProcessListExecutor.java | 2 +-
10 files changed, 118 insertions(+), 95 deletions(-)
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
index f715b1af99b..a1eae30a7ec 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionU
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import java.sql.SQLException;
import java.sql.Statement;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -151,4 +152,16 @@ public final class Process {
public void removeProcessStatement(final ExecutionUnit executionUnit) {
processStatements.remove(System.identityHashCode(executionUnit));
}
+
+ /**
+ * Kill process.
+ *
+ * @throws SQLException SQL exception
+ */
+ public void kill() throws SQLException {
+ setInterrupted(true);
+ for (Statement each : processStatements.values()) {
+ each.cancel();
+ }
+ }
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
index 53a5c18f6c9..1198c4245de 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.exception.kernel.connection.SQLExecutionInterruptedException;
+import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -94,11 +95,24 @@ public final class ProcessRegistry {
}
/**
- * List all process.
+ * List all processes.
*
* @return all processes
*/
public Collection<Process> listAll() {
return processes.values();
}
+
+ /**
+ * Kill process.
+ *
+ * @param processId process ID
+ * @throws SQLException SQL exception
+ */
+ public void kill(final String processId) throws SQLException {
+ Process process = ProcessRegistry.getInstance().get(processId);
+ if (null != process) {
+ process.kill();
+ }
+ }
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java
index 0794364a6b8..894424fc4b9 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java
@@ -27,6 +27,14 @@ import java.util.Collection;
*/
public interface ProcessPersistService {
+ /**
+ * Report local processes.
+ *
+ * @param instanceId instance ID
+ * @param taskId task ID
+ */
+ void reportLocalProcesses(String instanceId, String taskId);
+
/**
* Get process list.
*
@@ -41,4 +49,12 @@ public interface ProcessPersistService {
* @throws SQLException SQL exception
*/
void killProcess(String processId) throws SQLException;
+
+ /**
+ * Clean process.
+ *
+ * @param instanceId instance ID
+ * @param processId process ID
+ */
+ void cleanProcess(String instanceId, String processId);
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
index 4e9c64cd2c8..32f22434919 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
@@ -18,57 +18,42 @@
package
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
-import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
+import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessEvent;
import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesEvent;
-import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
-import org.apache.shardingsphere.mode.spi.PersistRepository;
+import
org.apache.shardingsphere.mode.persist.service.divided.ProcessPersistService;
import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
/**
* Process list changed subscriber.
*/
public final class ProcessListChangedSubscriber implements
DispatchEventSubscriber {
- private final ContextManager contextManager;
-
- private final PersistRepository repository;
+ private final String instanceId;
- private final YamlProcessListSwapper swapper;
+ private final ProcessPersistService processPersistService;
public ProcessListChangedSubscriber(final ContextManager contextManager) {
- this.contextManager = contextManager;
- repository = contextManager.getPersistServiceFacade().getRepository();
- swapper = new YamlProcessListSwapper();
+ instanceId =
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId();
+ processPersistService =
contextManager.getPersistServiceFacade().getProcessPersistService();
}
/**
* Report local processes.
*
- * @param event show process list trigger event
+ * @param event report local processes event
*/
@Subscribe
public void reportLocalProcesses(final ReportLocalProcessesEvent event) {
- if
(!event.getInstanceId().equals(contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId()))
{
- return;
- }
- Collection<Process> processes =
ProcessRegistry.getInstance().listAll();
- if (!processes.isEmpty()) {
-
repository.persist(ProcessNode.getProcessListInstancePath(event.getTaskId(),
event.getInstanceId()),
YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
+ if (event.getInstanceId().equals(instanceId)) {
+ processPersistService.reportLocalProcesses(instanceId,
event.getTaskId());
}
-
repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(),
event.getTaskId()));
}
/**
@@ -89,17 +74,11 @@ public final class ProcessListChangedSubscriber implements
DispatchEventSubscrib
*/
@Subscribe
public synchronized void killLocalProcess(final KillLocalProcessEvent
event) throws SQLException {
- if
(!event.getInstanceId().equals(contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId()))
{
+ if (!event.getInstanceId().equals(instanceId)) {
return;
}
- Process process =
ProcessRegistry.getInstance().get(event.getProcessId());
- if (null != process) {
- process.setInterrupted(true);
- for (Statement each : process.getProcessStatements().values()) {
- each.cancel();
- }
- }
-
repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(),
event.getProcessId()));
+ ProcessRegistry.getInstance().kill(event.getProcessId());
+ processPersistService.cleanProcess(instanceId, event.getProcessId());
}
/**
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
index 3bc48b500c1..9bcc9470062 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.persist.service;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.executor.sql.process.Process;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
import
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
@@ -43,6 +44,17 @@ public final class ClusterProcessPersistService implements
ProcessPersistService
private final PersistRepository repository;
+ private final YamlProcessListSwapper swapper = new
YamlProcessListSwapper();
+
+ @Override
+ public void reportLocalProcesses(final String instanceId, final String
taskId) {
+ Collection<Process> processes =
ProcessRegistry.getInstance().listAll();
+ if (!processes.isEmpty()) {
+ repository.persist(ProcessNode.getProcessListInstancePath(taskId,
instanceId), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
+ }
+
repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(instanceId,
taskId));
+ }
+
@Override
public Collection<Process> getProcessList() {
String taskId = new UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
@@ -98,4 +110,9 @@ public final class ClusterProcessPersistService implements
ProcessPersistService
private boolean isReady(final Collection<String> paths) {
return paths.stream().noneMatch(each -> null !=
repository.query(each));
}
+
+ @Override
+ public void cleanProcess(final String instanceId, final String processId) {
+
repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(instanceId,
processId));
+ }
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriberTest.java
index 2dab71dc3d4..5cd48c2be7b 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriberTest.java
@@ -17,14 +17,13 @@
package
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
-import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
+import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessEvent;
import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesEvent;
-import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.StaticMockSettings;
@@ -37,12 +36,9 @@ import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.sql.SQLException;
-import java.sql.Statement;
import java.util.Collections;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -72,19 +68,10 @@ class ProcessListChangedSubscriberTest {
}
@Test
- void assertReportEmptyLocalProcesses() {
+ void assertReportLocalProcesses() {
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.emptyList());
subscriber.reportLocalProcesses(new
ReportLocalProcessesEvent("foo_instance_id", "foo_task_id"));
- verify(contextManager.getPersistServiceFacade().getRepository(),
times(0)).persist(any(), any());
-
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
- }
-
- @Test
- void assertReportNotEmptyLocalProcesses() {
-
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.singleton(mock(Process.class,
RETURNS_DEEP_STUBS)));
- subscriber.reportLocalProcesses(new
ReportLocalProcessesEvent("foo_instance_id", "foo_task_id"));
-
verify(contextManager.getPersistServiceFacade().getRepository()).persist(eq("/execution_nodes/foo_task_id/foo_instance_id"),
any());
-
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
+
verify(contextManager.getPersistServiceFacade().getProcessPersistService()).reportLocalProcesses("foo_instance_id",
"foo_task_id");
}
@Test
@@ -94,28 +81,15 @@ class ProcessListChangedSubscriberTest {
}
@Test
- void assertKillLocalProcessWithNotCurrentInstance() throws SQLException {
- subscriber.killLocalProcess(new
KillLocalProcessEvent("bar_instance_id", "foo_pid"));
- verify(contextManager.getPersistServiceFacade().getRepository(),
times(0)).delete(any());
- }
-
- @Test
- void assertKillLocalProcessWithoutExistedProcess() throws SQLException {
- when(ProcessRegistry.getInstance().get("foo_pid")).thenReturn(null);
+ void assertKillLocalProcessWithCurrentInstance() throws SQLException {
subscriber.killLocalProcess(new
KillLocalProcessEvent("foo_instance_id", "foo_pid"));
-
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid");
+
verify(contextManager.getPersistServiceFacade().getProcessPersistService()).cleanProcess("foo_instance_id",
"foo_pid");
}
@Test
- void assertKillLocalProcessWithExistedProcess() throws SQLException {
- Process process = mock(Process.class, RETURNS_DEEP_STUBS);
- Statement statement = mock(Statement.class);
-
when(process.getProcessStatements()).thenReturn(Collections.singletonMap(1,
statement));
- when(ProcessRegistry.getInstance().get("foo_pid")).thenReturn(process);
- subscriber.killLocalProcess(new
KillLocalProcessEvent("foo_instance_id", "foo_pid"));
- verify(process).setInterrupted(true);
- verify(statement).cancel();
-
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid");
+ void assertKillLocalProcessWithNotCurrentInstance() throws SQLException {
+ subscriber.killLocalProcess(new
KillLocalProcessEvent("bar_instance_id", "foo_pid"));
+
verify(contextManager.getPersistServiceFacade().getProcessPersistService(),
times(0)).cleanProcess("bar_instance_id", "foo_pid");
}
@Test
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
index efebb308c67..5a888875f8d 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.persist.service;
import org.apache.shardingsphere.infra.executor.sql.process.Process;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcess;
import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
@@ -40,12 +41,14 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(AutoMockExtension.class)
-@StaticMockSettings(ProcessOperationLockRegistry.class)
+@StaticMockSettings({ProcessRegistry.class,
ProcessOperationLockRegistry.class})
class ClusterProcessPersistServiceTest {
@Mock
@@ -58,6 +61,22 @@ class ClusterProcessPersistServiceTest {
processPersistService = new ClusterProcessPersistService(repository);
}
+ @Test
+ void assertReportEmptyLocalProcesses() {
+
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.emptyList());
+ processPersistService.reportLocalProcesses("foo_instance_id",
"foo_task_id");
+ verify(repository, times(0)).persist(any(), any());
+
verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
+ }
+
+ @Test
+ void assertReportNotEmptyLocalProcesses() {
+
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.singleton(mock(Process.class,
RETURNS_DEEP_STUBS)));
+ processPersistService.reportLocalProcesses("foo_instance_id",
"foo_task_id");
+
verify(repository).persist(eq("/execution_nodes/foo_task_id/foo_instance_id"),
any());
+
verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
+ }
+
@Test
void assertGetCompletedProcessList() {
when(ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(any(),
any())).thenReturn(true);
@@ -111,4 +130,10 @@ class ClusterProcessPersistServiceTest {
processPersistService.killProcess("foo_process_id");
verify(repository).persist("/nodes/compute_nodes/kill_process_trigger/abc:foo_process_id",
"");
}
+
+ @Test
+ void assertCleanProcess() {
+ processPersistService.cleanProcess("foo_instance_id", "foo_pid");
+
verify(repository).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid");
+ }
}
diff --git
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java
index 135bc0a1a55..d7c0e6e5057 100644
---
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java
+++
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java
@@ -22,7 +22,6 @@ import
org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import
org.apache.shardingsphere.mode.persist.service.divided.ProcessPersistService;
import java.sql.SQLException;
-import java.sql.Statement;
import java.util.Collection;
/**
@@ -30,6 +29,10 @@ import java.util.Collection;
*/
public final class StandaloneProcessPersistService implements
ProcessPersistService {
+ @Override
+ public void reportLocalProcesses(final String instanceId, final String
taskId) {
+ }
+
@Override
public Collection<Process> getProcessList() {
return ProcessRegistry.getInstance().listAll();
@@ -37,12 +40,10 @@ public final class StandaloneProcessPersistService
implements ProcessPersistServ
@Override
public void killProcess(final String processId) throws SQLException {
- Process process = ProcessRegistry.getInstance().get(processId);
- if (null == process) {
- return;
- }
- for (Statement each : process.getProcessStatements().values()) {
- each.cancel();
- }
+ ProcessRegistry.getInstance().kill(processId);
+ }
+
+ @Override
+ public void cleanProcess(final String instanceId, final String processId) {
}
}
diff --git
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistServiceTest.java
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistServiceTest.java
index 45c6a36b44c..156636d56a2 100644
---
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistServiceTest.java
+++
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistServiceTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.mode.manager.standalone.persist.service;
-import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.StaticMockSettings;
@@ -25,10 +24,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collections;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -51,19 +47,7 @@ class StandaloneProcessPersistServiceTest {
void assertKillProcess() throws SQLException {
ProcessRegistry processRegistry = mock(ProcessRegistry.class);
when(ProcessRegistry.getInstance()).thenReturn(processRegistry);
- Process process = mock(Process.class);
- Statement statement = mock(Statement.class);
-
when(process.getProcessStatements()).thenReturn(Collections.singletonMap(1,
statement));
- when(processRegistry.get("foo_id")).thenReturn(process);
processPersistService.killProcess("foo_id");
- verify(statement).cancel();
- }
-
- @Test
- void assertKillProcessWithNotExistedProcessId() {
- ProcessRegistry processRegistry = mock(ProcessRegistry.class);
- when(ProcessRegistry.getInstance()).thenReturn(processRegistry);
- when(processRegistry.get("foo_id")).thenReturn(null);
- assertDoesNotThrow(() -> processPersistService.killProcess("foo_id"));
+ verify(ProcessRegistry.getInstance()).kill("foo_id");
}
}
diff --git
a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
index 57dd3db0bdb..3254b62ee3f 100644
---
a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
+++
b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
@@ -62,7 +62,7 @@ public final class ShowProcessListExecutor implements
DatabaseAdminQueryExecutor
private QueryResult getQueryResult() {
Collection<Process> processes =
ProxyContext.getInstance().getContextManager().getPersistServiceFacade().getProcessPersistService().getProcessList();
- if (null == processes || processes.isEmpty()) {
+ if (processes.isEmpty()) {
return new RawMemoryQueryResult(queryResultMetaData,
Collections.emptyList());
}
List<MemoryQueryResultDataRow> rows =
processes.stream().map(this::getMemoryQueryResultDataRow).collect(Collectors.toList());