This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a290d275 etcd sync discoveryData (#4800)
8a290d275 is described below

commit 8a290d2754e9956e540d5f280e5eda4037fefc03
Author: Misaya295 <[email protected]>
AuthorDate: Wed Jul 5 16:56:04 2023 +0800

    etcd sync discoveryData (#4800)
---
 .../admin/listener/etcd/EtcdDataChangedInit.java   |  6 +-
 .../sync/data/etcd/EtcdSyncDataConfiguration.java  | 16 ++++-
 .../shenyu/sync/data/etcd/EtcdSyncDataService.java | 84 +++++++++++++++++++---
 .../sync/data/etcd/EtcdSyncDataServiceTest.java    | 12 ++--
 4 files changed, 100 insertions(+), 18 deletions(-)

diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/etcd/EtcdDataChangedInit.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/etcd/EtcdDataChangedInit.java
index 7110cd38d..81cde311b 100644
--- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/etcd/EtcdDataChangedInit.java
+++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/etcd/EtcdDataChangedInit.java
@@ -42,6 +42,10 @@ public class EtcdDataChangedInit extends AbstractDataChangedInit {
 
     @Override
     protected boolean notExist() {
-        return Stream.of(DefaultPathConstants.PLUGIN_PARENT, 
DefaultPathConstants.APP_AUTH_PARENT, 
DefaultPathConstants.META_DATA).noneMatch(etcdClient::exists);
+        return Stream.of(DefaultPathConstants.PLUGIN_PARENT,
+                DefaultPathConstants.APP_AUTH_PARENT,
+                DefaultPathConstants.META_DATA,
+                DefaultPathConstants.PROXY_SELECTOR,
+                
DefaultPathConstants.DISCOVERY_UPSTREAM).noneMatch(etcdClient::exists);
     }
 }
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-etcd/src/main/java/org/apache/shenyu/springboot/sync/data/etcd/EtcdSyncDataConfiguration.java b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-etcd/src/main/java/org/apache/shenyu/springboot/sync/data/etcd/EtcdSyncDataConfiguration.java
index 5c1a64b79..2698cc78c 100644
--- a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-etcd/src/main/java/org/apache/shenyu/springboot/sync/data/etcd/EtcdSyncDataConfiguration.java
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-etcd/src/main/java/org/apache/shenyu/springboot/sync/data/etcd/EtcdSyncDataConfiguration.java
@@ -22,6 +22,8 @@ import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
 import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
 import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
 import org.apache.shenyu.sync.data.api.SyncDataService;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
 import org.apache.shenyu.sync.data.etcd.EtcdClient;
 import org.apache.shenyu.sync.data.etcd.EtcdSyncDataService;
 import org.slf4j.Logger;
@@ -54,16 +56,24 @@ public class EtcdSyncDataConfiguration {
      * @param pluginSubscriber the plugin subscriber
      * @param metaSubscribers the meta subscribers
      * @param authSubscribers the auth subscribers
+     * @param proxySelectorDataSubscribers the proxy selector data subscribers
+     * @param discoveryUpstreamDataSubscribers the discovery upstream data 
subscribers
      * @return the sync data service
      */
     @Bean
     public SyncDataService syncDataService(final ObjectProvider<EtcdClient> 
etcdClients,
                                            final 
ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                            final 
ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
-                                           final 
ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
+                                           final 
ObjectProvider<List<AuthDataSubscriber>> authSubscribers,
+                                           final 
ObjectProvider<List<ProxySelectorDataSubscriber>> proxySelectorDataSubscribers,
+                                           final 
ObjectProvider<List<DiscoveryUpstreamDataSubscriber>> 
discoveryUpstreamDataSubscribers) {
         LOGGER.info("you use etcd sync shenyu data.......");
-        return new EtcdSyncDataService(etcdClients.getIfAvailable(), 
pluginSubscriber.getIfAvailable(),
-                metaSubscribers.getIfAvailable(Collections::emptyList), 
authSubscribers.getIfAvailable(Collections::emptyList));
+        return new EtcdSyncDataService(etcdClients.getIfAvailable(),
+                pluginSubscriber.getIfAvailable(),
+                metaSubscribers.getIfAvailable(Collections::emptyList),
+                authSubscribers.getIfAvailable(Collections::emptyList),
+                
proxySelectorDataSubscribers.getIfAvailable(Collections::emptyList),
+                
discoveryUpstreamDataSubscribers.getIfAvailable(Collections::emptyList));
     }
 
     /**
diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
index 889e2cbfe..6406834e9 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
@@ -25,13 +25,17 @@ import org.apache.shenyu.common.dto.AppAuthData;
 import org.apache.shenyu.common.dto.MetaData;
 import org.apache.shenyu.common.dto.PluginData;
 import org.apache.shenyu.common.dto.RuleData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
 import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
 import org.apache.shenyu.common.enums.ConfigGroupEnum;
 import org.apache.shenyu.common.utils.GsonUtils;
 import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
 import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
 import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
 import org.apache.shenyu.sync.data.api.SyncDataService;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,24 +71,34 @@ public class EtcdSyncDataService implements SyncDataService {
 
     private final List<AuthDataSubscriber> authDataSubscribers;
 
+    private final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers;
+
+    private final List<DiscoveryUpstreamDataSubscriber> 
discoveryUpstreamDataSubscribers;
+
     private Map<String, String> keysMap = new ConcurrentHashMap<>();
 
     /**
      * Instantiates a new Zookeeper cache manager.
      *
-     * @param etcdClient           the etcd client
-     * @param pluginDataSubscriber the plugin data subscriber
-     * @param metaDataSubscribers  the meta data subscribers
-     * @param authDataSubscribers  the auth data subscribers
+     * @param etcdClient                       the etcd client
+     * @param pluginDataSubscriber             the plugin data subscriber
+     * @param metaDataSubscribers              the meta data subscribers
+     * @param authDataSubscribers              the auth data subscribers
+     * @param proxySelectorDataSubscribers     the proxy selector data 
subscribers
+     * @param discoveryUpstreamDataSubscribers the discovery upstream data 
subscribers
      */
     public EtcdSyncDataService(final EtcdClient etcdClient,
                                final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> 
metaDataSubscribers,
-                               final List<AuthDataSubscriber> 
authDataSubscribers) {
+                               final List<AuthDataSubscriber> 
authDataSubscribers,
+                               final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers,
+                               final List<DiscoveryUpstreamDataSubscriber> 
discoveryUpstreamDataSubscribers) {
         this.etcdClient = etcdClient;
         this.pluginDataSubscriber = pluginDataSubscriber;
         this.metaDataSubscribers = metaDataSubscribers;
         this.authDataSubscribers = authDataSubscribers;
+        this.proxySelectorDataSubscribers = proxySelectorDataSubscribers;
+        this.discoveryUpstreamDataSubscribers = 
discoveryUpstreamDataSubscribers;
         watchAllKeys();
         watcherData();
         watchAppAuth();
@@ -95,9 +109,7 @@ public class EtcdSyncDataService implements SyncDataService {
         keysMap = etcdClient.getKeysMapByPrefix(PRE_FIX);
         etcdClient.watchDataChange(PRE_FIX, (updateKey, updateValue) -> {
             keysMap.put(updateKey, updateValue);
-        }, deleteKey -> {
-                keysMap.remove(deleteKey);
-            });
+        }, deleteKey -> keysMap.remove(deleteKey));
 
     }
 
@@ -204,6 +216,18 @@ public class EtcdSyncDataService implements SyncDataService {
                     subscribeMetaDataChanges(updatePath);
                 }, null);
                 break;
+            case DISCOVER_UPSTREAM:
+                etcdClient.watchChildChange(groupParentPath, (updatePath, 
updateValue) -> {
+                    cacheDiscoveryUpstreamData(keysMap.get(updatePath));
+                    subscribeDiscoveryUpstreamDataChanges(updatePath);
+                }, null);
+                break;
+            case PROXY_SELECTOR:
+                etcdClient.watchChildChange(groupParentPath, (updatePath, 
updateValue) -> {
+                    cacheProxySelectorData(keysMap.get(updatePath));
+                    subscribeProxySelectorDataChanges(updatePath);
+                }, null);
+                break;
             default:
                 throw new IllegalStateException("Unexpected groupKey: " + 
groupKey);
         }
