This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ed360aba3b Revert "Remove KillProcessCompletedEvent (#25454)" (#25455)
3ed360aba3b is described below

commit 3ed360aba3bf267bcb6d5e781c3819a8f9bfb3b0
Author: Liang Zhang <[email protected]>
AuthorDate: Thu May 4 22:48:01 2023 +0800

    Revert "Remove KillProcessCompletedEvent (#25454)" (#25455)
    
    This reverts commit 50965d51bbdc66e70589c1ed1f46bcfe19ee8db8.
---
 .../subscriber/ProcessListChangedSubscriber.java   | 10 +++++++
 .../compute/event/KillProcessCompletedEvent.java   | 32 ++++++++++++++++++++++
 .../watcher/ComputeNodeStateChangedWatcher.java    |  4 +++
 .../ProcessListChangedSubscriberTest.java          | 18 ++++++++++++
 4 files changed, 64 insertions(+)

diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
index 74a5933f2bb..0213e98375e 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
@@ -28,6 +28,7 @@ import 
org.apache.shardingsphere.metadata.persist.node.ProcessNode;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
 
@@ -99,6 +100,15 @@ public final class ProcessListChangedSubscriber {
             }
         }
         
registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(),
 event.getProcessId()));
+    }
+    
+    /**
+     * Complete to kill process.
+     *
+     * @param event kill process completed event
+     */
+    @Subscribe
+    public synchronized void completeToKillProcess(final 
KillProcessCompletedEvent event) {
         
ProcessOperationLockRegistry.getInstance().notify(event.getProcessId());
     }
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessCompletedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessCompletedEvent.java
new file mode 100644
index 00000000000..4d130fd4859
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessCompletedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+
+/**
+ * Kill process completed event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class KillProcessCompletedEvent implements GovernanceEvent {
+    
+    private final String processId;
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index 26b4df44399..44aea4d6bc4 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -29,6 +29,7 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
@@ -133,6 +134,9 @@ public final class ComputeNodeStateChangedWatcher 
implements GovernanceWatcher<G
         if (Type.ADDED == event.getType()) {
             return Optional.of(new KillProcessEvent(matcher.group(1), 
matcher.group(2)));
         }
+        if (Type.DELETED == event.getType()) {
+            return Optional.of(new 
KillProcessCompletedEvent(matcher.group(2)));
+        }
         return Optional.empty();
     }
     
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index 1270c2bf2d8..e57eac3c54b 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -36,6 +36,7 @@ import 
org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
 import 
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -141,6 +142,23 @@ class ProcessListChangedSubscriberTest {
         verify(repository).delete("/nodes/compute_nodes/process_kill/" + 
instanceId + ":foo_id");
     }
     
+    @Test
+    void assertCompleteToKillProcess() {
+        String processId = "foo_id";
+        long startMillis = System.currentTimeMillis();
+        Executors.newFixedThreadPool(1).submit(() -> {
+            try {
+                Thread.sleep(50L);
+            } catch (final InterruptedException ignored) {
+            }
+            subscriber.completeToKillProcess(new 
KillProcessCompletedEvent(processId));
+        });
+        waitUntilReleaseReady(processId);
+        long currentMillis = System.currentTimeMillis();
+        assertThat(currentMillis, greaterThanOrEqualTo(startMillis + 50L));
+        assertThat(currentMillis, lessThanOrEqualTo(startMillis + 5000L));
+    }
+    
     private static void waitUntilReleaseReady(final String lockId) {
         
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(lockId, new 
ProcessOperationLockReleaseStrategy() {
             

Reply via email to