This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new da3187670e1 Refactor ZookeeperRepository#watch to avoid useless
metadata refresh. (#31403)
da3187670e1 is described below
commit da3187670e109ff08e905de7d1cc79f7f08b9355
Author: zhaojinchao <[email protected]>
AuthorDate: Mon May 27 12:32:28 2024 +0800
Refactor ZookeeperRepository#watch to avoid useless metadata refresh.
(#31403)
* Refactor ZookeeperRepository#watch to avoid useless metadata refresh.
* Fix deletes
---
.../cluster/zookeeper/ZookeeperRepository.java | 30 +++++++---------------
1 file changed, 9 insertions(+), 21 deletions(-)
diff --git
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
index a33513e4624..76e20a97f79 100644
---
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
+++
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
@@ -25,7 +25,6 @@ import
org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
@@ -53,6 +52,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -242,31 +242,19 @@ public final class ZookeeperRepository implements
ClusterPersistRepository, Comp
caches.put(key, cache);
}
CuratorCacheListener curatorCacheListener =
CuratorCacheListener.builder()
- .afterInitialized()
- .forTreeCache(client, (framework, treeCacheListener) -> {
- Type changedType =
getChangedType(treeCacheListener.getType());
- if (Type.IGNORED != changedType) {
- listener.onChange(new
DataChangedEvent(treeCacheListener.getData().getPath(),
- new
String(treeCacheListener.getData().getData(), StandardCharsets.UTF_8),
changedType));
+ .forCreates(childData -> listener.onChange(new
DataChangedEvent(childData.getPath(), new String(childData.getData(),
StandardCharsets.UTF_8), Type.ADDED)))
+ .forChanges((oldData, newData) -> {
+ if (!Objects.equals(oldData, newData)) {
+ listener.onChange(new
DataChangedEvent(newData.getPath(), new String(newData.getData(),
StandardCharsets.UTF_8), Type.UPDATED));
}
- }).build();
+ })
+ .forDeletes(oldData -> listener.onChange(new
DataChangedEvent(oldData.getPath(), new String(oldData.getData(),
StandardCharsets.UTF_8), Type.DELETED)))
+ .afterInitialized()
+ .build();
cache.listenable().addListener(curatorCacheListener);
cache.start();
}
- private Type getChangedType(final TreeCacheEvent.Type type) {
- switch (type) {
- case NODE_ADDED:
- return Type.ADDED;
- case NODE_UPDATED:
- return Type.UPDATED;
- case NODE_REMOVED:
- return Type.DELETED;
- default:
- return Type.IGNORED;
- }
- }
-
@Override
public void close() {
caches.values().forEach(CuratorCache::close);