This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d0abb75896c Use ProcessPersistService instead of ProcessListEvent and remove ProcessSubscriber (#31411)
d0abb75896c is described below

commit d0abb75896cce63f8309d1992aa4f4aae5224895
Author: Haoran Meng <[email protected]>
AuthorDate: Mon May 27 13:41:49 2024 +0800

    Use ProcessPersistService instead of ProcessListEvent and remove ProcessSubscriber (#31411)
---
 .../mode/process/ProcessSubscriber.java            |  44 --------
 .../process/event/KillProcessRequestEvent.java     |  31 ------
 .../process/event/ShowProcessListRequestEvent.java |  24 -----
 .../event/ShowProcessListResponseEvent.java        |  34 -------
 .../cluster/ClusterContextManagerBuilder.java      |   2 -
 .../subscriber/ClusterProcessSubscriber.java       | 113 ---------------------
 .../ClusterProcessPersistServiceTest.java}         |  24 +++--
 .../StandaloneEventSubscriberRegistry.java         |   4 +-
 .../subscriber/StandaloneProcessSubscriber.java    |  59 -----------
 .../StandaloneProcessPersistServiceTest.java}      |  39 +++++--
 .../admin/executor/KillProcessExecutor.java        |   7 +-
 .../admin/executor/ShowProcessListExecutor.java    |  18 +---
 .../executor/ShowProcessListExecutorTest.java      |  11 +-
 13 files changed, 58 insertions(+), 352 deletions(-)

diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessSubscriber.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessSubscriber.java
deleted file mode 100644
index 96fb755510c..00000000000
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessSubscriber.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.process;
-
-import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent;
-import 
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
-
-import java.sql.SQLException;
-
-/**
- * Process subscriber.
- */
-public interface ProcessSubscriber {
-    
-    /**
-     * Post show process list data.
-     *
-     * @param event show process list request event
-     */
-    void postShowProcessListData(ShowProcessListRequestEvent event);
-    
-    /**
-     * Kill process.
-     *
-     * @param event kill process request event
-     * @throws SQLException SQL exception
-     */
-    void killProcess(KillProcessRequestEvent event) throws SQLException;
-}
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/KillProcessRequestEvent.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/KillProcessRequestEvent.java
deleted file mode 100644
index e69590c04cd..00000000000
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/KillProcessRequestEvent.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.process.event;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-/**
- * Kill process request event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class KillProcessRequestEvent {
-    
-    private final String id;
-}
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListRequestEvent.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListRequestEvent.java
deleted file mode 100644
index 289964b6287..00000000000
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListRequestEvent.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.process.event;
-
-/**
- * Show process list request event.
- */
-public final class ShowProcessListRequestEvent {
-}
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java
deleted file mode 100644
index 4b83a572516..00000000000
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.process.event;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.executor.sql.process.Process;
-
-import java.util.Collection;
-
-/**
- * Show process list response event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class ShowProcessListResponseEvent {
-    
-    private final Collection<Process> processes;
-}
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 72f4874996c..f6409ee689d 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -34,7 +34,6 @@ import 
org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.QualifiedDataSourceStatusSubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ClusterEventSubscriberRegistry;
@@ -97,7 +96,6 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
     // TODO remove the method, only keep ZooKeeper's events, remove all 
decouple events
     private void createSubscribers(final EventBusContext eventBusContext, 
final ClusterPersistRepository repository) {
         eventBusContext.register(new 
QualifiedDataSourceStatusSubscriber(repository));
-        eventBusContext.register(new ClusterProcessSubscriber(repository, 
eventBusContext));
     }
     
     private void registerOnline(final EventBusContext eventBusContext, final 
ComputeNodeInstanceContext computeNodeInstanceContext,
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
deleted file mode 100644
index 440d8e22dc9..00000000000
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
-
-import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
-import 
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
-import 
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
-import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
-import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
-import org.apache.shardingsphere.mode.process.ProcessSubscriber;
-import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent;
-import 
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
-import 
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
-import org.apache.shardingsphere.mode.spi.PersistRepository;
-
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * Cluster process subscriber.
- */
-@RequiredArgsConstructor
-public final class ClusterProcessSubscriber implements ProcessSubscriber, 
EventSubscriber {
-    
-    private final PersistRepository repository;
-    
-    private final EventBusContext eventBusContext;
-    
-    private final YamlProcessListSwapper swapper = new 
YamlProcessListSwapper();
-    
-    @Override
-    @Subscribe
-    public void postShowProcessListData(final ShowProcessListRequestEvent 
event) {
-        String taskId = new UUID(ThreadLocalRandom.current().nextLong(), 
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
-        Collection<String> triggerPaths = 
getShowProcessListTriggerPaths(taskId);
-        boolean isCompleted = false;
-        try {
-            triggerPaths.forEach(each -> repository.persist(each, ""));
-            isCompleted = 
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(taskId, () -> 
isReady(triggerPaths));
-            postShowProcessListData(taskId);
-        } finally {
-            repository.delete(ProcessNode.getProcessIdPath(taskId));
-            if (!isCompleted) {
-                triggerPaths.forEach(repository::delete);
-            }
-        }
-    }
-    
-    private void postShowProcessListData(final String taskId) {
-        YamlProcessList yamlProcessList = new YamlProcessList();
-        for (String each : 
repository.getChildrenKeys(ProcessNode.getProcessIdPath(taskId)).stream()
-                .map(each -> 
repository.query(ProcessNode.getProcessListInstancePath(taskId, 
each))).collect(Collectors.toList())) {
-            yamlProcessList.getProcesses().addAll(YamlEngine.unmarshal(each, 
YamlProcessList.class).getProcesses());
-        }
-        eventBusContext.post(new 
ShowProcessListResponseEvent(swapper.swapToObject(yamlProcessList)));
-    }
-    
-    private Collection<String> getShowProcessListTriggerPaths(final String 
taskId) {
-        return Stream.of(InstanceType.values())
-                .flatMap(each -> 
repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath
 -> ComputeNode.getProcessTriggerInstanceNodePath(onlinePath, taskId)))
-                .collect(Collectors.toList());
-    }
-    
-    private boolean isReady(final Collection<String> paths) {
-        return paths.stream().noneMatch(each -> null != 
repository.query(each));
-    }
-    
-    @Override
-    @Subscribe
-    public void killProcess(final KillProcessRequestEvent event) {
-        String processId = event.getId();
-        Collection<String> triggerPaths = 
getKillProcessTriggerPaths(processId);
-        boolean isCompleted = false;
-        try {
-            triggerPaths.forEach(each -> repository.persist(each, ""));
-            isCompleted = 
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(processId, () 
-> isReady(triggerPaths));
-        } finally {
-            if (!isCompleted) {
-                triggerPaths.forEach(repository::delete);
-            }
-        }
-    }
-    
-    private Collection<String> getKillProcessTriggerPaths(final String 
processId) {
-        return Stream.of(InstanceType.values())
-                .flatMap(each -> 
repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath
 -> ComputeNode.getProcessKillInstanceIdNodePath(onlinePath, processId)))
-                .collect(Collectors.toList());
-    }
-}
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterProcessPersistServiceTest.java
similarity index 72%
rename from 
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriberTest.java
rename to 
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterProcessPersistServiceTest.java
index 5fc60d3e805..d7f2ce4969e 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterProcessPersistServiceTest.java
@@ -15,12 +15,10 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
+package org.apache.shardingsphere.mode.manager.cluster.service;
 
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
-import 
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -35,26 +33,32 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
-class ClusterProcessSubscriberTest {
+class ClusterProcessPersistServiceTest {
     
     @Mock
     private ClusterPersistRepository repository;
     
-    private final EventBusContext eventBusContext = new EventBusContext();
-    
-    private ClusterProcessSubscriber clusterProcessListSubscriber;
+    private ClusterProcessPersistService processPersistService;
     
     @BeforeEach
     void setUp() {
-        clusterProcessListSubscriber = new 
ClusterProcessSubscriber(repository, eventBusContext);
+        processPersistService = new ClusterProcessPersistService(repository);
     }
     
     @Test
-    void assertPostShowProcessListData() {
+    void getProcessList() {
         
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.JDBC))).thenReturn(Collections.emptyList());
         
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc"));
         when(repository.query(any())).thenReturn(null);
-        clusterProcessListSubscriber.postShowProcessListData(new 
ShowProcessListRequestEvent());
+        processPersistService.getProcessList();
+        verify(repository).persist(any(), any());
+    }
+    
+    @Test
+    void killProcess() {
+        
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.JDBC))).thenReturn(Collections.emptyList());
+        
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc"));
+        processPersistService.killProcess("foo_process_id");
         verify(repository).persist(any(), any());
     }
 }
diff --git 
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java
 
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java
index b0a4278a9a7..040973879dc 100644
--- 
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java
+++ 
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java
@@ -27,8 +27,6 @@ import 
org.apache.shardingsphere.mode.subsciber.RuleItemChangedSubscriber;
 public final class StandaloneEventSubscriberRegistry extends 
