This is an automated email from the ASF dual-hosted git repository.

wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b978498  Refactor ComputeNodeStateChangedWatcher (#12525)
b978498 is described below

commit b97849890b99027bbb91a74e926ca169a6a5ee95
Author: Haoran Meng <[email protected]>
AuthorDate: Fri Sep 17 18:37:01 2021 +0800

    Refactor ComputeNodeStateChangedWatcher (#12525)
    
    * Refactor ComputeNodeStateChangedWatcher
    
    * Refactor ComputeNodeStateChangedWatcher
---
 .../lock/service/LockRegistryService.java          |  2 +-
 .../registry/status/node/ComputeNode.java          | 33 ++++++++++++++++++++++
 .../registry/status/node/StatusNode.java           | 30 ++++++++++++++++++--
 .../watcher/ComputeNodeStateChangedWatcher.java    | 11 ++++++--
 .../registry/status/node/StatusNodeTest.java       | 23 ++++++++++++++-
 .../ComputeNodeStateChangedWatcherTest.java        |  4 +--
 6 files changed, 94 insertions(+), 9 deletions(-)

diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
index b5f4b9f..77c3610 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
@@ -112,7 +112,7 @@ public final class LockRegistryService {
     }
     
     private boolean checkAck(final String lockName, final String ackValue, 
final long timeoutMilliseconds) {
-        Collection<String> onlineInstanceIds = 
repository.getChildrenKeys(StatusNode.getComputeNodePath(ComputeNodeStatus.ONLINE));
+        Collection<String> onlineInstanceIds = 
repository.getChildrenKeys(StatusNode.getComputeNodeStatusPath(ComputeNodeStatus.ONLINE));
         long checkMilliseconds = timeoutMilliseconds;
         while (checkMilliseconds > 0) {
             long start = System.currentTimeMillis();
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/ComputeNode.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/ComputeNode.java
new file mode 100644
index 0000000..05a87ac
--- /dev/null
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/ComputeNode.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.node;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Compute node.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class ComputeNode {
+    
+    private final String status;
+    
+    private final String instanceId;
+}
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java
index 3f77a71..e80dfe3 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNode.java
@@ -17,15 +17,18 @@
 
 package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.node;
 
+import com.google.common.base.Joiner;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.ComputeNodeStatus;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.StorageNodeStatus;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.schema.ClusterSchema;
 
+import java.util.Arrays;
 import java.util.Optional;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * Status node.
@@ -42,18 +45,27 @@ public final class StatusNode {
     private static final String PRIVILEGE_NODE = "privilegenode";
     
     /**
-     * Get compute node path.
+     * Get compute node status path.
      *
      * @param status compute node status
      * @return compute node path
      */
-    public static String getComputeNodePath(final ComputeNodeStatus status) {
+    public static String getComputeNodeStatusPath(final ComputeNodeStatus 
status) {
         return String.join("/", "", ROOT_NODE, COMPUTE_NODE, 
status.name().toLowerCase());
     }
     
     /**
      * Get compute node path.
      *
+     * @return compute node path
+     */
+    public static String getComputeNodePath() {
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE);
+    }
+    
+    /**
+     * Get compute node path.
+     *
      * @param status compute node status
      * @param instanceId instance id
      * @return compute node path
@@ -97,7 +109,7 @@ public final class StatusNode {
      *
      * @param status storage node status
      * @param storageNodeFullPath storage node full path
-     * @return found cluster schema
+     * @return cluster schema
      */
     public static Optional<ClusterSchema> findClusterSchema(final 
StorageNodeStatus status, final String storageNodeFullPath) {
         Pattern pattern = Pattern.compile(getStorageNodePath() + "/" + 
status.name().toLowerCase() + "/(\\S+)$", Pattern.CASE_INSENSITIVE);
@@ -113,4 +125,16 @@ public final class StatusNode {
     public static String getPrivilegeNodePath() {
         return String.join("/", "", ROOT_NODE, PRIVILEGE_NODE);
     }
+    
+    /**
+     * Find compute node.
+     *
+     * @param computeNodeFullPath compute node full path
+     * @return compute node
+     */
+    public static Optional<ComputeNode> findComputeNode(final String 
computeNodeFullPath) {
+        String status = 
Joiner.on("|").join(Arrays.stream(ComputeNodeStatus.values()).map(each -> 
each.name().toLowerCase()).collect(Collectors.toList()));
+        Matcher matcher = Pattern.compile(getComputeNodePath() + "/(" + status 
+ ")/(\\S+)$", Pattern.CASE_INSENSITIVE).matcher(computeNodeFullPath);
+        return matcher.find() ? Optional.of(new ComputeNode(matcher.group(1), 
matcher.group(2))) : Optional.empty();
+    }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java
index 7fd38ad..2a777c7 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcher.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
 import org.apache.shardingsphere.infra.state.StateEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.ComputeNodeStatus;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.node.ComputeNode;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.node.StatusNode;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -36,7 +37,7 @@ public final class ComputeNodeStateChangedWatcher implements 
GovernanceWatcher<S
     
     @Override
     public Collection<String> getWatchingKeys() {
-        return 
Collections.singleton(StatusNode.getComputeNodePath(ComputeNodeStatus.CIRCUIT_BREAKER));
+        return Collections.singleton(StatusNode.getComputeNodePath());
     }
     
     @Override
@@ -46,6 +47,12 @@ public final class ComputeNodeStateChangedWatcher implements 
GovernanceWatcher<S
     
     @Override
     public Optional<StateEvent> createGovernanceEvent(final DataChangedEvent 
event) {
-        return Optional.of(new StateEvent("CIRCUIT_BREAK", Type.ADDED == 
event.getType()));
+        Optional<ComputeNode> computeNode = 
StatusNode.findComputeNode(event.getKey());
+        // TODO use enum instead of CIRCUIT_BREAK
+        return computeNode.isPresent() && 
isCircuitBreakerStatus(computeNode.get()) ? Optional.of(new 
StateEvent("CIRCUIT_BREAK", Type.ADDED == event.getType())) : Optional.empty();
+    }
+    
+    private boolean isCircuitBreakerStatus(final ComputeNode computeNode) {
+        return 
ComputeNodeStatus.CIRCUIT_BREAKER.name().equalsIgnoreCase(computeNode.getStatus());
     }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java
index a402090..36ce29f 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/node/StatusNodeTest.java
@@ -31,8 +31,13 @@ import static org.junit.Assert.assertTrue;
 public final class StatusNodeTest {
     
     @Test
+    public void assertGetComputeNodeStatusPath() {
+        
assertThat(StatusNode.getComputeNodeStatusPath(ComputeNodeStatus.CIRCUIT_BREAKER),
 is("/status/compute_nodes/circuit_breaker"));
+    }
+    
+    @Test
     public void assertGetComputeNodePath() {
-        
assertThat(StatusNode.getComputeNodePath(ComputeNodeStatus.CIRCUIT_BREAKER), 
is("/status/compute_nodes/circuit_breaker"));
+        assertThat(StatusNode.getComputeNodePath(), 
is("/status/compute_nodes"));
     }
     
     @Test
@@ -67,4 +72,20 @@ public final class StatusNodeTest {
     public void assertGetPrivilegeNodePath() {
         assertThat(StatusNode.getPrivilegeNodePath(), 
is("/status/privilegenode"));
     }
+    
+    @Test
+    public void assertFindCircuitBreakerComputeNode() {
+        Optional<ComputeNode> actual = 
StatusNode.findComputeNode("/status/compute_nodes/circuit_breaker/127.0.0.1@3307");
+        assertTrue(actual.isPresent());
+        assertThat(actual.get().getStatus(), is("circuit_breaker"));
+        assertThat(actual.get().getInstanceId(), is("127.0.0.1@3307"));
+    }
+    
+    @Test
+    public void assertFindOnlineComputeNode() {
+        Optional<ComputeNode> actual = 
StatusNode.findComputeNode("/status/compute_nodes/online/127.0.0.1@3307");
+        assertTrue(actual.isPresent());
+        assertThat(actual.get().getStatus(), is("online"));
+        assertThat(actual.get().getInstanceId(), is("127.0.0.1@3307"));
+    }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java
index b928c95..743e720 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -31,14 +31,14 @@ public final class ComputeNodeStateChangedWatcherTest {
     
     @Test
     public void assertCreateEventWhenEnabled() {
-        Optional<StateEvent> actual = new 
ComputeNodeStateChangedWatcher().createGovernanceEvent(new 
DataChangedEvent("/test_ds/circuit_breaker", "", Type.ADDED));
+        Optional<StateEvent> actual = new 
ComputeNodeStateChangedWatcher().createGovernanceEvent(new 
DataChangedEvent("/status/compute_nodes/circuit_breaker/127.0.0.1@3307", "", 
Type.ADDED));
         assertTrue(actual.isPresent());
         assertTrue(actual.get().isOn());
     }
     
     @Test
     public void assertCreateEventWhenDisabled() {
-        Optional<StateEvent> actual = new 
ComputeNodeStateChangedWatcher().createGovernanceEvent(new 
DataChangedEvent("/test_ds/circuit_breaker", "", Type.DELETED));
+        Optional<StateEvent> actual = new 
ComputeNodeStateChangedWatcher().createGovernanceEvent(new 
DataChangedEvent("/status/compute_nodes/circuit_breaker/127.0.0.1@3307", "", 
Type.DELETED));
         assertTrue(actual.isPresent());
         assertFalse(actual.get().isOn());
     }

Reply via email to