This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1898bd9 Refactor watcher of CuratorZookeeperRepository (#16247)
1898bd9 is described below
commit 1898bd90bcd84113fb3edff12fdd3b06faa43d88
Author: Haoran Meng <[email protected]>
AuthorDate: Tue Mar 22 07:59:48 2022 +0800
Refactor watcher of CuratorZookeeperRepository (#16247)
---
.../mode/metadata/persist/node/ComputeNode.java | 9 +++++++++
.../metadata/persist/node/ComputeNodeTest.java | 5 +++++
.../registry/GovernanceWatcherFactory.java | 22 +++++++++++++++++-----
.../watcher/ComputeNodeStateChangedWatcher.java | 2 +-
.../zookeeper/CuratorZookeeperRepository.java | 15 ++++++++-------
.../zookeeper/CuratorZookeeperRepositoryTest.java | 16 ++++++++++------
6 files changed, 50 insertions(+), 19 deletions(-)
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
index 0d54fa2..a26bd98 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
@@ -105,6 +105,15 @@ public final class ComputeNode {
}
/**
+ * Get compute node path.
+ *
+ * @return compute node path
+ */
+ public static String getComputeNodePath() {
+ return String.join("/", "", ROOT_NODE, COMPUTE_NODE);
+ }
+
+ /**
* Get instance worker id node path.
*
* @param instanceId instance id
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
index 92355ec..78c31b7 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
@@ -88,4 +88,9 @@ public final class ComputeNodeTest {
assertThat(actual.get().getInstanceId().getId(), is("127.0.0.1@3307"));
assertThat(actual.get().getInstanceType(), is(InstanceType.JDBC));
}
+
+ @Test
+ public void assertGetComputeNodePath() {
+ assertThat(ComputeNode.getComputeNodePath(),
is("/nodes/compute_nodes"));
+ }
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
index 2128c63..24b148a 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
@@ -22,8 +22,6 @@ import
org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
-import java.util.Collection;
-
/**
* Governance watcher factory.
*/
@@ -40,8 +38,22 @@ public final class GovernanceWatcherFactory {
* Watch listeners.
*/
public void watchListeners() {
- Collection<GovernanceWatcher> governanceWatchers =
ShardingSphereServiceLoader.getSingletonServiceInstances(GovernanceWatcher.class);
- repository.watch("/", dataChangedEventListener ->
governanceWatchers.stream().filter(each ->
each.getWatchingTypes().contains(dataChangedEventListener.getType()))
- .forEach(each ->
each.createGovernanceEvent(dataChangedEventListener).ifPresent(ShardingSphereEventBus.getInstance()::post)));
+ for (GovernanceWatcher<?> each :
ShardingSphereServiceLoader.getSingletonServiceInstances(GovernanceWatcher.class))
{
+ watch(each);
+ }
+ }
+
+ private void watch(final GovernanceWatcher<?> listener) {
+ for (String each : listener.getWatchingKeys()) {
+ watch(each, listener);
+ }
+ }
+
+ private void watch(final String watchingKey, final GovernanceWatcher<?>
listener) {
+ repository.watch(watchingKey, dataChangedEventListener -> {
+ if
(listener.getWatchingTypes().contains(dataChangedEventListener.getType())) {
+
listener.createGovernanceEvent(dataChangedEventListener).ifPresent(ShardingSphereEventBus.getInstance()::post);
+ }
+ });
}
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index d32cb94..f917f8a 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -45,7 +45,7 @@ public final class ComputeNodeStateChangedWatcher implements
GovernanceWatcher<G
@Override
public Collection<String> getWatchingKeys() {
- return Collections.singleton(ComputeNode.getAttributesNodePath());
+ return Collections.singleton(ComputeNode.getComputeNodePath());
}
@Override
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
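b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java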
index 26a97f7..53dac57 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepository.java
@@ -49,10 +49,10 @@ import org.apache.zookeeper.data.ACL;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -62,7 +62,7 @@ import java.util.concurrent.TimeUnit;
*/
public final class CuratorZookeeperRepository implements
ClusterPersistRepository {
- private CuratorCache curatorCache;
+ private final Map<String, CuratorCache> caches = new HashMap<>();
private CuratorFramework client;
@@ -236,8 +236,10 @@ public final class CuratorZookeeperRepository implements
ClusterPersistRepositor
@Override
public void watch(final String key, final DataChangedEventListener
listener) {
- if (!Optional.ofNullable(curatorCache).isPresent()) {
- curatorCache = CuratorCache.build(client, "/");
+ if (!caches.containsKey(key)) {
+ CuratorCache curatorCache = CuratorCache.build(client, key);
+ start(curatorCache);
+ caches.put(key, curatorCache);
}
CuratorCacheListener curatorCacheListener =
CuratorCacheListener.builder()
.forTreeCache(client, (framework, treeCacheListener) -> {
@@ -247,8 +249,7 @@ public final class CuratorZookeeperRepository implements
ClusterPersistRepositor
new
String(treeCacheListener.getData().getData(), StandardCharsets.UTF_8),
changedType));
}
}).build();
- curatorCache.listenable().addListener(curatorCacheListener);
- start(curatorCache);
+ caches.get(key).listenable().addListener(curatorCacheListener);
}
private void start(final CuratorCache cache) {
@@ -314,7 +315,7 @@ public final class CuratorZookeeperRepository implements
ClusterPersistRepositor
@Override
public void close() {
- curatorCache.close();
+ caches.values().forEach(CuratorCache::close);
waitForCacheClose();
CloseableUtils.closeQuietly(client);
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
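b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java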
index 3dcd998..019c9fb 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-repository/shardingsphere-cluster-mode-repository-provider/shardingsphere-cluster-mode-repository-zookeeper-curator/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/CuratorZookeeperRepositoryTest.java
@@ -53,7 +53,9 @@ import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -203,7 +205,7 @@ public final class CuratorZookeeperRepositoryTest {
@Test
@SneakyThrows
public void assertWatchUpdatedChangedType() {
- mockCache();
+ mockCache("/test/children_updated/1");
ChildData oldData = new ChildData("/test/children_updated/1", null,
"value1".getBytes());
ChildData data = new ChildData("/test/children_updated/1", null,
"value2".getBytes());
doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(CuratorCacheListener.Type.NODE_CHANGED,
oldData, data))).when(listenable).addListener(any(CuratorCacheListener.class));
@@ -218,7 +220,7 @@ public final class CuratorZookeeperRepositoryTest {
@Test
public void assertWatchDeletedChangedType() throws Exception {
- mockCache();
+ mockCache("/test/children_deleted/5");
ChildData oldData = new ChildData("/test/children_deleted/5", null,
"value5".getBytes());
ChildData data = new ChildData("/test/children_deleted/5", null,
"value5".getBytes());
doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(CuratorCacheListener.Type.NODE_DELETED,
oldData, data))).when(listenable).addListener(any(CuratorCacheListener.class));
@@ -234,7 +236,7 @@ public final class CuratorZookeeperRepositoryTest {
@Test
@SneakyThrows
public void assertWatchAddedChangedType() {
- mockCache();
+ mockCache("/test/children_added/4");
ChildData data = new ChildData("/test/children_added/4", null,
"value4".getBytes());
doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(CuratorCacheListener.Type.NODE_CREATED,
null, data))).when(listenable).addListener(any(CuratorCacheListener.class));
SettableFuture<DataChangedEvent> settableFuture =
SettableFuture.create();
@@ -246,10 +248,12 @@ public final class CuratorZookeeperRepositoryTest {
assertThat(dataChangedEvent.getValue(), is("value4"));
}
- private void mockCache() throws Exception {
- Field cachesFiled =
CuratorZookeeperRepository.class.getDeclaredField("curatorCache");
+ private void mockCache(final String key) throws Exception {
+ Field cachesFiled =
CuratorZookeeperRepository.class.getDeclaredField("caches");
cachesFiled.setAccessible(true);
- cachesFiled.set(REPOSITORY, curatorCache);
+ Map<String, CuratorCache> caches = new HashMap<>();
+ caches.put(key, curatorCache);
+ cachesFiled.set(REPOSITORY, caches);
when(curatorCache.listenable()).thenReturn(listenable);
}