This is an automated email from the ASF dual-hosted git repository.

totalo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bfeed589646 Refactor ProcessListChangedSubscriber and 
ProcessStandaloneSubscriber (#25430)
bfeed589646 is described below

commit bfeed58964609de78e999a7fe551a0cdf2f6a7d6
Author: Liang Zhang <[email protected]>
AuthorDate: Tue May 2 23:36:49 2023 +0800

    Refactor ProcessListChangedSubscriber and ProcessStandaloneSubscriber 
(#25430)
---
 ...llProcessIdEvent.java => KillProcessEvent.java} |  4 +-
 ...vent.java => KillProcessUnitCompleteEvent.java} |  4 +-
 ...vent.java => ShowProcessUnitCompleteEvent.java} |  4 +-
 .../watcher/ComputeNodeStateChangedWatcher.java    | 12 ++--
 .../subscriber/ProcessListChangedSubscriber.java   | 26 ++++----
 .../ProcessListChangedSubscriberTest.java          | 70 ++++++++++++++--------
 .../subscriber/ProcessStandaloneSubscriber.java    |  6 +-
 7 files changed, 74 insertions(+), 52 deletions(-)

diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessIdEvent.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessEvent.java
similarity index 92%
rename from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessIdEvent.java
rename to 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessEvent.java
index 04052bc8467..95ccbf5e396 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessIdEvent.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
- * Kill process id event.
+ * Kill process event.
  */
 @RequiredArgsConstructor
 @Getter
