This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1898bd9  Refactor watcher of CuratorZookeeperRepository (#16247)
1898bd9 is described below

commit 1898bd90bcd84113fb3edff12fdd3b06faa43d88
Author: Haoran Meng <[email protected]>
AuthorDate: Tue Mar 22 07:59:48 2022 +0800

    Refactor watcher of CuratorZookeeperRepository (#16247)
---
 .../mode/metadata/persist/node/ComputeNode.java    |  9 +++++++++
 .../metadata/persist/node/ComputeNodeTest.java     |  5 +++++
 .../registry/GovernanceWatcherFactory.java         | 22 +++++++++++++++++-----
 .../watcher/ComputeNodeStateChangedWatcher.java    |  2 +-
 .../zookeeper/CuratorZookeeperRepository.java      | 15 ++++++++-------
 .../zookeeper/CuratorZookeeperRepositoryTest.java  | 16 ++++++++++------
 6 files changed, 50 insertions(+), 19 deletions(-)

diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
index 0d54fa2..a26bd98 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
@@ -105,6 +105,15 @@ public final class ComputeNode {
     }
     
     /**
+     * Get compute node path.
+     *
+     * @return compute node path
+     */
+    public static String getComputeNodePath() {
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE);
+    }
+    
+    /**
      * Get instance worker id node path.
      *
      * @param instanceId instance id
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
index 92355ec..78c31b7 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
@@ -88,4 +88,9 @@ public final class ComputeNodeTest {
         assertThat(actual.get().getInstanceId().getId(), is("127.0.0.1@3307"));
         assertThat(actual.get().getInstanceType(), is(InstanceType.JDBC));
     }
+    
+    @Test
+    public void assertGetComputeNodePath() {
+        assertThat(ComputeNode.getComputeNodePath(), 
is("/nodes/compute_nodes"));
+    }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
index 2128c63..24b148a 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
@@ -22,8 +22,6 @@ import 
org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 
-import java.util.Collection;
-
 /**
  * Governance watcher factory.
  */
@@ -40,8 +38,22 @@ public final class GovernanceWatcherFactory {
      * Watch listeners.
      */
     public void watchListeners() {
-        Collection<GovernanceWatcher> governanceWatchers = 
ShardingSphereServiceLoader.getSingletonServiceInstances(GovernanceWatcher.class);
-        repository.watch("/", dataChangedEventListener -> 
governanceWatchers.stream().filter(each -> 
each.getWatchingTypes().contains(dataChangedEventListener.getType()))
-                .forEach(each -> 
each.createGovernanceEvent(dataChangedEventListener).ifPresent(ShardingSphereEventBus.getInstance()::post)));
+        for (GovernanceWatcher<?> each : 
ShardingSphereServiceLoader.getSingletonServiceInstances(GovernanceWatcher.class))
 {
+            watch(each);
+        }
+    }
+    
+    private void watch(final GovernanceWatcher<?> listener) {
+        for (String each : listener.getWatchingKeys()) {
+            watch(each, listener);
+        }
+    }
+    
+    private void watch(final String watchingKey, final GovernanceWatcher<?> 
listener) {
+        repository.watch(watchingKey, dataChangedEventListener -> {
+            if 
(listener.getWatchingTypes().contains(dataChangedEventListener.getType())) {
+                
listener.createGovernanceEvent(dataChangedEventListener).ifPresent(ShardingSphereEventBus.getInstance()::post);
+            }
+        });
     }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChan
 [...]
index d32cb94..f917f8a 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -45,7 +45,7 @@ public final class ComputeNodeStateChangedWatcher implements 
GovernanceWatcher<G
     
     @Override
     public Collection<String> getWatchingKeys() {
-        return Collections.singleton(ComputeNode.getAttributesNodePath());
+        return Collections.singleton(ComputeNode.getComputeNodePath());
     }
     
     @Override
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-prov
 [...]
index 26a97f7..53dac57 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
@@ -49,10 +49,10 @@ import org.apache.zookeeper.data.ACL;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -62,7 +62,7 @@ import java.util.concurrent.TimeUnit;
  */
 public final class CuratorZookeeperRepository implements 
ClusterPersistRepository {
     
-    private CuratorCache curatorCache;
+    private final Map<String, CuratorCache> caches = new HashMap<>();
     
     private CuratorFramework client;
     
@@ -236,8 +236,10 @@ public final class CuratorZookeeperRepository implements 
ClusterPersistRepositor
     
     @Override
     public void watch(final String key, final DataChangedEventListener 
listener) {
-        if (!Optional.ofNullable(curatorCache).isPresent()) {
-            curatorCache = CuratorCache.build(client, "/");
+        if (!caches.containsKey(key)) {
+            CuratorCache curatorCache = CuratorCache.build(client, key);
+            start(curatorCache);
+            caches.put(key, curatorCache);
         }
         CuratorCacheListener curatorCacheListener = 
CuratorCacheListener.builder()
                 .forTreeCache(client, (framework, treeCacheListener) -> {
@@ -247,8 +249,7 @@ public final class CuratorZookeeperRepository implements 
ClusterPersistRepositor
                                 new 
String(treeCacheListener.getData().getData(), StandardCharsets.UTF_8), 
changedType));
                     }
                 }).build();
-        curatorCache.listenable().addListener(curatorCacheListener);
-        start(curatorCache);
+        caches.get(key).listenable().addListener(curatorCacheListener);
     }
     
     private void start(final CuratorCache cache) {
@@ -314,7 +315,7 @@ public final class CuratorZookeeperRepository implements 
ClusterPersistRepositor
     
     @Override
     public void close() {
-        curatorCache.close();
+        caches.values().forEach(CuratorCache::close);
         waitForCacheClose();
         CloseableUtils.closeQuietly(client);
     }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-
 [...]
index 3dcd998..019c9fb 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
@@ -53,7 +53,9 @@ import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -203,7 +205,7 @@ public final class CuratorZookeeperRepositoryTest {
     @Test
     @SneakyThrows
     public void assertWatchUpdatedChangedType() {
-        mockCache();
+        mockCache("/test/children_updated/1");
         ChildData oldData = new ChildData("/test/children_updated/1", null, 
"value1".getBytes());
         ChildData data = new ChildData("/test/children_updated/1", null, 
"value2".getBytes());
         
doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(CuratorCacheListener.Type.NODE_CHANGED,
 oldData, data))).when(listenable).addListener(any(CuratorCacheListener.class));
@@ -218,7 +220,7 @@ public final class CuratorZookeeperRepositoryTest {
     
     @Test
     public void assertWatchDeletedChangedType() throws Exception {
-        mockCache();
+        mockCache("/test/children_deleted/5");
         ChildData oldData = new ChildData("/test/children_deleted/5", null, 
"value5".getBytes());
         ChildData data = new ChildData("/test/children_deleted/5", null, 
"value5".getBytes());
         
doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(CuratorCacheListener.Type.NODE_DELETED,
 oldData, data))).when(listenable).addListener(any(CuratorCacheListener.class));
@@ -234,7 +236,7 @@ public final class CuratorZookeeperRepositoryTest {
     @Test
     @SneakyThrows
     public void assertWatchAddedChangedType() {
-        mockCache();
+        mockCache("/test/children_added/4");
         ChildData data = new ChildData("/test/children_added/4", null, 
"value4".getBytes());
         
doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(CuratorCacheListener.Type.NODE_CREATED,
 null, data))).when(listenable).addListener(any(CuratorCacheListener.class));
         SettableFuture<DataChangedEvent> settableFuture = 
SettableFuture.create();
@@ -246,10 +248,12 @@ public final class CuratorZookeeperRepositoryTest {
         assertThat(dataChangedEvent.getValue(), is("value4"));
     }
     
-    private void mockCache() throws Exception {
-        Field cachesFiled = 
CuratorZookeeperRepository.class.getDeclaredField("curatorCache");
+    private void mockCache(final String key) throws Exception {
+        Field cachesFiled = 
CuratorZookeeperRepository.class.getDeclaredField("caches");
         cachesFiled.setAccessible(true);
-        cachesFiled.set(REPOSITORY, curatorCache);
+        Map<String, CuratorCache> caches = new HashMap<>();
+        caches.put(key, curatorCache);
+        cachesFiled.set(REPOSITORY, caches);
         when(curatorCache.listenable()).thenReturn(listenable);
     }
     

Reply via email to