This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 04f650db6 [ISSUE #4088] fix sync data by etcd io.vertx.core.VertxException (#4141)
04f650db6 is described below

commit 04f650db6c070979ba98f395c641dad4ca4a4079
Author: Misaya295 <[email protected]>
AuthorDate: Mon Oct 31 14:51:35 2022 +0800

    [ISSUE #4088] fix sync data by etcd io.vertx.core.VertxException (#4141)
    
    * fix #4088
    
    * fix sync data
    
    * fix TU
    
    * refactor code
    
    Co-authored-by: xiaoyu <[email protected]>
---
 .../apache/shenyu/sync/data/etcd/EtcdClient.java   | 71 +++++++++++++++++++---
 .../shenyu/sync/data/etcd/EtcdSyncDataService.java | 55 +++++++++++------
 .../sync/data/etcd/EtcdSyncDataServiceTest.java    |  1 -
 3 files changed, 99 insertions(+), 28 deletions(-)

diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java
index b334b9a93..596bbad4c 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java
@@ -25,11 +25,12 @@ import io.etcd.jetcd.options.GetOption;
 import io.etcd.jetcd.options.WatchOption;
 import io.etcd.jetcd.watch.WatchEvent;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.shenyu.common.exception.ShenyuException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -37,6 +38,8 @@ import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 /**
  * Etcd client of Bootstrap.
  */
@@ -71,7 +74,7 @@ public class EtcdClient {
     public String get(final String key) {
         List<KeyValue> keyValues = null;
         try {
-            keyValues = client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs();
+            keyValues = client.getKVClient().get(bytesOf(key)).get().getKvs();
         } catch (InterruptedException | ExecutionException e) {
             LOG.error(e.getMessage(), e);
         }
@@ -80,7 +83,37 @@ public class EtcdClient {
             return null;
         }
 
-        return keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
+        return keyValues.iterator().next().getValue().toString(UTF_8);
+    }
+
+    /**
+     * get keys by prefix.
+     *
+     * @param prefix key prefix.
+     * @return key valuesMap.
+     */
+    public Map<String, String> getKeysMapByPrefix(final String prefix) {
+        GetOption getOption = GetOption.newBuilder()
+                .isPrefix(true)
+                .build();
+        try {
+            return this.client.getKVClient().get(bytesOf(prefix), getOption)
+                    .get().getKvs().stream()
+                    .collect(Collectors.toMap(e -> e.getKey().toString(UTF_8), e -> e.getValue().toString(UTF_8)));
+        } catch (ExecutionException | InterruptedException e) {
+            LOG.error("etcd getKeysMapByPrefix key {} error {}", prefix, e);
+            throw new ShenyuException(e);
+        }
+
+    }
+
+    /**
+     * bytesOf string.
+     * @param val val.
+     * @return bytes val.
+     */
+    public ByteSequence bytesOf(final String val) {
+        return ByteSequence.from(val, UTF_8);
     }
 
     /**
@@ -93,7 +126,7 @@ public class EtcdClient {
      * @throws InterruptedException the exception
      */
     public List<String> getChildrenKeys(final String prefix, final String separator) throws ExecutionException, InterruptedException {
-        ByteSequence prefixByteSequence = ByteSequence.from(prefix, StandardCharsets.UTF_8);
+        ByteSequence prefixByteSequence = bytesOf(prefix);
         GetOption getOption = GetOption.newBuilder()
                 .withPrefix(prefixByteSequence)
                 .withSortField(GetOption.SortTarget.KEY)
@@ -106,7 +139,25 @@ public class EtcdClient {
                 .getKvs();
 
         return keyValues.stream()
-                .map(e -> getSubNodeKeyName(prefix, e.getKey().toString(StandardCharsets.UTF_8), separator))
+                .map(e -> getSubNodeKeyName(prefix, e.getKey().toString(UTF_8), separator))
+                .distinct()
+                .filter(e -> Objects.nonNull(e))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * get keyPrefix map.
+     *
+     * @param prefix    key prefix.
+     * @param separator separator char
+     * @param map prefix map
+     * @return sub map
+     */
+    public List<String> getChildrenKeysByMap(final String prefix, final String separator, final Map<String, String> map) {
+
+        return map.entrySet().stream()
+                .filter(e -> e.getKey().contains(prefix))
+                .map(e -> getSubNodeKeyName(prefix, e.getKey(), separator))
                 .distinct()
                 .filter(e -> Objects.nonNull(e))
                 .collect(Collectors.toList());
@@ -131,7 +182,7 @@ public class EtcdClient {
                                 final BiConsumer<String, String> updateHandler,
                                 final Consumer<String> deleteHandler) {
         Watch.Listener listener = watch(updateHandler, deleteHandler);
-        Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), listener);
+        Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, UTF_8), listener);
         watchCache.put(key, watch);
     }
 
@@ -147,9 +198,9 @@ public class EtcdClient {
                                  final Consumer<String> deleteHandler) {
         Watch.Listener listener = watch(updateHandler, deleteHandler);
         WatchOption option = WatchOption.newBuilder()
-                .withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8))
+                .withPrefix(ByteSequence.from(key, UTF_8))
                 .build();
-        Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), option, listener);
+        Watch.Watcher watch = client.getWatchClient().watch(ByteSequence.from(key, UTF_8), option, listener);
         watchCache.put(key, watch);
     }
 