-public final class KillProcessIdEvent implements GovernanceEvent {
+public final class KillProcessEvent implements GovernanceEvent {
     
     private final String instanceId;
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessIdUnitCompleteEvent.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessUnitCompleteEvent.java
similarity index 90%
rename from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessIdUnitCompleteEvent.java
rename to 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessUnitCompleteEvent.java
index 5bb052e1dc3..afbac0ab3a5 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessIdUnitCompleteEvent.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessUnitCompleteEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
- * Kill process id unit complete event.
+ * Kill process unit complete event.
  */
 @RequiredArgsConstructor
 @Getter
-public final class KillProcessIdUnitCompleteEvent implements GovernanceEvent {
+public final class KillProcessUnitCompleteEvent implements GovernanceEvent {
     
     private final String processId;
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessListUnitCompleteEvent.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessUnitCompleteEvent.java
similarity index 90%
rename from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessListUnitCompleteEvent.java
rename to 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessUnitCompleteEvent.java
index 7d8839de79d..7b8893584b2 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessListUnitCompleteEvent.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessUnitCompleteEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
- * Show processlist unit complete event.
+ * Show process unit complete event.
  */
 @RequiredArgsConstructor
 @Getter
-public final class ShowProcessListUnitCompleteEvent implements GovernanceEvent 
{
+public final class ShowProcessUnitCompleteEvent implements GovernanceEvent {
     
     private final String processId;
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index 71d007e61f5..186bce2ba33 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -27,11 +27,11 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessIdEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessIdUnitCompleteEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessUnitCompleteEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessUnitCompleteEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
 import org.apache.shardingsphere.infra.instance.ComputeNodeData;
@@ -117,7 +117,7 @@ public final class ComputeNodeStateChangedWatcher 
implements GovernanceWatcher<G
             return Optional.of(new 
ShowProcessListTriggerEvent(matcher.group(1), matcher.group(2)));
         }
         if (Type.DELETED == event.getType()) {
-            return Optional.of(new 
ShowProcessListUnitCompleteEvent(matcher.group(2)));
+            return Optional.of(new 
ShowProcessUnitCompleteEvent(matcher.group(2)));
         }
         return Optional.empty();
     }
@@ -132,10 +132,10 @@ public final class ComputeNodeStateChangedWatcher 
implements GovernanceWatcher<G
             return Optional.empty();
         }
         if (Type.ADDED == event.getType()) {
-            return Optional.of(new KillProcessIdEvent(matcher.group(1), 
matcher.group(2)));
+            return Optional.of(new KillProcessEvent(matcher.group(1), 
matcher.group(2)));
         }
         if (Type.DELETED == event.getType()) {
-            return Optional.of(new 
KillProcessIdUnitCompleteEvent(matcher.group(2)));
+            return Optional.of(new 
KillProcessUnitCompleteEvent(matcher.group(2)));
         }
         return Optional.empty();
     }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
index 04ca8a67ce0..249f554057a 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
@@ -27,10 +27,10 @@ import 
org.apache.shardingsphere.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessIdEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessIdUnitCompleteEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessUnitCompleteEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessUnitCompleteEvent;
 
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -55,12 +55,12 @@ public final class ProcessListChangedSubscriber {
     }
     
     /**
-     * Trigger show process list.
+     * Report local processes.
      *
      * @param event show process list trigger event
      */
     @Subscribe
-    public synchronized void triggerShowProcessList(final 
ShowProcessListTriggerEvent event) {
+    public synchronized void reportLocalProcesses(final 
ShowProcessListTriggerEvent event) {
         if 
(!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId()))
 {
             return;
         }
@@ -73,13 +73,13 @@ public final class ProcessListChangedSubscriber {
     }
     
     /**
-     * Kill process id.
+     * Kill process.
      *
      * @param event kill process id event
      * @throws SQLException SQL exception
      */
     @Subscribe
-    public synchronized void killProcessId(final KillProcessIdEvent event) 
throws SQLException {
+    public synchronized void killProcess(final KillProcessEvent event) throws 
SQLException {
         if 
(!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId()))
 {
             return;
         }
@@ -93,12 +93,12 @@ public final class ProcessListChangedSubscriber {
     }
     
     /**
-     * Complete unit show process list.
+     * Complete show process unit.
      *
-     * @param event show process list unit complete event
+     * @param event show process unit complete event
      */
     @Subscribe
-    public synchronized void completeUnitShowProcessList(final 
ShowProcessListUnitCompleteEvent event) {
+    public synchronized void completeShowProcessUnit(final 
ShowProcessUnitCompleteEvent event) {
         ShowProcessListLock lock = 
ProcessRegistry.getInstance().getLocks().get(event.getProcessId());
         if (null != lock) {
             lock.doNotify();
@@ -106,12 +106,12 @@ public final class ProcessListChangedSubscriber {
     }
     
     /**
-     * Complete unit kill process id.
+     * Complete to kill process unit.
      *
-     * @param event kill process id unit complete event
+     * @param event kill process unit complete event
      */
     @Subscribe
-    public synchronized void completeUnitKillProcessId(final 
KillProcessIdUnitCompleteEvent event) {
+    public synchronized void completeKillProcessUnit(final 
KillProcessUnitCompleteEvent event) {
         ShowProcessListLock lock = 
ProcessRegistry.getInstance().getLocks().get(event.getProcessId());
         if (null != lock) {
             lock.doNotify();
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
index 56f32148f90..eaf306fa601 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
@@ -34,9 +34,10 @@ import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
 import 
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessIdEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessUnitCompleteEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessUnitCompleteEvent;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
@@ -99,45 +100,66 @@ class ProcessListChangedSubscriberTest {
     }
     
     @Test
-    void assertCompleteUnitShowProcessList() {
+    void assertReportLocalProcesses() throws ReflectiveOperationException {
+        String instanceId = 
contextManager.getInstanceContext().getInstance().getMetaData().getId();
+        ProcessRegistry.getInstance().putProcessContext("foo_execution_id", 
mock(ProcessContext.class));
+        String processId = "foo_process_id";
+        subscriber.reportLocalProcesses(new 
ShowProcessListTriggerEvent(instanceId, processId));
+        ClusterPersistRepository repository = ((RegistryCenter) 
Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"),
 subscriber)).getRepository();
+        verify(repository).persist("/execution_nodes/foo_process_id/" + 
instanceId,
+                "contexts:" + System.lineSeparator() + "- completedUnitCount: 
0\n  idle: false\n  startTimeMillis: 0\n  totalUnitCount: 0" + 
System.lineSeparator());
+        verify(repository).delete("/nodes/compute_nodes/process_trigger/" + 
instanceId + ":foo_process_id");
+    }
+    
+    @Test
+    void assertKillProcess() throws SQLException, ReflectiveOperationException 
{
+        String instanceId = 
contextManager.getInstanceContext().getInstance().getMetaData().getId();
+        String processId = "foo_process_id";
+        subscriber.killProcess(new KillProcessEvent(instanceId, processId));
+        ClusterPersistRepository repository = ((RegistryCenter) 
Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"),
 subscriber)).getRepository();
+        verify(repository).delete("/nodes/compute_nodes/process_kill/" + 
instanceId + ":foo_process_id");
+    }
+    
+    @Test
+    void assertCompleteShowProcessUnit() {
         String processId = "foo_process_id";
         ShowProcessListLock lock = new ShowProcessListLock();
         ProcessRegistry.getInstance().getLocks().put(processId, lock);
-        long startTime = System.currentTimeMillis();
+        long startMillis = System.currentTimeMillis();
         ExecutorService executorService = Executors.newFixedThreadPool(1);
         executorService.submit(() -> {
             try {
                 Thread.sleep(50L);
             } catch (final InterruptedException ignored) {
             }
-            subscriber.completeUnitShowProcessList(new 
ShowProcessListUnitCompleteEvent(processId));
+            subscriber.completeShowProcessUnit(new 
ShowProcessUnitCompleteEvent(processId));
         });
         lockAndAwaitDefaultTime(lock);
         long currentTime = System.currentTimeMillis();
-        assertTrue(currentTime >= startTime + 50L);
-        assertTrue(currentTime <= startTime + 5000L);
+        assertTrue(currentTime >= startMillis + 50L);
+        assertTrue(currentTime <= startMillis + 5000L);
         ProcessRegistry.getInstance().getLocks().remove(processId);
     }
     
     @Test
-    void assertTriggerShowProcessList() throws ReflectiveOperationException {
-        String instanceId = 
contextManager.getInstanceContext().getInstance().getMetaData().getId();
-        ProcessRegistry.getInstance().putProcessContext("foo_execution_id", 
mock(ProcessContext.class));
-        String processId = "foo_process_id";
-        subscriber.triggerShowProcessList(new 
ShowProcessListTriggerEvent(instanceId, processId));
-        ClusterPersistRepository repository = ((RegistryCenter) 
Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"),
 subscriber)).getRepository();
-        verify(repository).persist("/execution_nodes/foo_process_id/" + 
instanceId,
-                "contexts:" + System.lineSeparator() + "- completedUnitCount: 
0\n  idle: false\n  startTimeMillis: 0\n  totalUnitCount: 0" + 
System.lineSeparator());
-        verify(repository).delete("/nodes/compute_nodes/process_trigger/" + 
instanceId + ":foo_process_id");
-    }
-    
-    @Test
-    void assertKillProcessId() throws SQLException, 
ReflectiveOperationException {
-        String instanceId = 
contextManager.getInstanceContext().getInstance().getMetaData().getId();
+    void assertCompleteKillProcessUnit() {
         String processId = "foo_process_id";
-        subscriber.killProcessId(new KillProcessIdEvent(instanceId, 
processId));
-        ClusterPersistRepository repository = ((RegistryCenter) 
Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"),
 subscriber)).getRepository();
-        verify(repository).delete("/nodes/compute_nodes/process_kill/" + 
instanceId + ":foo_process_id");
+        ShowProcessListLock lock = new ShowProcessListLock();
+        ProcessRegistry.getInstance().getLocks().put(processId, lock);
+        long startMillis = System.currentTimeMillis();
+        ExecutorService executorService = Executors.newFixedThreadPool(1);
+        executorService.submit(() -> {
+            try {
+                Thread.sleep(50L);
+            } catch (final InterruptedException ignored) {
+            }
+            subscriber.completeKillProcessUnit(new 
KillProcessUnitCompleteEvent(processId));
+        });
+        lockAndAwaitDefaultTime(lock);
+        long currentTime = System.currentTimeMillis();
+        assertTrue(currentTime >= startMillis + 50L);
+        assertTrue(currentTime <= startMillis + 5000L);
+        ProcessRegistry.getInstance().getLocks().remove(processId);
     }
     
     private void lockAndAwaitDefaultTime(final ShowProcessListLock lock) {
diff --git 
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
 
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
index f49a58490fa..ef96271dbb6 100644
--- 
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
+++ 
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
@@ -50,18 +50,18 @@ public final class ProcessStandaloneSubscriber {
     /**
      * Load show process list data.
      *
-     * @param event get children request event.
+     * @param event get children request event
      */
     @Subscribe
     public void loadShowProcessListData(final ShowProcessListRequestEvent 
event) {
         YamlProcessListContexts yamlContexts = 
swapper.swapToYamlConfiguration(ProcessRegistry.getInstance().getAllProcessContexts());
-        eventBusContext.post(new 
ShowProcessListResponseEvent(yamlContexts.getContexts().isEmpty() ? 
Collections.emptyList() : 
Collections.singleton(YamlEngine.marshal(yamlContexts))));
+        eventBusContext.post(new 
ShowProcessListResponseEvent(Collections.singleton(YamlEngine.marshal(yamlContexts))));
     }
     
     /**
      * Kill process.
      *
-     * @param event kill process request event.
+     * @param event kill process request event
      * @throws SQLException SQL exception
      */
     @Subscribe

Reply via email to