This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3ed360aba3b Revert "Remove KillProcessCompletedEvent (#25454)" (#25455)
3ed360aba3b is described below
commit 3ed360aba3bf267bcb6d5e781c3819a8f9bfb3b0
Author: Liang Zhang <[email protected]>
AuthorDate: Thu May 4 22:48:01 2023 +0800
Revert "Remove KillProcessCompletedEvent (#25454)" (#25455)
This reverts commit 50965d51bbdc66e70589c1ed1f46bcfe19ee8db8.
---
.../subscriber/ProcessListChangedSubscriber.java | 10 +++++++
.../compute/event/KillProcessCompletedEvent.java | 32 ++++++++++++++++++++++
.../watcher/ComputeNodeStateChangedWatcher.java | 4 +++
.../ProcessListChangedSubscriberTest.java | 18 ++++++++++++
4 files changed, 64 insertions(+)
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
index 74a5933f2bb..0213e98375e 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
@@ -28,6 +28,7 @@ import
org.apache.shardingsphere.metadata.persist.node.ProcessNode;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
@@ -99,6 +100,15 @@ public final class ProcessListChangedSubscriber {
}
}
registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(),
event.getProcessId()));
+ }
+
+ /**
+ * Complete to kill process.
+ *
+ * @param event kill process completed event
+ */
+ @Subscribe
+ public synchronized void completeToKillProcess(final
KillProcessCompletedEvent event) {
ProcessOperationLockRegistry.getInstance().notify(event.getProcessId());
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessCompletedEvent.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessCompletedEvent.java
new file mode 100644
index 00000000000..4d130fd4859
--- /dev/null
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessCompletedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+
+/**
+ * Kill process completed event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class KillProcessCompletedEvent implements GovernanceEvent {
+
+ private final String processId;
+}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index 26b4df44399..44aea4d6bc4 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -29,6 +29,7 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
@@ -133,6 +134,9 @@ public final class ComputeNodeStateChangedWatcher
implements GovernanceWatcher<G
if (Type.ADDED == event.getType()) {
return Optional.of(new KillProcessEvent(matcher.group(1),
matcher.group(2)));
}
+ if (Type.DELETED == event.getType()) {
+ return Optional.of(new
KillProcessCompletedEvent(matcher.group(2)));
+ }
return Optional.empty();
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index 1270c2bf2d8..e57eac3c54b 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -36,6 +36,7 @@ import
org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -141,6 +142,23 @@ class ProcessListChangedSubscriberTest {
verify(repository).delete("/nodes/compute_nodes/process_kill/" +
instanceId + ":foo_id");
}
+ @Test
+ void assertCompleteToKillProcess() {
+ String processId = "foo_id";
+ long startMillis = System.currentTimeMillis();
+ Executors.newFixedThreadPool(1).submit(() -> {
+ try {
+ Thread.sleep(50L);
+ } catch (final InterruptedException ignored) {
+ }
+ subscriber.completeToKillProcess(new
KillProcessCompletedEvent(processId));
+ });
+ waitUntilReleaseReady(processId);
+ long currentMillis = System.currentTimeMillis();
+ assertThat(currentMillis, greaterThanOrEqualTo(startMillis + 50L));
+ assertThat(currentMillis, lessThanOrEqualTo(startMillis + 5000L));
+ }
+
private static void waitUntilReleaseReady(final String lockId) {
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(lockId, new
ProcessOperationLockReleaseStrategy() {