This is an automated email from the ASF dual-hosted git repository.
totalo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bfeed589646 Refactor ProcessListChangedSubscriber and
ProcessStandaloneSubscriber (#25430)
bfeed589646 is described below
commit bfeed58964609de78e999a7fe551a0cdf2f6a7d6
Author: Liang Zhang <[email protected]>
AuthorDate: Tue May 2 23:36:49 2023 +0800
Refactor ProcessListChangedSubscriber and ProcessStandaloneSubscriber
(#25430)
---
...llProcessIdEvent.java => KillProcessEvent.java} | 4 +-
...vent.java => KillProcessUnitCompleteEvent.java} | 4 +-
...vent.java => ShowProcessUnitCompleteEvent.java} | 4 +-
.../watcher/ComputeNodeStateChangedWatcher.java | 12 ++--
.../subscriber/ProcessListChangedSubscriber.java | 26 ++++----
.../ProcessListChangedSubscriberTest.java | 70 ++++++++++++++--------
.../subscriber/ProcessStandaloneSubscriber.java | 6 +-
7 files changed, 74 insertions(+), 52 deletions(-)
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessIdEvent.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessEvent.java
similarity index 92%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessIdEvent.java
rename to
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessEvent.java
index 04052bc8467..95ccbf5e396 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessIdEvent.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Kill process id event.
+ * Kill process event.
*/
@RequiredArgsConstructor
@Getter
-public final class KillProcessIdEvent implements GovernanceEvent {
+public final class KillProcessEvent implements GovernanceEvent {
private final String instanceId;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessIdUnitCompleteEvent.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessUnitCompleteEvent.java
similarity index 90%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessIdUnitCompleteEvent.java
rename to
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessUnitCompleteEvent.java
index 5bb052e1dc3..afbac0ab3a5 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessIdUnitCompleteEvent.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessUnitCompleteEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Kill process id unit complete event.
+ * Kill process unit complete event.
*/
@RequiredArgsConstructor
@Getter
-public final class KillProcessIdUnitCompleteEvent implements GovernanceEvent {
+public final class KillProcessUnitCompleteEvent implements GovernanceEvent {
private final String processId;
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessListUnitCompleteEvent.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessUnitCompleteEvent.java
similarity index 90%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessListUnitCompleteEvent.java
rename to
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessUnitCompleteEvent.java
index 7d8839de79d..7b8893584b2 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessListUnitCompleteEvent.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessUnitCompleteEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Show processlist unit complete event.
+ * Show process unit complete event.
*/
@RequiredArgsConstructor
@Getter
-public final class ShowProcessListUnitCompleteEvent implements GovernanceEvent
{
+public final class ShowProcessUnitCompleteEvent implements GovernanceEvent {
private final String processId;
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index 71d007e61f5..186bce2ba33 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -27,11 +27,11 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessIdEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessIdUnitCompleteEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessUnitCompleteEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessUnitCompleteEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import org.apache.shardingsphere.infra.instance.ComputeNodeData;
@@ -117,7 +117,7 @@ public final class ComputeNodeStateChangedWatcher
implements GovernanceWatcher<G
return Optional.of(new
ShowProcessListTriggerEvent(matcher.group(1), matcher.group(2)));
}
if (Type.DELETED == event.getType()) {
- return Optional.of(new
ShowProcessListUnitCompleteEvent(matcher.group(2)));
+ return Optional.of(new
ShowProcessUnitCompleteEvent(matcher.group(2)));
}
return Optional.empty();
}
@@ -132,10 +132,10 @@ public final class ComputeNodeStateChangedWatcher
implements GovernanceWatcher<G
return Optional.empty();
}
if (Type.ADDED == event.getType()) {
- return Optional.of(new KillProcessIdEvent(matcher.group(1),
matcher.group(2)));
+ return Optional.of(new KillProcessEvent(matcher.group(1),
matcher.group(2)));
}
if (Type.DELETED == event.getType()) {
- return Optional.of(new
KillProcessIdUnitCompleteEvent(matcher.group(2)));
+ return Optional.of(new
KillProcessUnitCompleteEvent(matcher.group(2)));
}
return Optional.empty();
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
index 04ca8a67ce0..249f554057a 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
@@ -27,10 +27,10 @@ import
org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessIdEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessIdUnitCompleteEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessUnitCompleteEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessUnitCompleteEvent;
import java.sql.SQLException;
import java.sql.Statement;
@@ -55,12 +55,12 @@ public final class ProcessListChangedSubscriber {
}
/**
- * Trigger show process list.
+ * Report local processes.
*
* @param event show process list trigger event
*/
@Subscribe
- public synchronized void triggerShowProcessList(final
ShowProcessListTriggerEvent event) {
+ public synchronized void reportLocalProcesses(final
ShowProcessListTriggerEvent event) {
if
(!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId()))
{
return;
}
@@ -73,13 +73,13 @@ public final class ProcessListChangedSubscriber {
}
/**
- * Kill process id.
+ * Kill process.
*
* @param event kill process id event
* @throws SQLException SQL exception
*/
@Subscribe
- public synchronized void killProcessId(final KillProcessIdEvent event)
throws SQLException {
+ public synchronized void killProcess(final KillProcessEvent event) throws
SQLException {
if
(!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId()))
{
return;
}
@@ -93,12 +93,12 @@ public final class ProcessListChangedSubscriber {
}
/**
- * Complete unit show process list.
+ * Complete show process unit.
*
- * @param event show process list unit complete event
+ * @param event show process unit complete event
*/
@Subscribe
- public synchronized void completeUnitShowProcessList(final
ShowProcessListUnitCompleteEvent event) {
+ public synchronized void completeShowProcessUnit(final
ShowProcessUnitCompleteEvent event) {
ShowProcessListLock lock =
ProcessRegistry.getInstance().getLocks().get(event.getProcessId());
if (null != lock) {
lock.doNotify();
@@ -106,12 +106,12 @@ public final class ProcessListChangedSubscriber {
}
/**
- * Complete unit kill process id.
+ * Complete to kill process unit.
*
- * @param event kill process id unit complete event
+ * @param event kill process unit complete event
*/
@Subscribe
- public synchronized void completeUnitKillProcessId(final
KillProcessIdUnitCompleteEvent event) {
+ public synchronized void completeKillProcessUnit(final
KillProcessUnitCompleteEvent event) {
ShowProcessListLock lock =
ProcessRegistry.getInstance().getLocks().get(event.getProcessId());
if (null != lock) {
lock.doNotify();
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
index 56f32148f90..eaf306fa601 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
@@ -34,9 +34,10 @@ import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessIdEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessUnitCompleteEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessUnitCompleteEvent;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
@@ -99,45 +100,66 @@ class ProcessListChangedSubscriberTest {
}
@Test
- void assertCompleteUnitShowProcessList() {
+ void assertReportLocalProcesses() throws ReflectiveOperationException {
+ String instanceId =
contextManager.getInstanceContext().getInstance().getMetaData().getId();
+ ProcessRegistry.getInstance().putProcessContext("foo_execution_id",
mock(ProcessContext.class));
+ String processId = "foo_process_id";
+ subscriber.reportLocalProcesses(new
ShowProcessListTriggerEvent(instanceId, processId));
+ ClusterPersistRepository repository = ((RegistryCenter)
Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"),
subscriber)).getRepository();
+ verify(repository).persist("/execution_nodes/foo_process_id/" +
instanceId,
+ "contexts:" + System.lineSeparator() + "- completedUnitCount:
0\n idle: false\n startTimeMillis: 0\n totalUnitCount: 0" +
System.lineSeparator());
+ verify(repository).delete("/nodes/compute_nodes/process_trigger/" +
instanceId + ":foo_process_id");
+ }
+
+ @Test
+ void assertKillProcess() throws SQLException, ReflectiveOperationException
{
+ String instanceId =
contextManager.getInstanceContext().getInstance().getMetaData().getId();
+ String processId = "foo_process_id";
+ subscriber.killProcess(new KillProcessEvent(instanceId, processId));
+ ClusterPersistRepository repository = ((RegistryCenter)
Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"),
subscriber)).getRepository();
+ verify(repository).delete("/nodes/compute_nodes/process_kill/" +
instanceId + ":foo_process_id");
+ }
+
+ @Test
+ void assertCompleteShowProcessUnit() {
String processId = "foo_process_id";
ShowProcessListLock lock = new ShowProcessListLock();
ProcessRegistry.getInstance().getLocks().put(processId, lock);
- long startTime = System.currentTimeMillis();
+ long startMillis = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> {
try {
Thread.sleep(50L);
} catch (final InterruptedException ignored) {
}
- subscriber.completeUnitShowProcessList(new
ShowProcessListUnitCompleteEvent(processId));
+ subscriber.completeShowProcessUnit(new
ShowProcessUnitCompleteEvent(processId));
});
lockAndAwaitDefaultTime(lock);
long currentTime = System.currentTimeMillis();
- assertTrue(currentTime >= startTime + 50L);
- assertTrue(currentTime <= startTime + 5000L);
+ assertTrue(currentTime >= startMillis + 50L);
+ assertTrue(currentTime <= startMillis + 5000L);
ProcessRegistry.getInstance().getLocks().remove(processId);
}
@Test
- void assertTriggerShowProcessList() throws ReflectiveOperationException {
- String instanceId =
contextManager.getInstanceContext().getInstance().getMetaData().getId();
- ProcessRegistry.getInstance().putProcessContext("foo_execution_id",
mock(ProcessContext.class));
- String processId = "foo_process_id";
- subscriber.triggerShowProcessList(new
ShowProcessListTriggerEvent(instanceId, processId));
- ClusterPersistRepository repository = ((RegistryCenter)
Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"),
subscriber)).getRepository();
- verify(repository).persist("/execution_nodes/foo_process_id/" +
instanceId,
- "contexts:" + System.lineSeparator() + "- completedUnitCount:
0\n idle: false\n startTimeMillis: 0\n totalUnitCount: 0" +
System.lineSeparator());
- verify(repository).delete("/nodes/compute_nodes/process_trigger/" +
instanceId + ":foo_process_id");
- }
-
- @Test
- void assertKillProcessId() throws SQLException,
ReflectiveOperationException {
- String instanceId =
contextManager.getInstanceContext().getInstance().getMetaData().getId();
+ void assertCompleteKillProcessUnit() {
String processId = "foo_process_id";
- subscriber.killProcessId(new KillProcessIdEvent(instanceId,
processId));
- ClusterPersistRepository repository = ((RegistryCenter)
Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"),
subscriber)).getRepository();
- verify(repository).delete("/nodes/compute_nodes/process_kill/" +
instanceId + ":foo_process_id");
+ ShowProcessListLock lock = new ShowProcessListLock();
+ ProcessRegistry.getInstance().getLocks().put(processId, lock);
+ long startMillis = System.currentTimeMillis();
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ executorService.submit(() -> {
+ try {
+ Thread.sleep(50L);
+ } catch (final InterruptedException ignored) {
+ }
+ subscriber.completeKillProcessUnit(new
KillProcessUnitCompleteEvent(processId));
+ });
+ lockAndAwaitDefaultTime(lock);
+ long currentTime = System.currentTimeMillis();
+ assertTrue(currentTime >= startMillis + 50L);
+ assertTrue(currentTime <= startMillis + 5000L);
+ ProcessRegistry.getInstance().getLocks().remove(processId);
}
private void lockAndAwaitDefaultTime(final ShowProcessListLock lock) {
diff --git
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
index f49a58490fa..ef96271dbb6 100644
---
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
+++
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
@@ -50,18 +50,18 @@ public final class ProcessStandaloneSubscriber {
/**
* Load show process list data.
*
- * @param event get children request event.
+ * @param event get children request event
*/
@Subscribe
public void loadShowProcessListData(final ShowProcessListRequestEvent
event) {
YamlProcessListContexts yamlContexts =
swapper.swapToYamlConfiguration(ProcessRegistry.getInstance().getAllProcessContexts());
- eventBusContext.post(new
ShowProcessListResponseEvent(yamlContexts.getContexts().isEmpty() ?
Collections.emptyList() :
Collections.singleton(YamlEngine.marshal(yamlContexts))));
+ eventBusContext.post(new
ShowProcessListResponseEvent(Collections.singleton(YamlEngine.marshal(yamlContexts))));
}
/**
* Kill process.
*
- * @param event kill process request event.
+ * @param event kill process request event
* @throws SQLException SQL exception
*/
@Subscribe