EventSubscriberRegistry {
     
     public StandaloneEventSubscriberRegistry(final ContextManager 
contextManager) {
-        super(contextManager,
-                new 
StandaloneProcessSubscriber(contextManager.getComputeNodeInstanceContext().getEventBusContext()),
-                new RuleItemChangedSubscriber(contextManager));
+        super(contextManager, new RuleItemChangedSubscriber(contextManager));
     }
 }
diff --git 
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
 
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
deleted file mode 100644
index 8180b6334d6..00000000000
--- 
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.standalone.subscriber;
-
-import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.executor.sql.process.Process;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import org.apache.shardingsphere.mode.process.ProcessSubscriber;
-import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent;
-import 
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
-import 
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
-
-import java.sql.SQLException;
-import java.sql.Statement;
-
-/**
- * Standalone process subscriber.
- */
-@RequiredArgsConstructor
-public final class StandaloneProcessSubscriber implements ProcessSubscriber, 
EventSubscriber {
-    
-    private final EventBusContext eventBusContext;
-    
-    @Override
-    @Subscribe
-    public void postShowProcessListData(final ShowProcessListRequestEvent 
event) {
-        eventBusContext.post(new 
ShowProcessListResponseEvent(ProcessRegistry.getInstance().listAll()));
-    }
-    
-    @Override
-    @Subscribe
-    public void killProcess(final KillProcessRequestEvent event) throws 
SQLException {
-        Process process = ProcessRegistry.getInstance().get(event.getId());
-        if (null == process) {
-            return;
-        }
-        for (Statement each : process.getProcessStatements().values()) {
-            each.cancel();
-        }
-    }
-}
diff --git 
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriberTest.java
 
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneProcessPersistServiceTest.java
similarity index 53%
rename from 
mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriberTest.java
rename to 
mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneProcessPersistServiceTest.java
index 83e1dae112c..8c9e53d028a 100644
--- 
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriberTest.java
+++ 
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneProcessPersistServiceTest.java
@@ -15,29 +15,56 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.standalone.subscriber;
+package org.apache.shardingsphere.mode.manager.standalone.service;
 
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import 
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
 import org.apache.shardingsphere.test.mock.AutoMockExtension;
 import org.apache.shardingsphere.test.mock.StaticMockSettings;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(AutoMockExtension.class)
 @StaticMockSettings(ProcessRegistry.class)
