This is an automated email from the ASF dual-hosted git repository. jianbin pushed a commit to branch 2.x in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push: new c0f7b820c6 optimize: use curator instead of zkclient in registry model (#6831) c0f7b820c6 is described below commit c0f7b820c64796a3678823ba26493e4c977bcdee Author: Lei Zhiyuan <leizhiy...@gmail.com> AuthorDate: Thu Sep 12 16:16:42 2024 +0800 optimize: use curator instead of zkclient in registry model (#6831) --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 2 + discovery/seata-discovery-zk/pom.xml | 14 +- .../registry/zk/ZookeeperRegisterServiceImpl.java | 226 +++++++++++++++------ .../zk/ZookeeperRegisterServiceImplTest.java | 51 +++-- 5 files changed, 212 insertions(+), 82 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index d6e978f854..64cc2890bf 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -96,6 +96,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6827](https://github.com/apache/incubator-seata/pull/6827)] rename namingserver registry type - [[#6836](https://github.com/apache/incubator-seata/pull/6836)] add independent nacos for the CI process - [[#6779](https://github.com/apache/incubator-seata/pull/6779)] use curator instead of zkclient in config model +- [[#6831](https://github.com/apache/incubator-seata/pull/6831)] use curator instead of zkclient in registry model ### refactor: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 72d7533a1a..22249bd103 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -96,6 +96,8 @@ - [[#6827](https://github.com/apache/incubator-seata/pull/6827)] 重命名namingserver注册类型改为seata - [[#6836](https://github.com/apache/incubator-seata/pull/6836)] 为CI流程增加独立nacos - [[#6779](https://github.com/apache/incubator-seata/pull/6779)] 在config模块中使用curator替代zkclient +- [[#6831](https://github.com/apache/incubator-seata/pull/6831)] 在registry模块中使用curator替代zkclient + ### refactor: diff --git a/discovery/seata-discovery-zk/pom.xml b/discovery/seata-discovery-zk/pom.xml index 6a4d83183a..51ee953451 100644 --- a/discovery/seata-discovery-zk/pom.xml +++ b/discovery/seata-discovery-zk/pom.xml @@ -36,14 +36,12 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>com.101tec</groupId> - <artifactId>zkclient</artifactId> - <exclusions> - <exclusion> - <artifactId>log4j</artifactId> - <groupId>log4j</groupId> - </exclusion> - </exclusions> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> </dependency> <dependency> <groupId>org.apache.curator</groupId> diff --git a/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java b/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java index 84e70abe04..1fdebe570b 100644 --- a/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java +++ b/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java @@ -17,6 +17,8 @@ package org.apache.seata.discovery.registry.zk; import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -28,7 +30,16 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; - +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.framework.recipes.cache.CuratorCacheListenerBuilder; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryNTimes; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; @@ -36,22 +47,21 @@ import org.apache.seata.config.Configuration; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.config.exception.ConfigNotFoundException; import org.apache.seata.discovery.registry.RegistryService; -import org.I0Itec.zkclient.IZkChildListener; -import org.I0Itec.zkclient.IZkStateListener; -import org.I0Itec.zkclient.ZkClient; -import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; + +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * zookeeper path as /registry/zk/ - * */ -public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildListener> { +public class ZookeeperRegisterServiceImpl implements RegistryService<CuratorCacheListener> { private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperRegisterServiceImpl.class); - + static final Charset CHARSET = StandardCharsets.UTF_8; private static volatile ZookeeperRegisterServiceImpl instance; - private static volatile ZkClient zkClient; + private static volatile CuratorFramework zkClient; private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE; private static final String ZK_PATH_SPLIT_CHAR = "/"; private static final String FILE_ROOT_REGISTRY = "registry"; @@ -72,8 +82,9 @@ public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildLis private static final String ROOT_PATH_WITHOUT_SUFFIX = ZK_PATH_SPLIT_CHAR + FILE_ROOT_REGISTRY + ZK_PATH_SPLIT_CHAR + REGISTRY_TYPE; private static final ConcurrentMap<String, List<InetSocketAddress>> CLUSTER_ADDRESS_MAP = new ConcurrentHashMap<>(); - private static final ConcurrentMap<String, List<IZkChildListener>> LISTENER_SERVICE_MAP = new ConcurrentHashMap<>(); + private static final ConcurrentMap<String, List<CuratorCacheListener>> LISTENER_SERVICE_MAP = new ConcurrentHashMap<>(); private static final ConcurrentMap<String, Object> CLUSTER_LOCK = new ConcurrentHashMap<>(); + private static Map<String, CuratorCache> nodeCacheMap = new ConcurrentHashMap<>(); private static final int REGISTERED_PATH_SET_SIZE = 1; private static final Set<String> REGISTERED_PATH_SET = Collections.synchronizedSet(new HashSet<>(REGISTERED_PATH_SET_SIZE)); @@ -107,7 +118,7 @@ public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildLis return false; } createParentIfNotPresent(path); - getClientInstance().createEphemeral(path, true); + createEphemeral(path, Boolean.TRUE.toString()); REGISTERED_PATH_SET.add(path); return true; } @@ -117,13 +128,19 @@ public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildLis if (i > 0) { String parent = path.substring(0, i); if (!checkExists(parent)) { - getClientInstance().createPersistent(parent); + createPersistent(parent); } } } private boolean checkExists(String path) { - return getClientInstance().exists(path); + try { + if (getClientInstance().checkExists().forPath(path) != null) { + return true; + } + } catch (Exception e) { + } + return false; } @Override @@ -131,39 +148,54 @@ public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildLis NetUtil.validAddress(address); String path = getRegisterPathByPath(address); - getClientInstance().delete(path); + deletePath(path); REGISTERED_PATH_SET.remove(path); } @Override - public void subscribe(String cluster, IZkChildListener listener) throws Exception { + public void subscribe(String cluster, CuratorCacheListener listener) throws Exception { if (cluster == null) { return; } - String path = ROOT_PATH + cluster; - if (!getClientInstance().exists(path)) { - getClientInstance().createPersistent(path); + if (!checkExists(path)) { + createPersistent(path); } - getClientInstance().subscribeChildChanges(path, listener); + subscribeChildChanges(path, listener); LISTENER_SERVICE_MAP.computeIfAbsent(cluster, key -> new CopyOnWriteArrayList<>()) - .add(listener); + .add(listener); + } + + private void subscribeChildChanges(String path, CuratorCacheListener listener) { + CuratorCache nodeCache = CuratorCache.build(zkClient, path); + if (nodeCacheMap.putIfAbsent(path, nodeCache) != null) { + return; + } + nodeCache.listenable().addListener(listener); + nodeCache.start(); + + } + + private void unsubscribeChildChanges(String path, CuratorCacheListener listener) { + CuratorCache nodeCache = nodeCacheMap.get(path); + if (nodeCache != null) { + nodeCache.listenable().removeListener(listener); + } } @Override - public void unsubscribe(String cluster, IZkChildListener listener) throws Exception { + public void unsubscribe(String cluster, CuratorCacheListener listener) throws Exception { if (cluster == null) { return; } String path = ROOT_PATH + cluster; - if (getClientInstance().exists(path)) { - getClientInstance().unsubscribeChildChanges(path, listener); - - List<IZkChildListener> subscribeList = LISTENER_SERVICE_MAP.get(cluster); + if (checkExists(path)) { + unsubscribeChildChanges(path, listener); + List<CuratorCacheListener> subscribeList = LISTENER_SERVICE_MAP.get(cluster); if (subscribeList != null) { - List<IZkChildListener> newSubscribeList = subscribeList.stream() - .filter(eventListener -> !eventListener.equals(listener)) - .collect(Collectors.toList()); + List<CuratorCacheListener> newSubscribeList = subscribeList.stream() + .filter(eventListener -> !eventListener.equals(listener)) + .collect(Collectors.toList()); LISTENER_SERVICE_MAP.put(cluster, newSubscribeList); } } @@ -197,12 +229,12 @@ public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildLis } synchronized (lock) { if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) { - boolean exist = getClientInstance().exists(ROOT_PATH + clusterName); + boolean exist = checkExists(ROOT_PATH + clusterName); if (!exist) { return null; } - List<String> childClusterPath = getClientInstance().getChildren(ROOT_PATH + clusterName); + List<String> childClusterPath = getClientInstance().getChildren().forPath(ROOT_PATH + clusterName); refreshClusterAddressMap(clusterName, childClusterPath); subscribeCluster(clusterName); } @@ -217,7 +249,7 @@ public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildLis getClientInstance().close(); } - private ZkClient getClientInstance() { + private CuratorFramework getClientInstance() { if (zkClient == null) { synchronized (ZookeeperRegisterServiceImpl.class) { if (zkClient == null) { @@ -233,35 +265,43 @@ public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildLis } // visible for test. - ZkClient buildZkClient(String address, int sessionTimeout, int connectTimeout,String... authInfo) { - ZkClient zkClient = new ZkClient(address, sessionTimeout, connectTimeout); + CuratorFramework buildZkClient(String address, int sessionTimeout, int connectTimeout, String... authInfo) { + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() + .connectString(address) + .retryPolicy(new RetryNTimes(1, 1000)) + .connectionTimeoutMs(connectTimeout) + .sessionTimeoutMs(sessionTimeout); if (authInfo != null && authInfo.length == 2) { if (!StringUtils.isBlank(authInfo[0]) && !StringUtils.isBlank(authInfo[1])) { StringBuilder auth = new StringBuilder(authInfo[0]).append(":").append(authInfo[1]); - zkClient.addAuthInfo("digest", auth.toString().getBytes()); + builder.authorization("digest", auth.toString().getBytes()); } } - if (!zkClient.exists(ROOT_PATH_WITHOUT_SUFFIX)) { - zkClient.createPersistent(ROOT_PATH_WITHOUT_SUFFIX, true); - } - zkClient.subscribeStateChanges(new IZkStateListener() { - - @Override - public void handleStateChanged(Watcher.Event.KeeperState keeperState) throws Exception { - //ignore - } + zkClient = builder.build(); + zkClient.start(); - @Override - public void handleNewSession() throws Exception { - recover(); - } + if (!checkExists(ROOT_PATH_WITHOUT_SUFFIX)) { + createPersistent(ROOT_PATH_WITHOUT_SUFFIX, Boolean.TRUE.toString()); + } + subscribeStateChanges(); + return zkClient; + } + private void subscribeStateChanges() { + getClientInstance().getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override - public void handleSessionEstablishmentError(Throwable throwable) throws Exception { - //ignore + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if (newState == ConnectionState.RECONNECTED || newState == ConnectionState.CONNECTED) { + try { + recover(); + } catch (Exception e) { + LOGGER.error("handleNewSession error", e); + } + } else { + LOGGER.error("stateChanged error, newState:{}", newState); + } } }); - return zkClient; } private void recover() throws Exception { @@ -271,14 +311,14 @@ public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildLis } // recover client if (!LISTENER_SERVICE_MAP.isEmpty()) { - Map<String, List<IZkChildListener>> listenerMap = new HashMap<>(LISTENER_SERVICE_MAP); + Map<String, List<CuratorCacheListener>> listenerMap = new HashMap<>(LISTENER_SERVICE_MAP); LISTENER_SERVICE_MAP.clear(); - for (Map.Entry<String, List<IZkChildListener>> listenerEntry : listenerMap.entrySet()) { - List<IZkChildListener> iZkChildListeners = listenerEntry.getValue(); + for (Map.Entry<String, List<CuratorCacheListener>> listenerEntry : listenerMap.entrySet()) { + List<CuratorCacheListener> iZkChildListeners = listenerEntry.getValue(); if (CollectionUtils.isEmpty(iZkChildListeners)) { continue; } - for (IZkChildListener listener : iZkChildListeners) { + for (CuratorCacheListener listener : iZkChildListeners) { subscribe(listenerEntry.getKey(), listener); } } @@ -286,14 +326,21 @@ public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildLis } private void subscribeCluster(String cluster) throws Exception { - subscribe(cluster, (parentPath, currentChilds) -> { - String clusterName = parentPath.replace(ROOT_PATH, ""); - if (CollectionUtils.isEmpty(currentChilds) && CLUSTER_ADDRESS_MAP.get(clusterName) != null) { - CLUSTER_ADDRESS_MAP.remove(clusterName); - } else if (!CollectionUtils.isEmpty(currentChilds)) { - refreshClusterAddressMap(clusterName, currentChilds); + String path = ROOT_PATH + cluster; + CuratorCacheListenerBuilder builder = CuratorCacheListener.builder(); + CuratorCacheListener listener = builder.forPathChildrenCache(path, getClientInstance(), new PathChildrenCacheListener() { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + List<String> currentChilds = getClientInstance().getChildren().forPath(path); + if (CollectionUtils.isEmpty(currentChilds) && CLUSTER_ADDRESS_MAP.get(cluster) != null) { + CLUSTER_ADDRESS_MAP.remove(cluster); + } else if (!CollectionUtils.isEmpty(currentChilds)) { + ZookeeperRegisterServiceImpl.this.refreshClusterAddressMap(cluster, currentChilds); + } } - }); + }).build(); + + subscribe(cluster, listener); } private void refreshClusterAddressMap(String clusterName, List<String> instances) { @@ -323,4 +370,61 @@ public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildLis private String getRegisterPathByPath(InetSocketAddress address) { return ROOT_PATH + getClusterName() + ZK_PATH_SPLIT_CHAR + NetUtil.toStringAddress(address); } + + protected void createPersistent(String path, String data) { + byte[] dataBytes = data.getBytes(CHARSET); + try { + zkClient.create().creatingParentsIfNeeded().forPath(path, dataBytes); + } catch (KeeperException.NodeExistsException e) { + try { + zkClient.setData().forPath(path, dataBytes); + } catch (Exception e1) { + throw new IllegalStateException(e.getMessage(), e1); + } + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + protected void createPersistent(String path) { + try { + zkClient.create().creatingParentsIfNeeded().forPath(path); + } catch (KeeperException.NodeExistsException e) { + // ignore + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + protected void createEphemeral(String path, String data) { + byte[] dataBytes = data.getBytes(CHARSET); + try { + getClientInstance().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, dataBytes); + } catch (KeeperException.NodeExistsException e) { + try { + getClientInstance().setData().forPath(path, dataBytes); + } catch (Exception e1) { + throw new IllegalStateException(e.getMessage(), e1); + } + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + protected boolean deletePath(String path) { + try { + getClientInstance().delete().deletingChildrenIfNeeded().forPath(path); + return true; + } catch (KeeperException.NoNodeException ignored) { + return true; + } catch (Exception e) { + LOGGER.error("deletePath {} is error or timeout", path, e); + return false; + } + } + + @VisibleForTesting + CuratorFramework getZkClient() { + return zkClient; + } } diff --git a/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java b/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java index dffd1a279a..d177d4a858 100644 --- a/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java +++ b/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java @@ -18,22 +18,27 @@ package org.apache.seata.discovery.registry.zk; import java.lang.reflect.Field; import java.net.InetSocketAddress; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.test.TestingServer; import org.apache.seata.common.util.NetUtil; import org.apache.seata.config.exception.ConfigNotFoundException; -import org.I0Itec.zkclient.IZkChildListener; -import org.I0Itec.zkclient.ZkClient; -import org.apache.curator.test.TestingServer; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.Stat; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; - public class ZookeeperRegisterServiceImplTest { protected static TestingServer server = null; @@ -60,8 +65,14 @@ public class ZookeeperRegisterServiceImplTest { @Test public void buildZkTest() { - ZkClient client = service.buildZkClient("127.0.0.1:2181", 5000, 5000); - Assertions.assertTrue(client.exists("/zookeeper")); + + CuratorFramework client = service.buildZkClient("127.0.0.1:2181", 5000, 5000); + try { + Stat stat = client.checkExists().forPath("/zookeeper"); + Assertions.assertTrue(stat!=null); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Test @@ -79,7 +90,13 @@ public class ZookeeperRegisterServiceImplTest { final List<String> data = new ArrayList<>(); final CountDownLatch latch = new CountDownLatch(1); - IZkChildListener listener = (s, list) -> { + CuratorCacheListener listener = (CuratorCacheListener.Type type, ChildData oldData, ChildData newdata) -> { + List<String> list; + try { + list =service.getZkClient().getChildren().forPath(newdata.getPath()); + } catch (Exception e) { + throw new RuntimeException(e); + } data.clear(); data.addAll(list); latch.countDown(); @@ -87,11 +104,19 @@ public class ZookeeperRegisterServiceImplTest { service.subscribe("default", listener); final CountDownLatch latch2 = new CountDownLatch(1); final List<String> data2 = new ArrayList<>(); - IZkChildListener listener2 = (s, list) -> { + + CuratorCacheListener listener2 = (CuratorCacheListener.Type type, ChildData oldData, ChildData newdata) -> { + List<String> list; + try { + list =service.getZkClient().getChildren().forPath(newdata.getPath()); + } catch (Exception e) { + throw new RuntimeException(e); + } data2.clear(); data2.addAll(list); latch2.countDown(); }; + service.subscribe("default", listener2); service.unregister(new InetSocketAddress(NetUtil.getLocalAddress(), 33333)); @@ -106,9 +131,9 @@ public class ZookeeperRegisterServiceImplTest { public void testLookUp() throws Exception { ZookeeperRegisterServiceImpl zookeeperRegisterService = ZookeeperRegisterServiceImpl.getInstance(); - ZkClient client = service.buildZkClient("127.0.0.1:2181", 5000, 5000); - client.createPersistent("/registry/zk/cluster"); - client.createEphemeral("/registry/zk/cluster/127.0.0.1:8091"); + CuratorFramework client = service.buildZkClient("127.0.0.1:2181", 5000, 5000); + client.create().withMode(CreateMode.PERSISTENT).forPath("/registry/zk/cluster"); + client.create().withMode(CreateMode.EPHEMERAL).forPath("/registry/zk/cluster/127.0.0.1:8091"); Field field = ZookeeperRegisterServiceImpl.class.getDeclaredField("zkClient"); field.setAccessible(true); --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org