This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new be01fb010c optimize: use curator instead of zkclient in config model 
(#6779)
be01fb010c is described below

commit be01fb010c176886adf795c8226f9fb603c90309
Author: Lei Zhiyuan <leizhiy...@gmail.com>
AuthorDate: Thu Sep 12 11:22:00 2024 +0800

    optimize: use curator instead of zkclient in config model (#6779)
---
 changes/en-us/2.x.md                               |   1 +
 changes/zh-cn/2.x.md                               |   2 +-
 .../seata/config/ConfigurationChangeEvent.java     |  11 +
 config/seata-config-zk/pom.xml                     |  18 +-
 .../seata/config/zk/DefaultZkSerializer.java       |  42 ----
 .../seata/config/zk/ZookeeperConfiguration.java    | 266 +++++++++++++--------
 .../seata/config/zk/ZkConfigurationTest.java       | 127 ++++++++++
 dependencies/pom.xml                               |  14 +-
 8 files changed, 329 insertions(+), 152 deletions(-)

diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 84f351c675..d6e978f854 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -95,6 +95,7 @@ Add changes here for all PR submitted to the 2.x branch.
 - [[#6819](https://github.com/apache/incubator-seata/pull/6819)] merge the 
packaging processes of namingserver and seata-server
 - [[#6827](https://github.com/apache/incubator-seata/pull/6827)] rename 
namingserver registry type
 - [[#6836](https://github.com/apache/incubator-seata/pull/6836)] add 
independent nacos for the CI process
+- [[#6779](https://github.com/apache/incubator-seata/pull/6779)] use curator 
instead of zkclient in config model
 
 
 ### refactor:
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index f0987c88e6..72d7533a1a 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -95,7 +95,7 @@
 - [[#6819](https://github.com/apache/incubator-seata/pull/6819)] 
namingserver与server的合并打包
 - [[#6827](https://github.com/apache/incubator-seata/pull/6827)] 
重命名namingserver注册类型改为seata
 - [[#6836](https://github.com/apache/incubator-seata/pull/6836)] 为CI流程增加独立nacos
-
+- [[#6779](https://github.com/apache/incubator-seata/pull/6779)] 
在config模块中使用curator替代zkclient
 
 ### refactor:
 
diff --git 
a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java
 
b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java
index 215805a301..714f8b1767 100644
--- 
a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java
+++ 
b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java
@@ -141,4 +141,15 @@ public class ConfigurationChangeEvent {
         this.namespace = namespace;
         return this;
     }
+
+    @Override
+    public String toString() {
+        return "ConfigurationChangeEvent{" +
+            "dataId='" + dataId + '\'' +
+            ", oldValue='" + oldValue + '\'' +
+            ", newValue='" + newValue + '\'' +
+            ", namespace='" + namespace + '\'' +
+            ", changeType=" + changeType +
+            '}';
+    }
 }
diff --git a/config/seata-config-zk/pom.xml b/config/seata-config-zk/pom.xml
index eacb161136..c84196de83 100644
--- a/config/seata-config-zk/pom.xml
+++ b/config/seata-config-zk/pom.xml
@@ -36,14 +36,16 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>com.101tec</groupId>
-            <artifactId>zkclient</artifactId>
-            <exclusions>
-                <exclusion>
-                    <artifactId>log4j</artifactId>
-                    <groupId>log4j</groupId>
-                </exclusion>
-            </exclusions>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
         </dependency>
     </dependencies>
 
diff --git 
a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/DefaultZkSerializer.java
 
b/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/DefaultZkSerializer.java
deleted file mode 100644
index bd764d75c3..0000000000
--- 
a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/DefaultZkSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.seata.config.zk;
-
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import java.nio.charset.StandardCharsets;
-
-/**
- * Default zk serializer.
- * <p>
- * If the user is not configured in config.zk.serializer configuration item, 
then use default serializer.
- *
- * @since 1.3.0
- */
-public class DefaultZkSerializer implements ZkSerializer {
-
-    @Override
-    public byte[] serialize(Object data) throws ZkMarshallingError {
-        return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
-    }
-
-    @Override
-    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
-        return new String(bytes, StandardCharsets.UTF_8);
-    }
-}
diff --git 
a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java
 
b/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java
index 8c29ad2086..b045984f66 100644
--- 
a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java
+++ 
b/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java
@@ -17,7 +17,8 @@
 package org.apache.seata.config.zk;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Enumeration;
 import java.util.Map;
 import java.util.Properties;
@@ -29,7 +30,12 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.retry.RetryNTimes;
 import org.apache.seata.common.exception.NotSupportYetException;
 import org.apache.seata.common.thread.NamedThreadFactory;
 import org.apache.seata.common.util.CollectionUtils;
@@ -41,10 +47,7 @@ import org.apache.seata.config.ConfigurationChangeListener;
 import org.apache.seata.config.ConfigurationChangeType;
 import org.apache.seata.config.ConfigurationFactory;
 import org.apache.seata.config.processor.ConfigProcessor;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +57,6 @@ import static 
org.apache.seata.config.ConfigurationKeys.SEATA_FILE_ROOT_CONFIG;
 
 /**
  * The type Zookeeper configuration.
- *
  */
 public class ZookeeperConfiguration extends AbstractConfiguration {
     private final static Logger LOGGER = 
LoggerFactory.getLogger(ZookeeperConfiguration.class);
@@ -75,15 +77,17 @@ public class ZookeeperConfiguration extends 
AbstractConfiguration {
     private static final int DEFAULT_CONNECT_TIMEOUT = 2000;
     private static final String DEFAULT_CONFIG_PATH = ROOT_PATH + 
"/seata.properties";
     private static final String FILE_CONFIG_KEY_PREFIX = FILE_ROOT_CONFIG + 
FILE_CONFIG_SPLIT_CHAR + CONFIG_TYPE
-            + FILE_CONFIG_SPLIT_CHAR;
+        + FILE_CONFIG_SPLIT_CHAR;
     private static final ExecutorService CONFIG_EXECUTOR = new 
ThreadPoolExecutor(THREAD_POOL_NUM, THREAD_POOL_NUM,
-            Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue<>(),
-            new NamedThreadFactory("ZKConfigThread", THREAD_POOL_NUM));
-    private static volatile ZkClient zkClient;
+        Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
+        new NamedThreadFactory("ZKConfigThread", THREAD_POOL_NUM));
+    private static volatile CuratorFramework zkClient;
     private static final int MAP_INITIAL_CAPACITY = 8;
-    private static final ConcurrentMap<String, 
ConcurrentMap<ConfigurationChangeListener, ZKListener>> CONFIG_LISTENERS_MAP
-            = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
+    private static final ConcurrentMap<String, 
ConcurrentMap<ConfigurationChangeListener, NodeCacheListenerImpl>> 
CONFIG_LISTENERS_MAP
+        = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
     private static volatile Properties seataConfig = new Properties();
+    static final Charset CHARSET = StandardCharsets.UTF_8;
+    private static Map<String, CuratorCache> nodeCacheMap = new 
ConcurrentHashMap<>();
 
     /**
      * Instantiates a new Zookeeper configuration.
@@ -93,26 +97,51 @@ public class ZookeeperConfiguration extends 
AbstractConfiguration {
         if (zkClient == null) {
             synchronized (ZookeeperConfiguration.class) {
                 if (zkClient == null) {
-                    ZkSerializer zkSerializer = getZkSerializer();
                     String serverAddr = 
FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + SERVER_ADDR_KEY);
                     int sessionTimeout = 
FILE_CONFIG.getInt(FILE_CONFIG_KEY_PREFIX + SESSION_TIMEOUT_KEY, 
DEFAULT_SESSION_TIMEOUT);
                     int connectTimeout = 
FILE_CONFIG.getInt(FILE_CONFIG_KEY_PREFIX + CONNECT_TIMEOUT_KEY, 
DEFAULT_CONNECT_TIMEOUT);
-                    zkClient = new ZkClient(serverAddr, sessionTimeout, 
connectTimeout, zkSerializer);
+                    CuratorFrameworkFactory.Builder builder = 
CuratorFrameworkFactory.builder()
+                        .connectString(serverAddr)
+                        .retryPolicy(new RetryNTimes(1, 1000))
+                        .connectionTimeoutMs(connectTimeout)
+                        .sessionTimeoutMs(sessionTimeout);
                     String username = 
FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + AUTH_USERNAME);
                     String password = 
FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + AUTH_PASSWORD);
                     if (!StringUtils.isBlank(username) && 
!StringUtils.isBlank(password)) {
                         StringBuilder auth = new 
StringBuilder(username).append(":").append(password);
-                        zkClient.addAuthInfo("digest", 
auth.toString().getBytes());
+                        builder.authorization("digest", 
auth.toString().getBytes());
                     }
+                    zkClient = builder.build();
+                    zkClient.start();
                 }
             }
-            if (!zkClient.exists(ROOT_PATH)) {
-                zkClient.createPersistent(ROOT_PATH, true);
+            if (!checkExists(ROOT_PATH)) {
+                createPersistent(ROOT_PATH);
             }
             initSeataConfig();
         }
     }
 
+    public void createPersistent(String path) {
+        try {
+            zkClient.create().forPath(path);
+        } catch (KeeperException.NodeExistsException e) {
+            LOGGER.warn("ZNode " + path + " already exists.", e);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public boolean checkExists(String path) {
+        try {
+            if (zkClient.checkExists().forPath(path) != null) {
+                return true;
+            }
+        } catch (Exception e) {
+        }
+        return false;
+    }
+
     @Override
     public String getTypeName() {
         return CONFIG_TYPE;
@@ -125,13 +154,13 @@ public class ZookeeperConfiguration extends 
AbstractConfiguration {
             return value;
         }
         FutureTask<String> future = new FutureTask<>(() -> {
-            String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId;
-            if (!zkClient.exists(path)) {
+            String path = buildPath(dataId);
+            if (!checkExists(path)) {
                 LOGGER.warn("config {} is not existed, return defaultValue {} 
",
-                        dataId, defaultValue);
+                    dataId, defaultValue);
                 return defaultValue;
             }
-            String value1 = zkClient.readData(path);
+            String value1 = readData(path);
             return StringUtils.isNullOrEmpty(value1) ? defaultValue : value1;
         });
         CONFIG_EXECUTOR.execute(future);
@@ -139,25 +168,37 @@ public class ZookeeperConfiguration extends 
AbstractConfiguration {
             return future.get(timeoutMills, TimeUnit.MILLISECONDS);
         } catch (Exception e) {
             LOGGER.error("getConfig {} error or timeout, return defaultValue 
{}, exception:{} ",
-                    dataId, defaultValue, e.getMessage());
+                dataId, defaultValue, e.getMessage());
             return defaultValue;
         }
     }
 
+    public String readData(String path) {
+        try {
+            byte[] dataBytes = zkClient.getData().forPath(path);
+            return (dataBytes == null || dataBytes.length == 0) ? null : new 
String(dataBytes, CHARSET);
+        } catch (KeeperException.NoNodeException e) {
+            // ignore NoNode Exception.
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+        return null;
+    }
+
     @Override
     public boolean putConfig(String dataId, String content, long timeoutMills) 
{
         if (!seataConfig.isEmpty()) {
             seataConfig.setProperty(dataId, content);
-            zkClient.writeData(getConfigPath(), getSeataConfigStr());
+            createPersistent(getConfigPath(), getSeataConfigStr());
             return true;
         }
 
         FutureTask<Boolean> future = new FutureTask<>(() -> {
-            String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId;
-            if (!zkClient.exists(path)) {
-                zkClient.create(path, content, CreateMode.PERSISTENT);
+            String path = buildPath(dataId);
+            if (!checkExists(path)) {
+                createPersistent(path, content);
             } else {
-                zkClient.writeData(path, content);
+                createPersistent(path, content);
             }
             return true;
         });
@@ -166,11 +207,31 @@ public class ZookeeperConfiguration extends 
AbstractConfiguration {
             return future.get(timeoutMills, TimeUnit.MILLISECONDS);
         } catch (Exception e) {
             LOGGER.error("putConfig {}, value: {} is error or timeout, 
exception: {}",
-                    dataId, content, e.getMessage());
+                dataId, content, e.getMessage());
             return false;
         }
     }
 
+    public String buildPath(String dataId) {
+        String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId;
+        return path;
+    }
+
+    protected void createPersistent(String path, String data) {
+        byte[] dataBytes = data.getBytes(CHARSET);
+        try {
+            zkClient.create().forPath(path, dataBytes);
+        } catch (KeeperException.NodeExistsException e) {
+            try {
+                zkClient.setData().forPath(path, dataBytes);
+            } catch (Exception e1) {
+                throw new IllegalStateException(e.getMessage(), e1);
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
     @Override
     public boolean putConfigIfAbsent(String dataId, String content, long 
timeoutMills) {
         throw new NotSupportYetException("not support atomic operation 
putConfigIfAbsent");
@@ -180,13 +241,13 @@ public class ZookeeperConfiguration extends 
AbstractConfiguration {
     public boolean removeConfig(String dataId, long timeoutMills) {
         if (!seataConfig.isEmpty()) {
             seataConfig.remove(dataId);
-            zkClient.writeData(getConfigPath(), getSeataConfigStr());
+            createPersistent(getConfigPath(), getSeataConfigStr());
             return true;
         }
 
         FutureTask<Boolean> future = new FutureTask<>(() -> {
-            String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId;
-            return zkClient.delete(path);
+            String path = buildPath(dataId);
+            return deletePath(path);
         });
         CONFIG_EXECUTOR.execute(future);
         try {
@@ -198,25 +259,36 @@ public class ZookeeperConfiguration extends 
AbstractConfiguration {
 
     }
 
+    protected boolean deletePath(String path) {
+        try {
+            zkClient.delete().deletingChildrenIfNeeded().forPath(path);
+            return true;
+        } catch (KeeperException.NoNodeException ignored) {
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("deletePath {} is error or timeout", path, e);
+            return false;
+        }
+    }
+
     @Override
     public void addConfigListener(String dataId, ConfigurationChangeListener 
listener) {
         if (StringUtils.isBlank(dataId) || listener == null) {
             return;
         }
-
+        String path = buildPath(dataId);
         if (!seataConfig.isEmpty()) {
-            ZKListener zkListener = new ZKListener(dataId, listener);
+            NodeCacheListenerImpl zkListener = new 
NodeCacheListenerImpl(dataId, listener);
+            CuratorCacheListener.builder().forAll(zkListener).build();
             CONFIG_LISTENERS_MAP.computeIfAbsent(dataId, key -> new 
ConcurrentHashMap<>())
-                    .put(listener, zkListener);
+                .put(listener, zkListener);
             return;
         }
-
-        String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId;
-        if (zkClient.exists(path)) {
-            ZKListener zkListener = new ZKListener(path, listener);
+        if (checkExists(path)) {
+            NodeCacheListenerImpl zkListener = new NodeCacheListenerImpl(path, 
listener);
             CONFIG_LISTENERS_MAP.computeIfAbsent(dataId, key -> new 
ConcurrentHashMap<>())
-                    .put(listener, zkListener);
-            zkClient.subscribeDataChanges(path, zkListener);
+                .put(listener, zkListener);
+            addDataListener(path, zkListener);
         }
     }
 
@@ -227,18 +299,18 @@ public class ZookeeperConfiguration extends 
AbstractConfiguration {
         }
         Set<ConfigurationChangeListener> configChangeListeners = 
getConfigListeners(dataId);
         if (CollectionUtils.isNotEmpty(configChangeListeners)) {
-            String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId;
-            if (zkClient.exists(path)) {
+            String path = buildPath(dataId);
+            if (checkExists(path)) {
                 for (ConfigurationChangeListener entry : 
configChangeListeners) {
                     if (listener.equals(entry)) {
-                        ZKListener zkListener = null;
-                        Map<ConfigurationChangeListener, ZKListener> 
configListeners = CONFIG_LISTENERS_MAP.get(dataId);
+                        NodeCacheListenerImpl zkListener = null;
+                        Map<ConfigurationChangeListener, 
NodeCacheListenerImpl> configListeners = CONFIG_LISTENERS_MAP.get(dataId);
                         if (configListeners != null) {
                             zkListener = configListeners.get(listener);
                             configListeners.remove(entry);
                         }
                         if (zkListener != null) {
-                            zkClient.unsubscribeDataChanges(path, zkListener);
+                            removeDataListener(path, zkListener);
                         }
                         break;
                     }
@@ -249,7 +321,7 @@ public class ZookeeperConfiguration extends 
AbstractConfiguration {
 
     @Override
     public Set<ConfigurationChangeListener> getConfigListeners(String dataId) {
-        ConcurrentMap<ConfigurationChangeListener, ZKListener> configListeners 
= CONFIG_LISTENERS_MAP.get(dataId);
+        ConcurrentMap<ConfigurationChangeListener, NodeCacheListenerImpl> 
configListeners = CONFIG_LISTENERS_MAP.get(dataId);
         if (CollectionUtils.isNotEmpty(configListeners)) {
             return configListeners.keySet();
         } else {
@@ -259,15 +331,14 @@ public class ZookeeperConfiguration extends 
AbstractConfiguration {
 
     private void initSeataConfig() {
         String configPath = getConfigPath();
-        String config = zkClient.readData(configPath, true);
+        String config = readData(configPath);
         if (StringUtils.isNotBlank(config)) {
             try {
                 seataConfig = ConfigProcessor.processConfig(config, 
getZkDataType());
             } catch (IOException e) {
                 LOGGER.error("init config properties error", e);
             }
-            ZKListener zkListener = new ZKListener(configPath, null);
-            zkClient.subscribeDataChanges(configPath, zkListener);
+            addDataListener(configPath, new NodeCacheListenerImpl(configPath, 
null));
         }
     }
 
@@ -292,28 +363,25 @@ public class ZookeeperConfiguration extends 
AbstractConfiguration {
         return sb.toString();
     }
 
-    /**
-     * The type Zk listener.
-     */
-    public static class ZKListener implements IZkDataListener {
-
+    public static class NodeCacheListenerImpl implements CuratorCacheListener {
         private String path;
         private ConfigurationChangeListener listener;
 
-        /**
-         * Instantiates a new Zk listener.
-         *
-         * @param path     the path
-         * @param listener the listener
-         */
-        public ZKListener(String path, ConfigurationChangeListener listener) {
+        public NodeCacheListenerImpl(String path, ConfigurationChangeListener 
listener) {
             this.path = path;
             this.listener = listener;
         }
 
         @Override
-        public void handleDataChange(String s, Object o) {
-            if (s.equals(getConfigPath())) {
+        public void event(Type type, ChildData oldData, ChildData data) {
+
+            String o;
+            if (type == Type.NODE_DELETED) {
+                o = "";
+            } else {
+                o = new String(data.getData());
+            }
+            if (path.equals(getConfigPath())) {
                 Properties seataConfigNew = new Properties();
                 if (StringUtils.isNotBlank(o.toString())) {
                     try {
@@ -325,17 +393,17 @@ public class ZookeeperConfiguration extends 
AbstractConfiguration {
                     }
                 }
 
-                for (Map.Entry<String, 
ConcurrentMap<ConfigurationChangeListener, ZKListener>> entry : 
CONFIG_LISTENERS_MAP.entrySet()) {
+                for (Map.Entry<String, 
ConcurrentMap<ConfigurationChangeListener, NodeCacheListenerImpl>> entry : 
CONFIG_LISTENERS_MAP.entrySet()) {
                     String listenedDataId = entry.getKey();
                     String propertyOld = 
seataConfig.getProperty(listenedDataId, "");
                     String propertyNew = 
seataConfigNew.getProperty(listenedDataId, "");
                     if (!propertyOld.equals(propertyNew)) {
                         ConfigurationChangeEvent event = new 
ConfigurationChangeEvent()
-                                .setDataId(listenedDataId)
-                                .setNewValue(propertyNew)
-                                .setChangeType(ConfigurationChangeType.MODIFY);
+                            .setDataId(listenedDataId)
+                            .setNewValue(propertyNew)
+                            .setChangeType(ConfigurationChangeType.MODIFY);
 
-                        ConcurrentMap<ConfigurationChangeListener, ZKListener> 
configListeners = entry.getValue();
+                        ConcurrentMap<ConfigurationChangeListener, 
NodeCacheListenerImpl> configListeners = entry.getValue();
                         for (ConfigurationChangeListener configListener : 
configListeners.keySet()) {
                             configListener.onProcessEvent(event);
                         }
@@ -344,42 +412,42 @@ public class ZookeeperConfiguration extends 
AbstractConfiguration {
                 seataConfig = seataConfigNew;
 
                 return;
+            } else {
+                if (type == Type.NODE_DELETED) {
+                    // Node is deleted.
+                    String dataId = path.replaceFirst(ROOT_PATH + 
ZK_PATH_SPLIT_CHAR, "");
+                    ConfigurationChangeEvent event = new 
ConfigurationChangeEvent().setDataId(dataId).setChangeType(
+                        ConfigurationChangeType.DELETE);
+                    listener.onProcessEvent(event);
+                } else {
+                    // Node is changed.
+                    String dataId = path.replaceFirst(ROOT_PATH + 
ZK_PATH_SPLIT_CHAR, "");
+                    ConfigurationChangeEvent event = new 
ConfigurationChangeEvent().setDataId(dataId).setNewValue(o.toString())
+                        .setChangeType(ConfigurationChangeType.MODIFY);
+                    listener.onProcessEvent(event);
+                }
             }
-            String dataId = s.replaceFirst(ROOT_PATH + ZK_PATH_SPLIT_CHAR, "");
-            ConfigurationChangeEvent event = new 
ConfigurationChangeEvent().setDataId(dataId).setNewValue(o.toString())
-                .setChangeType(ConfigurationChangeType.MODIFY);
-            listener.onProcessEvent(event);
-        }
-
-        @Override
-        public void handleDataDeleted(String s) {
-            String dataId = s.replaceFirst(ROOT_PATH + ZK_PATH_SPLIT_CHAR, "");
-            ConfigurationChangeEvent event = new 
ConfigurationChangeEvent().setDataId(dataId).setChangeType(
-                    ConfigurationChangeType.DELETE);
-            listener.onProcessEvent(event);
         }
     }
 
-    private ZkSerializer getZkSerializer() {
-        ZkSerializer zkSerializer = null;
-        String serializer = FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + 
SERIALIZER_KEY);
-        if (StringUtils.isNotBlank(serializer)) {
-            try {
-                Class<?> clazz = Class.forName(serializer);
-                Constructor<?> constructor = clazz.getDeclaredConstructor();
-                constructor.setAccessible(true);
-                zkSerializer = (ZkSerializer) constructor.newInstance();
-            } catch (ClassNotFoundException cfe) {
-                LOGGER.warn("No zk serializer class found, serializer:{}", 
serializer, cfe);
-            } catch (Throwable cause) {
-                LOGGER.warn("found zk serializer encountered an unknown 
exception", cause);
+    protected void addDataListener(String path, NodeCacheListenerImpl 
nodeCacheListener) {
+        try {
+            CuratorCache nodeCache = CuratorCache.build(zkClient, path);
+            if (nodeCacheMap.putIfAbsent(path, nodeCache) != null) {
+                return;
             }
+            nodeCache.listenable().addListener(nodeCacheListener);
+            nodeCache.start();
+        } catch (Exception e) {
+            throw new IllegalStateException("Add nodeCache listener for path:" 
+ path, e);
         }
-        if (zkSerializer == null) {
-            zkSerializer = new DefaultZkSerializer();
-            LOGGER.info("Use default zk serializer: 
org.apache.seata.config.zk.DefaultZkSerializer.");
-        }
-        return zkSerializer;
     }
 
+    protected void removeDataListener(String path, NodeCacheListenerImpl 
nodeCacheListener) {
+        CuratorCache nodeCache = nodeCacheMap.get(path);
+        if (nodeCache != null) {
+            nodeCache.listenable().removeListener(nodeCacheListener);
+        }
+        nodeCacheListener.listener = null;
+    }
 }
diff --git 
a/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java
 
b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java
new file mode 100644
index 0000000000..a5cc19442f
--- /dev/null
+++ 
b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.config.zk;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.curator.test.TestingServer;
+import org.apache.seata.config.ConfigurationChangeEvent;
+import org.apache.seata.config.ConfigurationChangeListener;
+import org.apache.seata.config.ConfigurationChangeType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The type zk configuration test
+ */
+public class ZkConfigurationTest {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ZkConfigurationTest.class);
+
+    protected static TestingServer server = null;
+
+    @BeforeAll
+    public static void adBeforeClass() throws Exception {
+        System.setProperty("config.type", "zk");
+        System.setProperty("config.zk.serverAddr", "127.0.0.1:2181");
+        server = new TestingServer(2181);
+        server.start();
+    }
+
+    @AfterAll
+    public static void adAfterClass() throws Exception {
+        if (server != null) {
+            server.stop();
+        }
+    }
+
+    @Test
+    public void testCheckExist() {
+        ZookeeperConfiguration zookeeperConfiguration = new 
ZookeeperConfiguration();
+        boolean exist = zookeeperConfiguration.checkExists("/");
+        Assertions.assertTrue(exist);
+    }
+
+    @Test
+    public void testPutConfig() {
+        ZookeeperConfiguration zookeeperConfiguration = new 
ZookeeperConfiguration();
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        final boolean[] listened = {false};
+        String dataId = "putMockDataId";
+        ConfigurationChangeListener changeListener = new 
ConfigurationChangeListener() {
+            @Override
+            public void onChangeEvent(ConfigurationChangeEvent event) {
+                LOGGER.info("onChangeEvent:{}", event);
+                if (event.getChangeType() == ConfigurationChangeType.MODIFY) {
+                    Assertions.assertEquals("value2", event.getNewValue());
+                    listened[0] = true;
+                    countDownLatch.countDown();
+                }
+            }
+        };
+        
zookeeperConfiguration.createPersistent(zookeeperConfiguration.buildPath(dataId),
 "value");
+        zookeeperConfiguration.addConfigListener(dataId, changeListener);
+        zookeeperConfiguration.putConfig(dataId, "value2");
+        try {
+            countDownLatch.await(10000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+
+        Assertions.assertTrue(listened[0]);
+
+        zookeeperConfiguration.removeConfig(dataId);
+
+        zookeeperConfiguration.removeConfigListener(dataId, changeListener);
+    }
+
+    @Test
+    public void testRemoveConfig() {
+        ZookeeperConfiguration zookeeperConfiguration = new 
ZookeeperConfiguration();
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        final boolean[] listened = {false};
+        String dataId = "removeMockDataId";
+        
zookeeperConfiguration.createPersistent(zookeeperConfiguration.buildPath(dataId),
 "value");
+        ConfigurationChangeListener changeListener = new 
ConfigurationChangeListener() {
+            @Override
+            public void onChangeEvent(ConfigurationChangeEvent event) {
+                LOGGER.info("onChangeEvent:{}", event);
+                if (event.getChangeType() == ConfigurationChangeType.DELETE) {
+                    Assertions.assertNull(event.getNewValue());
+                    listened[0] = true;
+                    countDownLatch.countDown();
+                }
+            }
+        };
+
+        zookeeperConfiguration.addConfigListener(dataId, changeListener);
+        zookeeperConfiguration.putConfig(dataId, "value2");
+        boolean remove = zookeeperConfiguration.removeConfig(dataId);
+        Assertions.assertTrue(remove);
+        try {
+            countDownLatch.await(10000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        Assertions.assertTrue(listened[0]);
+    }
+
+}
diff --git a/dependencies/pom.xml b/dependencies/pom.xml
index d00ac503b7..bf5a1a2084 100644
--- a/dependencies/pom.xml
+++ b/dependencies/pom.xml
@@ -50,7 +50,7 @@
         <aopalliance.version>1.0</aopalliance.version>
         <zkclient.version>0.11</zkclient.version>
         <apache-zookeeper.version>3.7.2</apache-zookeeper.version>
-        <curator-test.version>5.1.0</curator-test.version>
+        <curator.version>5.1.0</curator.version>
         <spring-context-support.version>1.0.2</spring-context-support.version>
         <apollo-client.version>2.0.1</apollo-client.version>
         <eureka-clients.version>1.10.18</eureka-clients.version>
@@ -354,10 +354,20 @@
                     </exclusion>
                 </exclusions>
             </dependency>
+            <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-recipes</artifactId>
+                <version>${curator.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-framework</artifactId>
+                <version>${curator.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-test</artifactId>
-                <version>${curator-test.version}</version>
+                <version>${curator.version}</version>
             </dependency>
             <dependency>
                 <groupId>com.alipay.sofa</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org


Reply via email to