This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b978498 Refactor ComputeNodeStateChangedWatcher (#12525)
b978498 is described below
commit b97849890b99027bbb91a74e926ca169a6a5ee95
Author: Haoran Meng <[email protected]>
AuthorDate: Fri Sep 17 18:37:01 2021 +0800
Refactor ComputeNodeStateChangedWatcher (#12525)
* Refactor ComputeNodeStateChangedWatcher
* Refactor ComputeNodeStateChangedWatcher
---
.../lock/service/LockRegistryService.java | 2 +-
.../registry/status/node/ComputeNode.java | 33 ++++++++++++++++++++++
.../registry/status/node/StatusNode.java | 30 ++++++++++++++++++--
.../watcher/ComputeNodeStateChangedWatcher.java | 11 ++++++--
.../registry/status/node/StatusNodeTest.java | 23 ++++++++++++++-
.../ComputeNodeStateChangedWatcherTest.java | 4 +--
6 files changed, 94 insertions(+), 9 deletions(-)
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
index b5f4b9f..77c3610 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
@@ -112,7 +112,7 @@ public final class LockRegistryService {
}
private boolean checkAck(final String lockName, final String ackValue,
final long timeoutMilliseconds) {
- Collection<String> onlineInstanceIds =
repository.getChildrenKeys(StatusNode.getComputeNodePath(ComputeNodeStatus.ONLINE));
+ Collection<String> onlineInstanceIds =
repository.getChildrenKeys(StatusNode.getComputeNodeStatusPath(ComputeNodeStatus.ONLINE));
long checkMilliseconds = timeoutMilliseconds;
while (checkMilliseconds > 0) {
long start = System.currentTimeMillis();
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/ComputeNode.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/ComputeNode.java
new file mode 100644
index 0000000..05a87ac
--- /dev/null
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/ComputeNode.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.node;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Compute node.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class ComputeNode {
+
+ private final String status;
+
+ private final String instanceId;
+}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java
index 3f77a71..e80dfe3 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java
@@ -17,15 +17,18 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.node;
+import com.google.common.base.Joiner;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.ComputeNodeStatus;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.StorageNodeStatus;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.schema.ClusterSchema;
+import java.util.Arrays;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* Status node.
@@ -42,18 +45,27 @@ public final class StatusNode {
private static final String PRIVILEGE_NODE = "privilegenode";
/**
- * Get compute node path.
+ * Get compute node status path.
*
* @param status compute node status
* @return compute node path
*/
- public static String getComputeNodePath(final ComputeNodeStatus status) {
+ public static String getComputeNodeStatusPath(final ComputeNodeStatus
status) {
return String.join("/", "", ROOT_NODE, COMPUTE_NODE,
status.name().toLowerCase());
}
/**
* Get compute node path.
*
+ * @return compute node path
+ */
+ public static String getComputeNodePath() {
+ return String.join("/", "", ROOT_NODE, COMPUTE_NODE);
+ }
+
+ /**
+ * Get compute node path.
+ *
* @param status compute node status
* @param instanceId instance id
* @return compute node path
@@ -97,7 +109,7 @@ public final class StatusNode {
*
* @param status storage node status
* @param storageNodeFullPath storage node full path
- * @return found cluster schema
+ * @return cluster schema
*/
public static Optional<ClusterSchema> findClusterSchema(final
StorageNodeStatus status, final String storageNodeFullPath) {
Pattern pattern = Pattern.compile(getStorageNodePath() + "/" +
status.name().toLowerCase() + "/(\\S+)$", Pattern.CASE_INSENSITIVE);
@@ -113,4 +125,16 @@ public final class StatusNode {
public static String getPrivilegeNodePath() {
return String.join("/", "", ROOT_NODE, PRIVILEGE_NODE);
}
+
+ /**
+ * Find compute node.
+ *
+ * @param computeNodeFullPath compute node full path
+ * @return compute node
+ */
+ public static Optional<ComputeNode> findComputeNode(final String
computeNodeFullPath) {
+ String status =
Joiner.on("|").join(Arrays.stream(ComputeNodeStatus.values()).map(each ->
each.name().toLowerCase()).collect(Collectors.toList()));
+ Matcher matcher = Pattern.compile(getComputeNodePath() + "/(" + status
+ ")/(\\S+)$", Pattern.CASE_INSENSITIVE).matcher(computeNodeFullPath);
+ return matcher.find() ? Optional.of(new ComputeNode(matcher.group(1),
matcher.group(2))) : Optional.empty();
+ }
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java
index 7fd38ad..2a777c7 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
import org.apache.shardingsphere.infra.state.StateEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.ComputeNodeStatus;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.node.ComputeNode;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.node.StatusNode;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -36,7 +37,7 @@ public final class ComputeNodeStateChangedWatcher implements
GovernanceWatcher<S
@Override
public Collection<String> getWatchingKeys() {
- return
Collections.singleton(StatusNode.getComputeNodePath(ComputeNodeStatus.CIRCUIT_BREAKER));
+ return Collections.singleton(StatusNode.getComputeNodePath());
}
@Override
@@ -46,6 +47,12 @@ public final class ComputeNodeStateChangedWatcher implements
GovernanceWatcher<S
@Override
public Optional<StateEvent> createGovernanceEvent(final DataChangedEvent
event) {
- return Optional.of(new StateEvent("CIRCUIT_BREAK", Type.ADDED ==
event.getType()));
+ Optional<ComputeNode> computeNode =
StatusNode.findComputeNode(event.getKey());
+ // TODO use an enum instead of CIRCUIT_BREAK
+ return computeNode.isPresent() &&
isCircuitBreakerStatus(computeNode.get()) ? Optional.of(new
StateEvent("CIRCUIT_BREAK", Type.ADDED == event.getType())) : Optional.empty();
+ }
+
+ private boolean isCircuitBreakerStatus(final ComputeNode computeNode) {
+ return
ComputeNodeStatus.CIRCUIT_BREAKER.name().equalsIgnoreCase(computeNode.getStatus());
}
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java
index a402090..36ce29f 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java
@@ -31,8 +31,13 @@ import static org.junit.Assert.assertTrue;
public final class StatusNodeTest {
@Test
+ public void assertGetComputeNodeStatusPath() {
+
assertThat(StatusNode.getComputeNodeStatusPath(ComputeNodeStatus.CIRCUIT_BREAKER),
is("/status/compute_nodes/circuit_breaker"));
+ }
+
+ @Test
public void assertGetComputeNodePath() {
-
assertThat(StatusNode.getComputeNodePath(ComputeNodeStatus.CIRCUIT_BREAKER),
is("/status/compute_nodes/circuit_breaker"));
+ assertThat(StatusNode.getComputeNodePath(),
is("/status/compute_nodes"));
}
@Test
@@ -67,4 +72,20 @@ public final class StatusNodeTest {
public void assertGetPrivilegeNodePath() {
assertThat(StatusNode.getPrivilegeNodePath(),
is("/status/privilegenode"));
}
+
+ @Test
+ public void assertFindCircuitBreakerComputeNode() {
+ Optional<ComputeNode> actual =
StatusNode.findComputeNode("/status/compute_nodes/circuit_breaker/127.0.0.1@3307");
+ assertTrue(actual.isPresent());
+ assertThat(actual.get().getStatus(), is("circuit_breaker"));
+ assertThat(actual.get().getInstanceId(), is("127.0.0.1@3307"));
+ }
+
+ @Test
+ public void assertFindOnlineComputeNode() {
+ Optional<ComputeNode> actual =
StatusNode.findComputeNode("/status/compute_nodes/online/127.0.0.1@3307");
+ assertTrue(actual.isPresent());
+ assertThat(actual.get().getStatus(), is("online"));
+ assertThat(actual.get().getInstanceId(), is("127.0.0.1@3307"));
+ }
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java
index b928c95..743e720 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -31,14 +31,14 @@ public final class ComputeNodeStateChangedWatcherTest {
@Test
public void assertCreateEventWhenEnabled() {
- Optional<StateEvent> actual = new
ComputeNodeStateChangedWatcher().createGovernanceEvent(new
DataChangedEvent("/test_ds/circuit_breaker", "", Type.ADDED));
+ Optional<StateEvent> actual = new
ComputeNodeStateChangedWatcher().createGovernanceEvent(new
DataChangedEvent("/status/compute_nodes/circuit_breaker/127.0.0.1@3307", "",
Type.ADDED));
assertTrue(actual.isPresent());
assertTrue(actual.get().isOn());
}
@Test
public void assertCreateEventWhenDisabled() {
- Optional<StateEvent> actual = new
ComputeNodeStateChangedWatcher().createGovernanceEvent(new
DataChangedEvent("/test_ds/circuit_breaker", "", Type.DELETED));
+ Optional<StateEvent> actual = new
ComputeNodeStateChangedWatcher().createGovernanceEvent(new
DataChangedEvent("/status/compute_nodes/circuit_breaker/127.0.0.1@3307", "",
Type.DELETED));
assertTrue(actual.isPresent());
assertFalse(actual.get().isOn());
}