@@ -157,8 +208,8 @@ public class EtcdClient {
                                  final Consumer<String> deleteHandler) {
         return Watch.listener(response -> {
             for (WatchEvent event : response.getEvents()) {
-                String path = event.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
-                String value = event.getKeyValue().getValue().toString(StandardCharsets.UTF_8);
+                String path = event.getKeyValue().getKey().toString(UTF_8);
+                String value = event.getKeyValue().getValue().toString(UTF_8);
                 switch (event.getEventType()) {
                     case PUT:
                         updateHandler.accept(path, value);
diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
index cfb6c5409..889e2cbfe 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
@@ -38,10 +38,13 @@ import org.slf4j.LoggerFactory;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
-import java.util.Collections;
+
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 
 /**
@@ -54,6 +57,8 @@ public class EtcdSyncDataService implements SyncDataService {
      */
     private static final Logger LOG = LoggerFactory.getLogger(EtcdSyncDataService.class);
 
+    private static final String PRE_FIX = "/shenyu";
+
     private final EtcdClient etcdClient;
 
     private final PluginDataSubscriber pluginDataSubscriber;
@@ -62,6 +67,8 @@ public class EtcdSyncDataService implements SyncDataService {
 
     private final List<AuthDataSubscriber> authDataSubscribers;
 
+    private Map<String, String> keysMap = new ConcurrentHashMap<>();
+
     /**
      * Instantiates a new Zookeeper cache manager.
      *
@@ -78,18 +85,28 @@ public class EtcdSyncDataService implements SyncDataService {
         this.pluginDataSubscriber = pluginDataSubscriber;
         this.metaDataSubscribers = metaDataSubscribers;
         this.authDataSubscribers = authDataSubscribers;
+        watchAllKeys();
         watcherData();
         watchAppAuth();
         watchMetaData();
     }
 
+    private void watchAllKeys() {
+        keysMap = etcdClient.getKeysMapByPrefix(PRE_FIX);
+        etcdClient.watchDataChange(PRE_FIX, (updateKey, updateValue) -> {
+            keysMap.put(updateKey, updateValue);
+        }, deleteKey -> {
+                keysMap.remove(deleteKey);
+            });
+
+    }
+
     private void watcherData() {
         final String pluginParent = DefaultPathConstants.PLUGIN_PARENT;
-        List<String> pluginChildren = etcdClientGetChildren(pluginParent);
+        List<String> pluginChildren = etcdClientGetChildrenByMap(pluginParent, keysMap);
         for (String pluginName : pluginChildren) {
             watcherAll(pluginName);
         }
-
         etcdClient.watchChildChange(pluginParent, (updateNode, updateValue) -> {
             if (!updateNode.isEmpty()) {
                 watcherAll(updateNode);
@@ -105,17 +122,17 @@ public class EtcdSyncDataService implements SyncDataService {
 
     private void watcherPlugin(final String pluginName) {
         String pluginPath = DefaultPathConstants.buildPluginPath(pluginName);
-        cachePluginData(etcdClient.get(pluginPath));
+        cachePluginData(keysMap.get(pluginPath));
         subscribePluginDataChanges(pluginPath, pluginName);
     }
 
     private void watcherSelector(final String pluginName) {
         String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(pluginName);
-        List<String> childrenList = etcdClientGetChildren(selectorParentPath);
+        List<String> childrenList = etcdClientGetChildrenByMap(selectorParentPath, keysMap);
         if (CollectionUtils.isNotEmpty(childrenList)) {
             childrenList.forEach(children -> {
                 String realPath = buildRealPath(selectorParentPath, children);
-                cacheSelectorData(etcdClient.get(realPath));
+                cacheSelectorData(keysMap.get(realPath));
                 subscribeSelectorDataChanges(realPath);
             });
         }
@@ -124,11 +141,11 @@ public class EtcdSyncDataService implements SyncDataService {
 
     private void watcherRule(final String pluginName) {
         String ruleParent = DefaultPathConstants.buildRuleParentPath(pluginName);
-        List<String> childrenList = etcdClientGetChildren(ruleParent);
+        List<String> childrenList = etcdClientGetChildrenByMap(ruleParent, keysMap);
         if (CollectionUtils.isNotEmpty(childrenList)) {
             childrenList.forEach(children -> {
                 String realPath = buildRealPath(ruleParent, children);
-                cacheRuleData(etcdClient.get(realPath));
+                cacheRuleData(keysMap.get(realPath));
                 subscribeRuleDataChanges(realPath);
             });
         }
@@ -137,11 +154,11 @@ public class EtcdSyncDataService implements SyncDataService {
 
     private void watchAppAuth() {
         final String appAuthParent = DefaultPathConstants.APP_AUTH_PARENT;
-        List<String> childrenList = etcdClientGetChildren(appAuthParent);
+        List<String> childrenList = etcdClientGetChildrenByMap(appAuthParent, keysMap);
         if (CollectionUtils.isNotEmpty(childrenList)) {
             childrenList.forEach(children -> {
                 String realPath = buildRealPath(appAuthParent, children);
-                cacheAuthData(etcdClient.get(realPath));
+                cacheAuthData(keysMap.get(realPath));
                 subscribeAppAuthDataChanges(realPath);
             });
         }
@@ -150,11 +167,11 @@ public class EtcdSyncDataService implements SyncDataService {
 
     private void watchMetaData() {
         final String metaDataPath = DefaultPathConstants.META_DATA;
-        List<String> childrenList = etcdClientGetChildren(metaDataPath);
+        List<String> childrenList = etcdClientGetChildrenByMap(metaDataPath, keysMap);
         if (CollectionUtils.isNotEmpty(childrenList)) {
             childrenList.forEach(children -> {
                 String realPath = buildRealPath(metaDataPath, children);
-                cacheMetaData(etcdClient.get(realPath));
+                cacheMetaData(keysMap.get(realPath));
                 subscribeMetaDataChanges(realPath);
             });
         }
@@ -165,25 +182,25 @@ public class EtcdSyncDataService implements SyncDataService {
         switch (groupKey) {
             case SELECTOR:
                 etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> {
-                    cacheSelectorData(etcdClient.get(updatePath));
+                    cacheSelectorData(keysMap.get(updatePath));
                     subscribeSelectorDataChanges(updatePath);
                 }, null);
                 break;
             case RULE:
                 etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> {
-                    cacheRuleData(etcdClient.get(updatePath));
+                    cacheRuleData(keysMap.get(updatePath));
                     subscribeRuleDataChanges(updatePath);
                 }, null);
                 break;
             case APP_AUTH:
                 etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> {
-                    cacheAuthData(etcdClient.get(updatePath));
+                    cacheAuthData(keysMap.get(updatePath));
                     subscribeAppAuthDataChanges(updatePath);
                 }, null);
                 break;
             case META_DATA:
                 etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> {
-                    cacheMetaData(etcdClient.get(updatePath));
+                    cacheMetaData(keysMap.get(updatePath));
                     subscribeMetaDataChanges(updatePath);
                 }, null);
                 break;
@@ -195,7 +212,7 @@ public class EtcdSyncDataService implements SyncDataService {
     private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {
         etcdClient.watchDataChange(pluginPath, (updatePath, updateValue) -> {
             final String dataPath = buildRealPath(pluginPath, updatePath);
-            final String dataStr = etcdClient.get(dataPath);
+            final String dataStr = keysMap.get(dataPath);
             final PluginData data = GsonUtils.getInstance().fromJson(dataStr, PluginData.class);
             Optional.ofNullable(data)
                     .ifPresent(d -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSubscribe(d)));
@@ -323,6 +340,10 @@ public class EtcdSyncDataService implements SyncDataService {
         return Collections.emptyList();
     }
 
+    private List<String> etcdClientGetChildrenByMap(final String parent, final Map<String, String> map) {
+        return etcdClient.getChildrenKeysByMap(parent, "/", map);
+    }
+
     @Override
     public void close() {
         if (Objects.nonNull(etcdClient)) {
diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
index 3d5824cf4..e1cd37c0a 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
@@ -101,7 +101,6 @@ public class EtcdSyncDataServiceTest {
          *  mock get method.
          */
         when(client.getKVClient()).thenReturn(kv);
-        when(kv.get(any())).thenReturn(future);
         try {
             when(future.get()).thenReturn(getResponse);
         } catch (Exception e) {

Reply via email to