@@ -327,6 +351,50 @@ public class EtcdSyncDataService implements SyncDataService {
                 .ifPresent(data -> metaDataSubscribers.forEach(e -> 
e.unSubscribe(metaData)));
     }
 
+    private void cacheDiscoveryUpstreamData(final String dataString) {
+        final DiscoverySyncData discoveryUpstream = 
GsonUtils.getInstance().fromJson(dataString, DiscoverySyncData.class);
+        Optional.ofNullable(discoveryUpstream)
+                .ifPresent(data -> discoveryUpstreamDataSubscribers.forEach(e 
-> e.onSubscribe(data)));
+    }
+
+    private void unCacheDiscoveryUpstreamData(final String dataPath) {
+        DiscoverySyncData discoverySyncData = new DiscoverySyncData();
+        final String selectorId = dataPath.substring(dataPath.lastIndexOf("/") 
+ 1);
+        final String str = 
dataPath.substring(DefaultPathConstants.DISCOVERY_UPSTREAM.length());
+        final String pluginName = str.substring(1, str.length() - 
selectorId.length() - 1);
+        discoverySyncData.setPluginName(pluginName);
+        discoverySyncData.setSelectorId(selectorId);
+        discoveryUpstreamDataSubscribers.forEach(e -> 
e.unSubscribe(discoverySyncData));
+        etcdClient.watchClose(dataPath);
+    }
+
+    private void subscribeDiscoveryUpstreamDataChanges(final String realPath) {
+        etcdClient.watchDataChange(realPath, (updatePath, updateValue) -> 
cacheDiscoveryUpstreamData(updateValue),
+                this::unCacheDiscoveryUpstreamData);
+    }
+
+    private void cacheProxySelectorData(final String dataString) {
+        final ProxySelectorData proxySelectorData = 
GsonUtils.getInstance().fromJson(dataString, ProxySelectorData.class);
+        Optional.ofNullable(proxySelectorData)
+                .ifPresent(data -> proxySelectorDataSubscribers.forEach(e -> 
e.onSubscribe(data)));
+    }
+
+    private void unCacheProxySelectorData(final String dataPath) {
+        ProxySelectorData proxySelectorData = new ProxySelectorData();
+        final String selectorId = dataPath.substring(dataPath.lastIndexOf("/") 
+ 1);
+        final String str = 
dataPath.substring(DefaultPathConstants.PROXY_SELECTOR.length());
+        final String pluginName = str.substring(1, str.length() - 
selectorId.length() - 1);
+        proxySelectorData.setPluginName(pluginName);
+        proxySelectorData.setId(selectorId);
+        proxySelectorDataSubscribers.forEach(e -> 
e.unSubscribe(proxySelectorData));
+        etcdClient.watchClose(dataPath);
+    }
+
+    private void subscribeProxySelectorDataChanges(final String realPath) {
+        etcdClient.watchDataChange(realPath, (updatePath, updateValue) -> 
cacheProxySelectorData(updateValue),
+                this::unCacheProxySelectorData);
+    }
+
     private String buildRealPath(final String parent, final String children) {
         return String.join("/", parent, children);
     }
diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
index e1cd37c0a..8e36b6ff3 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
@@ -133,7 +133,7 @@ public class EtcdSyncDataServiceTest {
             public void onSubscribe(final PluginData pluginData) {
                 subscribeList.add(pluginData);
             }
-        }, Collections.emptyList(), Collections.emptyList());
+        }, Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList());
         assertThat(subscribeList.size(), is(1));
         assertThat(subscribeList.get(0).getName(), is("divide"));
     }
@@ -154,7 +154,7 @@ public class EtcdSyncDataServiceTest {
                         .findFirst().orElse(null);
                 subscribeList.remove(pluginDataDel);
             }
-        }, Collections.emptyList(), Collections.emptyList());
+        }, Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList());
         subscribeList.clear();
         final Method deletePlugin = 
EtcdSyncDataService.class.getDeclaredMethod("deletePlugin", String.class);
         final Method cachePluginData = 
EtcdSyncDataService.class.getDeclaredMethod("cachePluginData", String.class);
@@ -185,7 +185,7 @@ public class EtcdSyncDataServiceTest {
                                 .findFirst().orElse(null);
                         subscribeList.remove(metaDataDel);
                     }
-                }), Collections.emptyList());
+                }), Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList());
         subscribeList.clear();
         final Method cacheMetaData = 
EtcdSyncDataService.class.getDeclaredMethod("cacheMetaData", String.class);
         final Method deleteMetaData = 
EtcdSyncDataService.class.getDeclaredMethod("unCacheMetaData", MetaData.class);
@@ -216,7 +216,7 @@ public class EtcdSyncDataServiceTest {
                                 .findFirst().orElse(null);
                         subscribeList.remove(appAuthDataOld);
                     }
-                }));
+                }), Collections.emptyList(), Collections.emptyList());
         subscribeList.clear();
         final Method cacheAuthData = 
EtcdSyncDataService.class.getDeclaredMethod("cacheAuthData", String.class);
         final Method unCacheAuthData = 
EtcdSyncDataService.class.getDeclaredMethod("unCacheAuthData", String.class);
@@ -233,7 +233,7 @@ public class EtcdSyncDataServiceTest {
     @Test
     public void closeTest() {
         etcdSyncDataService = new EtcdSyncDataService(etcdClient, 
mock(PluginDataSubscriber.class), Collections.emptyList(),
-                Collections.emptyList());
+                Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList());
         assertDoesNotThrow(() -> etcdSyncDataService.close());
     }
 
@@ -271,7 +271,7 @@ public class EtcdSyncDataServiceTest {
         etcdSyncDataService = new EtcdSyncDataService(etcdClient,
                 mock(PluginDataSubscriber.class),
                 Collections.emptyList(),
-                Collections.emptyList());
+                Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList());
         final Field etcdClient = 
EtcdSyncDataService.class.getDeclaredField("etcdClient");
         etcdClient.setAccessible(true);
         etcdClient.set(etcdSyncDataService, mockEtcdClient);

Reply via email to