-class StandaloneProcessSubscriberTest {
+class StandaloneProcessPersistServiceTest {
+    
+    private StandaloneProcessPersistService processPersistService;
+    
+    @BeforeEach
+    void setUp() {
+        processPersistService = new StandaloneProcessPersistService();
+    }
     
     @Test
-    void assertPostShowProcessListData() {
+    void getProcessList() {
         ProcessRegistry processRegistry = mock(ProcessRegistry.class);
         when(ProcessRegistry.getInstance()).thenReturn(processRegistry);
-        new StandaloneProcessSubscriber(new 
EventBusContext()).postShowProcessListData(new ShowProcessListRequestEvent());
+        processPersistService.getProcessList();
         verify(processRegistry).listAll();
     }
+    
+    @Test
+    void killProcess() throws SQLException {
+        ProcessRegistry processRegistry = mock(ProcessRegistry.class);
+        when(ProcessRegistry.getInstance()).thenReturn(processRegistry);
+        Process process = mock(Process.class);
+        Statement statement = mock(Statement.class);
+        Map<Integer, Statement> processStatements = new ConcurrentHashMap<>();
+        processStatements.put(1, statement);
+        when(process.getProcessStatements()).thenReturn(processStatements);
+        when(processRegistry.get(any())).thenReturn(process);
+        processPersistService.killProcess("foo_process_id");
+        verify(statement).cancel();
+    }
 }
diff --git 
a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/KillProcessExecutor.java
 
b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/KillProcessExecutor.java
index 87eebb9e610..ddea96d89a1 100644
--- 
a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/KillProcessExecutor.java
+++ 
b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/KillProcessExecutor.java
@@ -18,12 +18,13 @@
 package org.apache.shardingsphere.proxy.backend.mysql.handler.admin.executor;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import 
org.apache.shardingsphere.proxy.backend.handler.admin.executor.DatabaseAdminExecutor;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQLKillStatement;
 
+import java.sql.SQLException;
+
 /**
  * Kill process executor.
  */
@@ -38,8 +39,8 @@ public final class KillProcessExecutor implements 
DatabaseAdminExecutor {
      * @param connectionSession connection session
      */
     @Override
-    public void execute(final ConnectionSession connectionSession) {
+    public void execute(final ConnectionSession connectionSession) throws 
SQLException {
         String processId = killStatement.getProcessId();
-        
ProxyContext.getInstance().getContextManager().getComputeNodeInstanceContext().getEventBusContext().post(new
 KillProcessRequestEvent(processId));
+        
ProxyContext.getInstance().getContextManager().getPersistServiceFacade().getProcessPersistService().killProcess(processId);
     }
 }
diff --git 
a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
 
b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
index 8aa4aabf976..d4907a68162 100644
--- 
a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
+++ 
b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.proxy.backend.mysql.handler.admin.executor;
 
-import com.google.common.eventbus.Subscribe;
 import lombok.Getter;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
@@ -29,8 +28,6 @@ import 
org.apache.shardingsphere.infra.executor.sql.process.Process;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import 
org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult;
 import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import 
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
-import 
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import 
org.apache.shardingsphere.proxy.backend.handler.admin.executor.DatabaseAdminQueryExecutor;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
@@ -50,8 +47,6 @@ public final class ShowProcessListExecutor implements 
DatabaseAdminQueryExecutor
     
     private final boolean showFullProcesslist;
     
-    private Collection<Process> processes;
-    
     @Getter
     private QueryResultMetaData queryResultMetaData;
     
@@ -63,17 +58,6 @@ public final class ShowProcessListExecutor implements 
DatabaseAdminQueryExecutor
         
ProxyContext.getInstance().getContextManager().getComputeNodeInstanceContext().getEventBusContext().register(this);
     }
     
-    /**
-     * Receive and handle response event.
-     *
-     * @param event show process list response event
-     */
-    @SuppressWarnings("unused")
-    @Subscribe
-    public void receiveProcessListData(final ShowProcessListResponseEvent 
event) {
-        processes = event.getProcesses();
-    }
-    
     @Override
     public void execute(final ConnectionSession connectionSession) {
         queryResultMetaData = createQueryResultMetaData();
@@ -81,7 +65,7 @@ public final class ShowProcessListExecutor implements 
DatabaseAdminQueryExecutor
     }
     
     private QueryResult getQueryResult() {
-        
ProxyContext.getInstance().getContextManager().getComputeNodeInstanceContext().getEventBusContext().post(new
 ShowProcessListRequestEvent());
+        Collection<Process> processes = 
ProxyContext.getInstance().getContextManager().getPersistServiceFacade().getProcessPersistService().getProcessList();
         if (null == processes || processes.isEmpty()) {
             return new RawMemoryQueryResult(queryResultMetaData, 
Collections.emptyList());
         }
diff --git 
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
 
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
index 5bd4d57c12c..16ad4602956 100644
--- 
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
+++ 
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
@@ -28,9 +28,9 @@ import org.apache.shardingsphere.test.mock.AutoMockExtension;
 import org.apache.shardingsphere.test.mock.StaticMockSettings;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.internal.configuration.plugins.Plugins;
 
 import java.sql.SQLException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -46,11 +46,11 @@ import static org.mockito.Mockito.when;
 class ShowProcessListExecutorTest {
     
     @Test
-    void assertExecute() throws SQLException, ReflectiveOperationException {
+    void assertExecute() throws SQLException {
         ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
         
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
+        
when(contextManager.getPersistServiceFacade().getProcessPersistService().getProcessList()).thenReturn(mockProcessList());
         ShowProcessListExecutor showProcessListExecutor = new 
ShowProcessListExecutor(false);
-        setupProcesses(showProcessListExecutor);
         showProcessListExecutor.execute(new 
ConnectionSession(mock(MySQLDatabaseType.class), new DefaultAttributeMap()));
         
assertThat(showProcessListExecutor.getQueryResultMetaData().getColumnCount(), 
is(8));
         MergedResult mergedResult = showProcessListExecutor.getMergedResult();
@@ -64,11 +64,10 @@ class ShowProcessListExecutorTest {
         }
     }
     
-    private void setupProcesses(final ShowProcessListExecutor 
showProcessListExecutor) throws ReflectiveOperationException {
+    private Collection<Process> mockProcessList() {
         Process process = new Process("f6c2336a-63ba-41bf-941e-2e3504eb2c80", 
1617939785160L,
                 "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id", 
"foo_db", "root", "127.0.0.1", new AtomicInteger(2), new AtomicInteger(1), new 
AtomicBoolean(false),
                 new AtomicBoolean());
-        Plugins.getMemberAccessor().set(
-                
showProcessListExecutor.getClass().getDeclaredField("processes"), 
showProcessListExecutor, Collections.singleton(process));
+        return Collections.singleton(process);
     }
 }


Reply via email to