This is an automated email from the ASF dual-hosted git repository.

hefengen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cd6444f5 feature: sync consul (#4802)
1cd6444f5 is described below

commit 1cd6444f5ab9efe1b803d56055cb61582a8af42d
Author: zhengpeng <[email protected]>
AuthorDate: Fri Jul 7 14:19:13 2023 +0800

    feature: sync consul (#4802)
    
    Co-authored-by: xiaoyu <[email protected]>
    Co-authored-by: moremind <[email protected]>
---
 .../data/consul/ConsulSyncDataConfiguration.java   | 11 ++++--
 .../sync/data/consul/ConsulSyncDataService.java    | 10 ++++--
 .../data/consul/handler/ConsulCacheHandler.java    | 39 +++++++++++++++++++++-
 .../data/consul/ConsulSyncDataServiceTest.java     |  2 +-
 .../consul/handler/ConsulCacheHandlerTest.java     | 12 +++----
 5 files changed, 62 insertions(+), 12 deletions(-)

diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-consul/src/main/java/org/apache/shenyu/springboot/sync/data/consul/ConsulSyncDataConfiguration.java b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-consul/src/main/java/org/apache/shenyu/springboot/sync/data/consul/ConsulSyncDataConfiguration.java
index 7531e522f..290ff6d3a 100644
--- a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-consul/src/main/java/org/apache/shenyu/springboot/sync/data/consul/ConsulSyncDataConfiguration.java
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-consul/src/main/java/org/apache/shenyu/springboot/sync/data/consul/ConsulSyncDataConfiguration.java
@@ -19,8 +19,10 @@ package org.apache.shenyu.springboot.sync.data.consul;
 
 import com.ecwid.consul.v1.ConsulClient;
 import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
 import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
 import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
 import org.apache.shenyu.sync.data.api.SyncDataService;
 import org.apache.shenyu.sync.data.consul.ConsulSyncDataService;
 import org.apache.shenyu.sync.data.consul.config.ConsulConfig;
@@ -54,6 +56,8 @@ public class ConsulSyncDataConfiguration {
      * @param pluginSubscriber the plugin subscriber
      * @param metaSubscribers   the meta subscribers
      * @param authSubscribers   the auth subscribers
+     * @param proxySelectorSubscribers   the proxySelectorSubscribers
+     * @param discoveryUpstreamSubscribers   the discoveryUpstreamSubscribers
      * @return the sync data service
      */
     @Bean
@@ -61,10 +65,13 @@ public class ConsulSyncDataConfiguration {
                                            final ObjectProvider<ConsulConfig> 
consulConfig,
                                            final 
ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                            final 
ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
-                                           final 
ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
+                                           final 
ObjectProvider<List<AuthDataSubscriber>> authSubscribers,
+                                           final 
ObjectProvider<List<ProxySelectorDataSubscriber>> proxySelectorSubscribers,
+                                           final 
ObjectProvider<List<DiscoveryUpstreamDataSubscriber>> 
discoveryUpstreamSubscribers) {
         LOGGER.info("you use consul sync shenyu data.......");
         return new ConsulSyncDataService(consulClient.getIfAvailable(), 
consulConfig.getIfAvailable(), pluginSubscriber.getIfAvailable(),
-                metaSubscribers.getIfAvailable(Collections::emptyList), 
authSubscribers.getIfAvailable(Collections::emptyList));
+                metaSubscribers.getIfAvailable(Collections::emptyList), 
authSubscribers.getIfAvailable(Collections::emptyList),
+                
proxySelectorSubscribers.getIfAvailable(Collections::emptyList), 
discoveryUpstreamSubscribers.getIfAvailable(Collections::emptyList));
     }
 
     /**
diff --git a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
index be390ac82..57b1e547c 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
@@ -24,8 +24,10 @@ import com.ecwid.consul.v1.kv.model.GetValue;
 import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
 import org.apache.shenyu.common.constant.ConsulConstants;
 import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
 import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
 import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
 import org.apache.shenyu.sync.data.api.SyncDataService;
 import org.apache.shenyu.sync.data.consul.config.ConsulConfig;
 import org.apache.shenyu.sync.data.consul.handler.ConsulCacheHandler;
@@ -77,8 +79,10 @@ public class ConsulSyncDataService extends ConsulCacheHandler implements SyncDat
                                  final ConsulConfig consulConfig,
                                  final PluginDataSubscriber 
pluginDataSubscriber,
                                  final List<MetaDataSubscriber> 
metaDataSubscribers,
-                                 final List<AuthDataSubscriber> 
authDataSubscribers) {
-        super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
+                                 final List<AuthDataSubscriber> 
authDataSubscribers,
+                                 final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers,
+                                 final List<DiscoveryUpstreamDataSubscriber> 
discoveryUpstreamDataSubscribers) {
+        super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers, 
proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers);
         this.consulClient = consulClient;
         this.consulConfig = consulConfig;
         this.executor = new ScheduledThreadPoolExecutor(1,
@@ -97,6 +101,8 @@ public class ConsulSyncDataService extends ConsulCacheHandler implements SyncDat
         groupMap.put(ConsulConstants.RULE_DATA, this::updateRuleMap);
         groupMap.put(ConsulConstants.META_DATA, this::updateMetaDataMap);
         groupMap.put(ConsulConstants.AUTH_DATA, this::updateAuthMap);
+        groupMap.put(ConsulConstants.PROXY_SELECTOR_DATA_ID, 
this::updateSelectorDataMap);
+        groupMap.put(ConsulConstants.DISCOVERY_UPSTREAM, 
this::updateDiscoveryUpstreamMap);
     }
 
     private void watchConfigKeyValues() {
diff --git a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandler.java b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandler.java
index d593448fe..296da3ff0 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandler.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandler.java
@@ -19,14 +19,18 @@ package org.apache.shenyu.sync.data.consul.handler;
 
 import com.google.gson.JsonParseException;
 import org.apache.shenyu.common.dto.AppAuthData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
 import org.apache.shenyu.common.dto.MetaData;
 import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
 import org.apache.shenyu.common.dto.RuleData;
 import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.utils.GsonUtils;
 import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
 import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
 import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,12 +55,21 @@ public class ConsulCacheHandler {
 
     private final List<AuthDataSubscriber> authDataSubscribers;
 
+    private final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers;
+
+    private final List<DiscoveryUpstreamDataSubscriber> 
discoveryUpstreamDataSubscribers;
+
     public ConsulCacheHandler(final PluginDataSubscriber pluginDataSubscriber,
                               final List<MetaDataSubscriber> 
metaDataSubscribers,
-                              final List<AuthDataSubscriber> 
authDataSubscribers) {
+                              final List<AuthDataSubscriber> 
authDataSubscribers,
+                              final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers,
+                              final List<DiscoveryUpstreamDataSubscriber> 
discoveryUpstreamDataSubscribers
+    ) {
         this.pluginDataSubscriber = pluginDataSubscriber;
         this.metaDataSubscribers = metaDataSubscribers;
         this.authDataSubscribers = authDataSubscribers;
+        this.proxySelectorDataSubscribers = proxySelectorDataSubscribers;
+        this.discoveryUpstreamDataSubscribers = 
discoveryUpstreamDataSubscribers;
     }
 
     protected void updatePluginData(final String configInfo) {
@@ -121,6 +134,30 @@ public class ConsulCacheHandler {
         }
     }
 
+    protected void updateSelectorDataMap(final String configInfo) {
+        try {
+            List<ProxySelectorData> proxySelectorDataList = new 
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, 
ProxySelectorData.class).values());
+            proxySelectorDataList.forEach(proxySelectorData -> 
proxySelectorDataSubscribers.forEach(subscriber -> {
+                subscriber.onSubscribe(proxySelectorData);
+                subscriber.unSubscribe(proxySelectorData);
+            }));
+        } catch (JsonParseException e) {
+            LOG.error("sync proxy selector data have error:", e);
+        }
+    }
+
+    protected void updateDiscoveryUpstreamMap(final String configInfo) {
+        try {
+            List<DiscoverySyncData> discoverySyncDataList = new 
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, 
DiscoverySyncData.class).values());
+            discoverySyncDataList.forEach(discoverySyncData -> 
discoveryUpstreamDataSubscribers.forEach(subscriber -> {
+                subscriber.onSubscribe(discoverySyncData);
+                subscriber.unSubscribe(discoverySyncData);
+            }));
+        } catch (JsonParseException e) {
+            LOG.error("sync discovery data have error:", e);
+        }
+    }
+
     protected interface OnChange {
 
         void change(String changeData);
diff --git a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java b/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java
index 37e87a163..89c76c716 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java
@@ -89,7 +89,7 @@ public final class ConsulSyncDataServiceTest {
         when(response.getValue()).thenReturn(list);
         when(response.getConsulIndex()).thenReturn(INDEX);
         consulSyncDataService = new ConsulSyncDataService(consulClient, 
consulConfig, null,
-                Collections.emptyList(), Collections.emptyList());
+                Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList());
     }
 
     @Test
diff --git a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandlerTest.java b/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandlerTest.java
index 024e142c2..c1b5e4030 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandlerTest.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandlerTest.java
@@ -66,7 +66,7 @@ public final class ConsulCacheHandlerTest {
             public void unSubscribe(final PluginData pluginData) {
                 unsubscribeList.add(pluginData);
             }
-        }, Collections.emptyList(), Collections.emptyList());
+        }, Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList());
         consulCacheHandler.updatePluginData(pluginData);
         assertEquals(2, onSubscribeList.size());
         assertEquals(2, unsubscribeList.size());
@@ -104,7 +104,7 @@ public final class ConsulCacheHandlerTest {
             public void unSelectorSubscribe(final SelectorData selectorData) {
                 unsubscribeList.add(selectorData);
             }
-        }, Collections.emptyList(), Collections.emptyList());
+        }, Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList());
         consulCacheHandler.updateSelectorMap(selectorDataParam);
         assertEquals(2, subscribeList.size());
         assertEquals(2, unsubscribeList.size());
@@ -137,7 +137,7 @@ public final class ConsulCacheHandlerTest {
             public void unRuleSubscribe(final RuleData ruleData) {
                 unsubscribeList.add(ruleData);
             }
-        }, Collections.emptyList(), Collections.emptyList());
+        }, Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList());
         consulCacheHandler.updateRuleMap(ruleDataParam);
         assertEquals(2, subscribeList.size());
         assertEquals(2, unsubscribeList.size());
@@ -166,7 +166,7 @@ public final class ConsulCacheHandlerTest {
             }
         };
         ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(null, 
Lists.newArrayList(metaDataSubscriber),
-                Collections.emptyList());
+                Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList());
         consulCacheHandler.updateMetaDataMap(metaDataParam);
         assertEquals(2, subscribeList.size());
         assertEquals(2, unsubscribeList.size());
@@ -199,7 +199,7 @@ public final class ConsulCacheHandlerTest {
             }
         };
         ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(null,
-                Collections.emptyList(), 
Lists.newArrayList(authDataSubscriber));
+                Collections.emptyList(), 
Lists.newArrayList(authDataSubscriber), Collections.emptyList(), 
Collections.emptyList());
 
         consulCacheHandler.updateAuthMap(appAuthDataParam);
         assertEquals(2, subscribeList.size());
@@ -209,7 +209,7 @@ public final class ConsulCacheHandlerTest {
     @Test
     public void testError() {
         ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(null,
-                Collections.emptyList(), Collections.emptyList());
+                Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList());
         assertDoesNotThrow(() -> 
consulCacheHandler.updateAuthMap("errorJson"));
         assertDoesNotThrow(() -> 
consulCacheHandler.updateMetaDataMap("errorJson"));
         assertDoesNotThrow(() -> 
consulCacheHandler.updateRuleMap("errorJson"));

Reply via email to