This is an automated email from the ASF dual-hosted git repository. jianbin pushed a commit to branch 2.x in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push: new be01fb010c optimize: use curator instead of zkclient in config model (#6779) be01fb010c is described below commit be01fb010c176886adf795c8226f9fb603c90309 Author: Lei Zhiyuan <leizhiy...@gmail.com> AuthorDate: Thu Sep 12 11:22:00 2024 +0800 optimize: use curator instead of zkclient in config model (#6779) --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 2 +- .../seata/config/ConfigurationChangeEvent.java | 11 + config/seata-config-zk/pom.xml | 18 +- .../seata/config/zk/DefaultZkSerializer.java | 42 ---- .../seata/config/zk/ZookeeperConfiguration.java | 266 +++++++++++++-------- .../seata/config/zk/ZkConfigurationTest.java | 127 ++++++++++ dependencies/pom.xml | 14 +- 8 files changed, 329 insertions(+), 152 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 84f351c675..d6e978f854 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -95,6 +95,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6819](https://github.com/apache/incubator-seata/pull/6819)] merge the packaging processes of namingserver and seata-server - [[#6827](https://github.com/apache/incubator-seata/pull/6827)] rename namingserver registry type - [[#6836](https://github.com/apache/incubator-seata/pull/6836)] add independent nacos for the CI process +- [[#6779](https://github.com/apache/incubator-seata/pull/6779)] use curator instead of zkclient in config model ### refactor: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index f0987c88e6..72d7533a1a 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -95,7 +95,7 @@ - [[#6819](https://github.com/apache/incubator-seata/pull/6819)] namingserver与server的合并打包 - [[#6827](https://github.com/apache/incubator-seata/pull/6827)] 重命名namingserver注册类型改为seata - [[#6836](https://github.com/apache/incubator-seata/pull/6836)] 为CI流程增加独立nacos - +- [[#6779](https://github.com/apache/incubator-seata/pull/6779)] 在config模块中使用curator替代zkclient ### refactor: diff --git a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java index 215805a301..714f8b1767 100644 --- a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java +++ b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java @@ -141,4 +141,15 @@ public class ConfigurationChangeEvent { this.namespace = namespace; return this; } + + @Override + public String toString() { + return "ConfigurationChangeEvent{" + + "dataId='" + dataId + '\'' + + ", oldValue='" + oldValue + '\'' + + ", newValue='" + newValue + '\'' + + ", namespace='" + namespace + '\'' + + ", changeType=" + changeType + + '}'; + } } diff --git a/config/seata-config-zk/pom.xml b/config/seata-config-zk/pom.xml index eacb161136..c84196de83 100644 --- a/config/seata-config-zk/pom.xml +++ b/config/seata-config-zk/pom.xml @@ -36,14 +36,16 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>com.101tec</groupId> - <artifactId>zkclient</artifactId> - <exclusions> - <exclusion> - <artifactId>log4j</artifactId> - <groupId>log4j</groupId> - </exclusion> - </exclusions> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> </dependency> </dependencies> diff --git a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/DefaultZkSerializer.java b/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/DefaultZkSerializer.java deleted file mode 100644 index bd764d75c3..0000000000 --- a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/DefaultZkSerializer.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seata.config.zk; - -import org.I0Itec.zkclient.exception.ZkMarshallingError; -import org.I0Itec.zkclient.serialize.ZkSerializer; - -import java.nio.charset.StandardCharsets; - -/** - * Default zk serializer. - * <p> - * If the user is not configured in config.zk.serializer configuration item, then use default serializer. - * - * @since 1.3.0 - */ -public class DefaultZkSerializer implements ZkSerializer { - - @Override - public byte[] serialize(Object data) throws ZkMarshallingError { - return String.valueOf(data).getBytes(StandardCharsets.UTF_8); - } - - @Override - public Object deserialize(byte[] bytes) throws ZkMarshallingError { - return new String(bytes, StandardCharsets.UTF_8); - } -} diff --git a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java b/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java index 8c29ad2086..b045984f66 100644 --- a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java +++ b/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java @@ -17,7 +17,8 @@ package org.apache.seata.config.zk; import java.io.IOException; -import java.lang.reflect.Constructor; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.Enumeration; import java.util.Map; import java.util.Properties; @@ -29,7 +30,12 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.retry.RetryNTimes; import org.apache.seata.common.exception.NotSupportYetException; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.util.CollectionUtils; @@ -41,10 +47,7 @@ import org.apache.seata.config.ConfigurationChangeListener; import org.apache.seata.config.ConfigurationChangeType; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.config.processor.ConfigProcessor; -import org.I0Itec.zkclient.IZkDataListener; -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.serialize.ZkSerializer; -import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +57,6 @@ import static org.apache.seata.config.ConfigurationKeys.SEATA_FILE_ROOT_CONFIG; /** * The type Zookeeper configuration. - * */ public class ZookeeperConfiguration extends AbstractConfiguration { private final static Logger LOGGER = LoggerFactory.getLogger(ZookeeperConfiguration.class); @@ -75,15 +77,17 @@ public class ZookeeperConfiguration extends AbstractConfiguration { private static final int DEFAULT_CONNECT_TIMEOUT = 2000; private static final String DEFAULT_CONFIG_PATH = ROOT_PATH + "/seata.properties"; private static final String FILE_CONFIG_KEY_PREFIX = FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + CONFIG_TYPE - + FILE_CONFIG_SPLIT_CHAR; + + FILE_CONFIG_SPLIT_CHAR; private static final ExecutorService CONFIG_EXECUTOR = new ThreadPoolExecutor(THREAD_POOL_NUM, THREAD_POOL_NUM, - Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), - new NamedThreadFactory("ZKConfigThread", THREAD_POOL_NUM)); - private static volatile ZkClient zkClient; + Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), + new NamedThreadFactory("ZKConfigThread", THREAD_POOL_NUM)); + private static volatile CuratorFramework zkClient; private static final int MAP_INITIAL_CAPACITY = 8; - private static final ConcurrentMap<String, ConcurrentMap<ConfigurationChangeListener, ZKListener>> CONFIG_LISTENERS_MAP - = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY); + private static final ConcurrentMap<String, ConcurrentMap<ConfigurationChangeListener, NodeCacheListenerImpl>> CONFIG_LISTENERS_MAP + = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY); private static volatile Properties seataConfig = new Properties(); + static final Charset CHARSET = StandardCharsets.UTF_8; + private static Map<String, CuratorCache> nodeCacheMap = new ConcurrentHashMap<>(); /** * Instantiates a new Zookeeper configuration. @@ -93,26 +97,51 @@ public class ZookeeperConfiguration extends AbstractConfiguration { if (zkClient == null) { synchronized (ZookeeperConfiguration.class) { if (zkClient == null) { - ZkSerializer zkSerializer = getZkSerializer(); String serverAddr = FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + SERVER_ADDR_KEY); int sessionTimeout = FILE_CONFIG.getInt(FILE_CONFIG_KEY_PREFIX + SESSION_TIMEOUT_KEY, DEFAULT_SESSION_TIMEOUT); int connectTimeout = FILE_CONFIG.getInt(FILE_CONFIG_KEY_PREFIX + CONNECT_TIMEOUT_KEY, DEFAULT_CONNECT_TIMEOUT); - zkClient = new ZkClient(serverAddr, sessionTimeout, connectTimeout, zkSerializer); + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() + .connectString(serverAddr) + .retryPolicy(new RetryNTimes(1, 1000)) + .connectionTimeoutMs(connectTimeout) + .sessionTimeoutMs(sessionTimeout); String username = FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + AUTH_USERNAME); String password = FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + AUTH_PASSWORD); if (!StringUtils.isBlank(username) && !StringUtils.isBlank(password)) { StringBuilder auth = new StringBuilder(username).append(":").append(password); - zkClient.addAuthInfo("digest", auth.toString().getBytes()); + builder.authorization("digest", auth.toString().getBytes()); } + zkClient = builder.build(); + zkClient.start(); } } - if (!zkClient.exists(ROOT_PATH)) { - zkClient.createPersistent(ROOT_PATH, true); + if (!checkExists(ROOT_PATH)) { + createPersistent(ROOT_PATH); } initSeataConfig(); } } + public void createPersistent(String path) { + try { + zkClient.create().forPath(path); + } catch (KeeperException.NodeExistsException e) { + LOGGER.warn("ZNode " + path + " already exists.", e); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + public boolean checkExists(String path) { + try { + if (zkClient.checkExists().forPath(path) != null) { + return true; + } + } catch (Exception e) { + } + return false; + } + @Override public String getTypeName() { return CONFIG_TYPE; @@ -125,13 +154,13 @@ public class ZookeeperConfiguration extends AbstractConfiguration { return value; } FutureTask<String> future = new FutureTask<>(() -> { - String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId; - if (!zkClient.exists(path)) { + String path = buildPath(dataId); + if (!checkExists(path)) { LOGGER.warn("config {} is not existed, return defaultValue {} ", - dataId, defaultValue); + dataId, defaultValue); return defaultValue; } - String value1 = zkClient.readData(path); + String value1 = readData(path); return StringUtils.isNullOrEmpty(value1) ? defaultValue : value1; }); CONFIG_EXECUTOR.execute(future); @@ -139,25 +168,37 @@ public class ZookeeperConfiguration extends AbstractConfiguration { return future.get(timeoutMills, TimeUnit.MILLISECONDS); } catch (Exception e) { LOGGER.error("getConfig {} error or timeout, return defaultValue {}, exception:{} ", - dataId, defaultValue, e.getMessage()); + dataId, defaultValue, e.getMessage()); return defaultValue; } } + public String readData(String path) { + try { + byte[] dataBytes = zkClient.getData().forPath(path); + return (dataBytes == null || dataBytes.length == 0) ? null : new String(dataBytes, CHARSET); + } catch (KeeperException.NoNodeException e) { + // ignore NoNode Exception. + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + return null; + } + @Override public boolean putConfig(String dataId, String content, long timeoutMills) { if (!seataConfig.isEmpty()) { seataConfig.setProperty(dataId, content); - zkClient.writeData(getConfigPath(), getSeataConfigStr()); + createPersistent(getConfigPath(), getSeataConfigStr()); return true; } FutureTask<Boolean> future = new FutureTask<>(() -> { - String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId; - if (!zkClient.exists(path)) { - zkClient.create(path, content, CreateMode.PERSISTENT); + String path = buildPath(dataId); + if (!checkExists(path)) { + createPersistent(path, content); } else { - zkClient.writeData(path, content); + createPersistent(path, content); } return true; }); @@ -166,11 +207,31 @@ public class ZookeeperConfiguration extends AbstractConfiguration { return future.get(timeoutMills, TimeUnit.MILLISECONDS); } catch (Exception e) { LOGGER.error("putConfig {}, value: {} is error or timeout, exception: {}", - dataId, content, e.getMessage()); + dataId, content, e.getMessage()); return false; } } + public String buildPath(String dataId) { + String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId; + return path; + } + + protected void createPersistent(String path, String data) { + byte[] dataBytes = data.getBytes(CHARSET); + try { + zkClient.create().forPath(path, dataBytes); + } catch (KeeperException.NodeExistsException e) { + try { + zkClient.setData().forPath(path, dataBytes); + } catch (Exception e1) { + throw new IllegalStateException(e.getMessage(), e1); + } + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + @Override public boolean putConfigIfAbsent(String dataId, String content, long timeoutMills) { throw new NotSupportYetException("not support atomic operation putConfigIfAbsent"); @@ -180,13 +241,13 @@ public class ZookeeperConfiguration extends AbstractConfiguration { public boolean removeConfig(String dataId, long timeoutMills) { if (!seataConfig.isEmpty()) { seataConfig.remove(dataId); - zkClient.writeData(getConfigPath(), getSeataConfigStr()); + createPersistent(getConfigPath(), getSeataConfigStr()); return true; } FutureTask<Boolean> future = new FutureTask<>(() -> { - String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId; - return zkClient.delete(path); + String path = buildPath(dataId); + return deletePath(path); }); CONFIG_EXECUTOR.execute(future); try { @@ -198,25 +259,36 @@ public class ZookeeperConfiguration extends AbstractConfiguration { } + protected boolean deletePath(String path) { + try { + zkClient.delete().deletingChildrenIfNeeded().forPath(path); + return true; + } catch (KeeperException.NoNodeException ignored) { + return true; + } catch (Exception e) { + LOGGER.error("deletePath {} is error or timeout", path, e); + return false; + } + } + @Override public void addConfigListener(String dataId, ConfigurationChangeListener listener) { if (StringUtils.isBlank(dataId) || listener == null) { return; } - + String path = buildPath(dataId); if (!seataConfig.isEmpty()) { - ZKListener zkListener = new ZKListener(dataId, listener); + NodeCacheListenerImpl zkListener = new NodeCacheListenerImpl(dataId, listener); + CuratorCacheListener.builder().forAll(zkListener).build(); CONFIG_LISTENERS_MAP.computeIfAbsent(dataId, key -> new ConcurrentHashMap<>()) - .put(listener, zkListener); + .put(listener, zkListener); return; } - - String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId; - if (zkClient.exists(path)) { - ZKListener zkListener = new ZKListener(path, listener); + if (checkExists(path)) { + NodeCacheListenerImpl zkListener = new NodeCacheListenerImpl(path, listener); CONFIG_LISTENERS_MAP.computeIfAbsent(dataId, key -> new ConcurrentHashMap<>()) - .put(listener, zkListener); - zkClient.subscribeDataChanges(path, zkListener); + .put(listener, zkListener); + addDataListener(path, zkListener); } } @@ -227,18 +299,18 @@ public class ZookeeperConfiguration extends AbstractConfiguration { } Set<ConfigurationChangeListener> configChangeListeners = getConfigListeners(dataId); if (CollectionUtils.isNotEmpty(configChangeListeners)) { - String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId; - if (zkClient.exists(path)) { + String path = buildPath(dataId); + if (checkExists(path)) { for (ConfigurationChangeListener entry : configChangeListeners) { if (listener.equals(entry)) { - ZKListener zkListener = null; - Map<ConfigurationChangeListener, ZKListener> configListeners = CONFIG_LISTENERS_MAP.get(dataId); + NodeCacheListenerImpl zkListener = null; + Map<ConfigurationChangeListener, NodeCacheListenerImpl> configListeners = CONFIG_LISTENERS_MAP.get(dataId); if (configListeners != null) { zkListener = configListeners.get(listener); configListeners.remove(entry); } if (zkListener != null) { - zkClient.unsubscribeDataChanges(path, zkListener); + removeDataListener(path, zkListener); } break; } @@ -249,7 +321,7 @@ public class ZookeeperConfiguration extends AbstractConfiguration { @Override public Set<ConfigurationChangeListener> getConfigListeners(String dataId) { - ConcurrentMap<ConfigurationChangeListener, ZKListener> configListeners = CONFIG_LISTENERS_MAP.get(dataId); + ConcurrentMap<ConfigurationChangeListener, NodeCacheListenerImpl> configListeners = CONFIG_LISTENERS_MAP.get(dataId); if (CollectionUtils.isNotEmpty(configListeners)) { return configListeners.keySet(); } else { @@ -259,15 +331,14 @@ public class ZookeeperConfiguration extends AbstractConfiguration { private void initSeataConfig() { String configPath = getConfigPath(); - String config = zkClient.readData(configPath, true); + String config = readData(configPath); if (StringUtils.isNotBlank(config)) { try { seataConfig = ConfigProcessor.processConfig(config, getZkDataType()); } catch (IOException e) { LOGGER.error("init config properties error", e); } - ZKListener zkListener = new ZKListener(configPath, null); - zkClient.subscribeDataChanges(configPath, zkListener); + addDataListener(configPath, new NodeCacheListenerImpl(configPath, null)); } } @@ -292,28 +363,25 @@ public class ZookeeperConfiguration extends AbstractConfiguration { return sb.toString(); } - /** - * The type Zk listener. - */ - public static class ZKListener implements IZkDataListener { - + public static class NodeCacheListenerImpl implements CuratorCacheListener { private String path; private ConfigurationChangeListener listener; - /** - * Instantiates a new Zk listener. - * - * @param path the path - * @param listener the listener - */ - public ZKListener(String path, ConfigurationChangeListener listener) { + public NodeCacheListenerImpl(String path, ConfigurationChangeListener listener) { this.path = path; this.listener = listener; } @Override - public void handleDataChange(String s, Object o) { - if (s.equals(getConfigPath())) { + public void event(Type type, ChildData oldData, ChildData data) { + + String o; + if (type == Type.NODE_DELETED) { + o = ""; + } else { + o = new String(data.getData()); + } + if (path.equals(getConfigPath())) { Properties seataConfigNew = new Properties(); if (StringUtils.isNotBlank(o.toString())) { try { @@ -325,17 +393,17 @@ public class ZookeeperConfiguration extends AbstractConfiguration { } } - for (Map.Entry<String, ConcurrentMap<ConfigurationChangeListener, ZKListener>> entry : CONFIG_LISTENERS_MAP.entrySet()) { + for (Map.Entry<String, ConcurrentMap<ConfigurationChangeListener, NodeCacheListenerImpl>> entry : CONFIG_LISTENERS_MAP.entrySet()) { String listenedDataId = entry.getKey(); String propertyOld = seataConfig.getProperty(listenedDataId, ""); String propertyNew = seataConfigNew.getProperty(listenedDataId, ""); if (!propertyOld.equals(propertyNew)) { ConfigurationChangeEvent event = new ConfigurationChangeEvent() - .setDataId(listenedDataId) - .setNewValue(propertyNew) - .setChangeType(ConfigurationChangeType.MODIFY); + .setDataId(listenedDataId) + .setNewValue(propertyNew) + .setChangeType(ConfigurationChangeType.MODIFY); - ConcurrentMap<ConfigurationChangeListener, ZKListener> configListeners = entry.getValue(); + ConcurrentMap<ConfigurationChangeListener, NodeCacheListenerImpl> configListeners = entry.getValue(); for (ConfigurationChangeListener configListener : configListeners.keySet()) { configListener.onProcessEvent(event); } @@ -344,42 +412,42 @@ public class ZookeeperConfiguration extends AbstractConfiguration { seataConfig = seataConfigNew; return; + } else { + if (type == Type.NODE_DELETED) { + // Node is deleted. + String dataId = path.replaceFirst(ROOT_PATH + ZK_PATH_SPLIT_CHAR, ""); + ConfigurationChangeEvent event = new ConfigurationChangeEvent().setDataId(dataId).setChangeType( + ConfigurationChangeType.DELETE); + listener.onProcessEvent(event); + } else { + // Node is changed. + String dataId = path.replaceFirst(ROOT_PATH + ZK_PATH_SPLIT_CHAR, ""); + ConfigurationChangeEvent event = new ConfigurationChangeEvent().setDataId(dataId).setNewValue(o.toString()) + .setChangeType(ConfigurationChangeType.MODIFY); + listener.onProcessEvent(event); + } } - String dataId = s.replaceFirst(ROOT_PATH + ZK_PATH_SPLIT_CHAR, ""); - ConfigurationChangeEvent event = new ConfigurationChangeEvent().setDataId(dataId).setNewValue(o.toString()) - .setChangeType(ConfigurationChangeType.MODIFY); - listener.onProcessEvent(event); - } - - @Override - public void handleDataDeleted(String s) { - String dataId = s.replaceFirst(ROOT_PATH + ZK_PATH_SPLIT_CHAR, ""); - ConfigurationChangeEvent event = new ConfigurationChangeEvent().setDataId(dataId).setChangeType( - ConfigurationChangeType.DELETE); - listener.onProcessEvent(event); } } - private ZkSerializer getZkSerializer() { - ZkSerializer zkSerializer = null; - String serializer = FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + SERIALIZER_KEY); - if (StringUtils.isNotBlank(serializer)) { - try { - Class<?> clazz = Class.forName(serializer); - Constructor<?> constructor = clazz.getDeclaredConstructor(); - constructor.setAccessible(true); - zkSerializer = (ZkSerializer) constructor.newInstance(); - } catch (ClassNotFoundException cfe) { - LOGGER.warn("No zk serializer class found, serializer:{}", serializer, cfe); - } catch (Throwable cause) { - LOGGER.warn("found zk serializer encountered an unknown exception", cause); + protected void addDataListener(String path, NodeCacheListenerImpl nodeCacheListener) { + try { + CuratorCache nodeCache = CuratorCache.build(zkClient, path); + if (nodeCacheMap.putIfAbsent(path, nodeCache) != null) { + return; } + nodeCache.listenable().addListener(nodeCacheListener); + nodeCache.start(); + } catch (Exception e) { + throw new IllegalStateException("Add nodeCache listener for path:" + path, e); } - if (zkSerializer == null) { - zkSerializer = new DefaultZkSerializer(); - LOGGER.info("Use default zk serializer: org.apache.seata.config.zk.DefaultZkSerializer."); - } - return zkSerializer; } + protected void removeDataListener(String path, NodeCacheListenerImpl nodeCacheListener) { + CuratorCache nodeCache = nodeCacheMap.get(path); + if (nodeCache != null) { + nodeCache.listenable().removeListener(nodeCacheListener); + } + nodeCacheListener.listener = null; + } } diff --git a/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java new file mode 100644 index 0000000000..a5cc19442f --- /dev/null +++ b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.config.zk; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.curator.test.TestingServer; +import org.apache.seata.config.ConfigurationChangeEvent; +import org.apache.seata.config.ConfigurationChangeListener; +import org.apache.seata.config.ConfigurationChangeType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The type zk configuration test + */ +public class ZkConfigurationTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(ZkConfigurationTest.class); + + protected static TestingServer server = null; + + @BeforeAll + public static void adBeforeClass() throws Exception { + System.setProperty("config.type", "zk"); + System.setProperty("config.zk.serverAddr", "127.0.0.1:2181"); + server = new TestingServer(2181); + server.start(); + } + + @AfterAll + public static void adAfterClass() throws Exception { + if (server != null) { + server.stop(); + } + } + + @Test + public void testCheckExist() { + ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(); + boolean exist = zookeeperConfiguration.checkExists("/"); + Assertions.assertTrue(exist); + } + + @Test + public void testPutConfig() { + ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(); + CountDownLatch countDownLatch = new CountDownLatch(1); + final boolean[] listened = {false}; + String dataId = "putMockDataId"; + ConfigurationChangeListener changeListener = new ConfigurationChangeListener() { + @Override + public void onChangeEvent(ConfigurationChangeEvent event) { + LOGGER.info("onChangeEvent:{}", event); + if (event.getChangeType() == ConfigurationChangeType.MODIFY) { + Assertions.assertEquals("value2", event.getNewValue()); + listened[0] = true; + countDownLatch.countDown(); + } + } + }; + zookeeperConfiguration.createPersistent(zookeeperConfiguration.buildPath(dataId), "value"); + zookeeperConfiguration.addConfigListener(dataId, changeListener); + zookeeperConfiguration.putConfig(dataId, "value2"); + try { + countDownLatch.await(10000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + Assertions.assertTrue(listened[0]); + + zookeeperConfiguration.removeConfig(dataId); + + zookeeperConfiguration.removeConfigListener(dataId, changeListener); + } + + @Test + public void testRemoveConfig() { + ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(); + CountDownLatch countDownLatch = new CountDownLatch(1); + final boolean[] listened = {false}; + String dataId = "removeMockDataId"; + zookeeperConfiguration.createPersistent(zookeeperConfiguration.buildPath(dataId), "value"); + ConfigurationChangeListener changeListener = new ConfigurationChangeListener() { + @Override + public void onChangeEvent(ConfigurationChangeEvent event) { + LOGGER.info("onChangeEvent:{}", event); + if (event.getChangeType() == ConfigurationChangeType.DELETE) { + Assertions.assertNull(event.getNewValue()); + listened[0] = true; + countDownLatch.countDown(); + } + } + }; + + zookeeperConfiguration.addConfigListener(dataId, changeListener); + zookeeperConfiguration.putConfig(dataId, "value2"); + boolean remove = zookeeperConfiguration.removeConfig(dataId); + Assertions.assertTrue(remove); + try { + countDownLatch.await(10000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Assertions.assertTrue(listened[0]); + } + +} diff --git a/dependencies/pom.xml b/dependencies/pom.xml index d00ac503b7..bf5a1a2084 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -50,7 +50,7 @@ <aopalliance.version>1.0</aopalliance.version> <zkclient.version>0.11</zkclient.version> <apache-zookeeper.version>3.7.2</apache-zookeeper.version> - <curator-test.version>5.1.0</curator-test.version> + <curator.version>5.1.0</curator.version> <spring-context-support.version>1.0.2</spring-context-support.version> <apollo-client.version>2.0.1</apollo-client.version> <eureka-clients.version>1.10.18</eureka-clients.version> @@ -354,10 +354,20 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <version>${curator.version}</version> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <version>${curator.version}</version> + </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> - <version>${curator-test.version}</version> + <version>${curator.version}</version> </dependency> <dependency> <groupId>com.alipay.sofa</groupId> --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org