This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new ff17ae6c6 [Discovery] HttpLongPolling Sync DiscoveryData (#4795)
ff17ae6c6 is described below

commit ff17ae6c696ad0f3c78938f2dc1ca0abcbb84b61
Author: 杨文杰 <[email protected]>
AuthorDate: Tue Jul 4 18:10:20 2023 +0800

    [Discovery] HttpLongPolling Sync DiscoveryData (#4795)
    
    * Integration module
    
    * Integration module
    
    * fix some bugs
    
    * fix some bug
    
    * fix some bugs
    
    * fix some bugs
    
    * fix some bugs
    
    * fix some bugs
    
    * fix some bugs
    
    * fix some bugs
    
    * fix some bugs
    
    * fix some bugs
    
    * fix some bug
    
    * fix some bug
    
    * fix some bugs
    
    * fix some bug
    
    * fix some bug
    
    * HttpLongPolling Sync DiscoverData
    
    * merge master
---
 .../listener/AbstractDataChangedListener.java      | 60 +++++++++++++++---
 .../http/HttpLongPollingDataChangedListener.java   | 20 +++++-
 .../admin/model/dto/ProxySelectorAddDTO.java       | 25 ++++++++
 .../service/impl/ProxySelectorServiceImpl.java     |  2 +-
 .../sync/data/http/HttpSyncDataConfiguration.java  | 30 +++++----
 .../shenyu/sync/data/http/HttpSyncDataService.java |  7 ++-
 .../sync/data/http/refresh/DataRefreshFactory.java |  9 ++-
 .../http/refresh/DiscoveryUpstreamDataRefresh.java | 73 ++++++++++++++++++++++
 .../data/http/refresh/ProxySelectorRefresh.java    | 73 ++++++++++++++++++++++
 .../sync/data/http/HttpSyncDataServiceTest.java    | 11 +++-
 10 files changed, 285 insertions(+), 25 deletions(-)

diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractDataChangedListener.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractDataChangedListener.java
index 7c28a5cc0..a302e9604 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractDataChangedListener.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractDataChangedListener.java
@@ -169,7 +169,7 @@ public abstract class AbstractDataChangedListener 
implements DataChangedListener
         this.updatePluginCache();
         this.afterPluginChanged(changed, eventType);
     }
-
+    
     /**
      * After plugin changed.
      *
@@ -178,7 +178,7 @@ public abstract class AbstractDataChangedListener 
implements DataChangedListener
      */
     protected void afterPluginChanged(final List<PluginData> changed, final 
DataEventTypeEnum eventType) {
     }
-
+    
     @Override
     public void onRuleChanged(final List<RuleData> changed, final 
DataEventTypeEnum eventType) {
         if (CollectionUtils.isEmpty(changed)) {
@@ -187,7 +187,7 @@ public abstract class AbstractDataChangedListener 
implements DataChangedListener
         this.updateRuleCache();
         this.afterRuleChanged(changed, eventType);
     }
-
+    
     /**
      * After rule changed.
      *
@@ -196,7 +196,7 @@ public abstract class AbstractDataChangedListener 
implements DataChangedListener
      */
     protected void afterRuleChanged(final List<RuleData> changed, final 
DataEventTypeEnum eventType) {
     }
-
+    
     @Override
     public void onSelectorChanged(final List<SelectorData> changed, final 
DataEventTypeEnum eventType) {
         if (CollectionUtils.isEmpty(changed)) {
@@ -206,6 +206,52 @@ public abstract class AbstractDataChangedListener 
implements DataChangedListener
         this.afterSelectorChanged(changed, eventType);
     }
 
+    /**
+     * Handles a ProxySelector change notification.
+     *
+     * <p>When {@code changed} is not empty, the ProxySelector cache is updated first and
+     * {@code afterProxySelectorChanged} is invoked afterwards; an empty or null list is ignored.
+     *
+     * @param changed   the changed ProxySelector data
+     * @param eventType the event type
+     */
+    public void onProxySelectorChanged(final List<ProxySelectorData> changed, final DataEventTypeEnum eventType) {
+        if (!CollectionUtils.isEmpty(changed)) {
+            this.updateProxySelectorDataCache();
+            this.afterProxySelectorChanged(changed, eventType);
+        }
+    }
+
+    /**
+     * After proxySelector changed.
+     *
+     * <p>Extension hook called by {@code onProxySelectorChanged} once the ProxySelector
+     * cache has been updated. This base implementation is intentionally empty.
+     *
+     * @param changed   the changed
+     * @param eventType the event type
+     */
+    protected void afterProxySelectorChanged(final List<ProxySelectorData> 
changed, final DataEventTypeEnum eventType) {
+    }
+
+    /**
+     * Handles a DiscoveryUpstream change notification.
+     *
+     * <p>When {@code changed} is not empty, the DiscoveryUpstream cache is updated first and
+     * {@code afterDiscoveryUpstreamDataChanged} is invoked afterwards; an empty or null list is ignored.
+     *
+     * @param changed   the changed discovery sync data
+     * @param eventType the event type
+     */
+    public void onDiscoveryUpstreamChanged(final List<DiscoverySyncData> changed, final DataEventTypeEnum eventType) {
+        if (!CollectionUtils.isEmpty(changed)) {
+            this.updateDiscoveryUpstreamDataCache();
+            this.afterDiscoveryUpstreamDataChanged(changed, eventType);
+        }
+    }
+
+    /**
+     * After DiscoveryUpstreamData changed.
+     *
+     * <p>Extension hook called by {@code onDiscoveryUpstreamChanged} once the DiscoveryUpstream
+     * cache has been updated. This base implementation is intentionally empty.
+     *
+     * @param changed   the changed
+     * @param eventType the event type
+     */
+    protected void afterDiscoveryUpstreamDataChanged(final 
List<DiscoverySyncData> changed, final DataEventTypeEnum eventType) {
+    }
+
     /**
      * After selector changed.
      *
@@ -236,7 +282,7 @@ public abstract class AbstractDataChangedListener 
implements DataChangedListener
         ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
         LOG.info("update config cache[{}], old: {}, updated: {}", group, 
oldVal, newVal);
     }
-
+    
     /**
      * refresh local cache.
      */
@@ -270,14 +316,14 @@ public abstract class AbstractDataChangedListener 
implements DataChangedListener
     protected void updatePluginCache() {
         this.updateCache(ConfigGroupEnum.PLUGIN, pluginService.listAll());
     }
-
+    
     /**
      * Update app auth cache.
      */
     protected void updateAppAuthCache() {
         this.updateCache(ConfigGroupEnum.APP_AUTH, appAuthService.listAll());
     }
-
+    
     /**
      * Update meta data cache.
      */
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/http/HttpLongPollingDataChangedListener.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/http/HttpLongPollingDataChangedListener.java
index 3fe87c639..b98f880b1 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/http/HttpLongPollingDataChangedListener.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/http/HttpLongPollingDataChangedListener.java
@@ -18,7 +18,9 @@
 package org.apache.shenyu.admin.listener.http;
 
 import com.google.common.collect.Lists;
+
 import java.util.Objects;
+
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
@@ -34,6 +36,8 @@ import org.apache.shenyu.common.dto.MetaData;
 import org.apache.shenyu.common.dto.PluginData;
 import org.apache.shenyu.common.dto.RuleData;
 import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
 import org.apache.shenyu.common.enums.ConfigGroupEnum;
 import org.apache.shenyu.common.enums.DataEventTypeEnum;
 import org.apache.shenyu.common.exception.ShenyuException;
@@ -88,6 +92,7 @@ public class HttpLongPollingDataChangedListener extends 
AbstractDataChangedListe
 
     /**
      * Instantiates a new Http long polling data changed listener.
+     *
      * @param httpSyncProperties the HttpSyncProperties
      */
     public HttpLongPollingDataChangedListener(final HttpSyncProperties 
httpSyncProperties) {
@@ -163,6 +168,16 @@ public class HttpLongPollingDataChangedListener extends 
AbstractDataChangedListe
         scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));
     }
 
+    /**
+     * Submits a {@code DataChangeTask} for the {@code PROXY_SELECTOR} config group to the scheduler.
+     * Neither argument is read here; only the group is passed on.
+     *
+     * @param changed   the changed ProxySelector data
+     * @param eventType the event type
+     */
+    @Override
+    protected void afterProxySelectorChanged(final List<ProxySelectorData> 
changed, final DataEventTypeEnum eventType) {
+        scheduler.execute(new DataChangeTask(ConfigGroupEnum.PROXY_SELECTOR));
+    }
+
+    /**
+     * Submits a {@code DataChangeTask} for the {@code DISCOVER_UPSTREAM} config group to the scheduler.
+     * Neither argument is read here; only the group is passed on.
+     *
+     * @param changed   the changed discovery sync data
+     * @param eventType the event type
+     */
+    @Override
+    protected void afterDiscoveryUpstreamDataChanged(final 
List<DiscoverySyncData> changed, final DataEventTypeEnum eventType) {
+        scheduler.execute(new 
DataChangeTask(ConfigGroupEnum.DISCOVER_UPSTREAM));
+    }
+
     private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest 
request) {
         List<ConfigGroupEnum> changedGroup = new 
ArrayList<>(ConfigGroupEnum.values().length);
         for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
@@ -184,8 +199,9 @@ public class HttpLongPollingDataChangedListener extends 
AbstractDataChangedListe
 
     /**
      * check whether the client needs to update the cache.
-     * @param serverCache the admin local cache
-     * @param clientMd5 the client md5 value
+     *
+     * @param serverCache      the admin local cache
+     * @param clientMd5        the client md5 value
      * @param clientModifyTime the client last modify time
      * @return true: the client needs to be updated, false: not need.
      */
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/dto/ProxySelectorAddDTO.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/dto/ProxySelectorAddDTO.java
index e9b727b4b..80e23ef51 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/dto/ProxySelectorAddDTO.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/dto/ProxySelectorAddDTO.java
@@ -44,6 +44,13 @@ public class ProxySelectorAddDTO implements Serializable {
     @NotBlank
     private String name;
 
+
+    /**
+     * pluginName.
+     */
+    @NotBlank
+    private String pluginName;
+
     /**
      * proxy forward port.
      */
@@ -242,6 +249,24 @@ public class ProxySelectorAddDTO implements Serializable {
         this.discoveryUpstreams = discoveryUpstreams;
     }
 
+    /**
+     * Gets the plugin name.
+     *
+     * @return the plugin name
+     */
+    public String getPluginName() {
+        return pluginName;
+    }
+
+    /**
+     * Sets the plugin name.
+     *
+     * @param pluginName the plugin name
+     */
+    public void setPluginName(final String pluginName) {
+        this.pluginName = pluginName;
+    }
+
     /**
      * get discovery.
      */
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/ProxySelectorServiceImpl.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/ProxySelectorServiceImpl.java
index 8727507fa..67c5c7500 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/ProxySelectorServiceImpl.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/ProxySelectorServiceImpl.java
@@ -226,7 +226,7 @@ public class ProxySelectorServiceImpl implements 
ProxySelectorService {
                 discoveryHandlerMapper.insertSelective(discoveryHandlerDO);
                 DiscoveryRelDO discoveryRelDO = DiscoveryRelDO.builder()
                         .id(UUIDUtils.getInstance().generateShortUuid())
-                        .pluginName(proxySelectorAddDTO.getName())
+                        .pluginName(proxySelectorAddDTO.getPluginName())
                         .discoveryHandlerId(discoveryHandlerId)
                         .proxySelectorId(proxySelectorId)
                         .selectorId("")
diff --git 
a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-http/src/main/java/org/apache/shenyu/springboot/starter/sync/data/http/HttpSyncDataConfiguration.java
 
b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-http/src/main/java/org/apache/shenyu/springboot/starter/sync/data/http/HttpSyncDataConfiguration.java
index 5d219a634..81c8db151 100644
--- 
a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-http/src/main/java/org/apache/shenyu/springboot/starter/sync/data/http/HttpSyncDataConfiguration.java
+++ 
b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-http/src/main/java/org/apache/shenyu/springboot/starter/sync/data/http/HttpSyncDataConfiguration.java
@@ -21,6 +21,8 @@ import org.apache.shenyu.common.constant.HttpConstants;
 import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
 import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
 import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
 import org.apache.shenyu.sync.data.api.SyncDataService;
 import org.apache.shenyu.sync.data.http.AccessTokenManager;
 import org.apache.shenyu.sync.data.http.HttpSyncDataService;
@@ -49,7 +51,7 @@ import java.util.Objects;
 public class HttpSyncDataConfiguration {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HttpSyncDataConfiguration.class);
-    
+
     /**
      * Http config http config.
      *
@@ -60,7 +62,7 @@ public class HttpSyncDataConfiguration {
     public HttpConfig httpConfig() {
         return new HttpConfig();
     }
-    
+
     /**
      * Rest template.
      *
@@ -75,11 +77,11 @@ public class HttpSyncDataConfiguration {
         factory.setWriteTimeout(Objects.isNull(httpConfig.getWriteTimeout()) ? 
(int) HttpConstants.CLIENT_POLLING_WRITE_TIMEOUT : 
httpConfig.getWriteTimeout());
         return new RestTemplate(factory);
     }
-    
+
     /**
      * AccessTokenManager.
      *
-     * @param httpConfig the http config.
+     * @param httpConfig   the http config.
      * @param restTemplate the rest template.
      * @return the access token manager.
      */
@@ -87,16 +89,18 @@ public class HttpSyncDataConfiguration {
     public AccessTokenManager accessTokenManager(final HttpConfig httpConfig, 
final RestTemplate restTemplate) {
         return new AccessTokenManager(restTemplate, httpConfig);
     }
-    
+
     /**
      * Http sync data service.
      *
-     * @param httpConfig the http config
-     * @param pluginSubscriber the plugin subscriber
-     * @param restTemplate the rest template
-     * @param metaSubscribers the meta subscribers
-     * @param authSubscribers the auth subscribers
+     * @param httpConfig         the http config
+     * @param pluginSubscriber   the plugin subscriber
+     * @param restTemplate       the rest template
+     * @param metaSubscribers    the meta subscribers
+     * @param authSubscribers    the auth subscribers
      * @param accessTokenManager the access token manager
+     * @param proxySelectorDataSubscribers the proxySelectorData subscribers
+     * @param discoveryUpstreamDataSubscribers the discoveryUpstreamData 
subscribers
      * @return the sync data service
      */
     @Bean
@@ -105,7 +109,9 @@ public class HttpSyncDataConfiguration {
                                                final 
ObjectProvider<RestTemplate> restTemplate,
                                                final 
ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
                                                final 
ObjectProvider<List<AuthDataSubscriber>> authSubscribers,
-                                               final 
ObjectProvider<AccessTokenManager> accessTokenManager) {
+                                               final 
ObjectProvider<AccessTokenManager> accessTokenManager,
+                                               final 
ObjectProvider<List<ProxySelectorDataSubscriber>> proxySelectorDataSubscribers,
+                                               final 
ObjectProvider<List<DiscoveryUpstreamDataSubscriber>> 
discoveryUpstreamDataSubscribers) {
         LOGGER.info("you use http long pull sync shenyu data");
         return new HttpSyncDataService(
                 Objects.requireNonNull(httpConfig.getIfAvailable()),
@@ -113,6 +119,8 @@ public class HttpSyncDataConfiguration {
                 Objects.requireNonNull(restTemplate.getIfAvailable()),
                 metaSubscribers.getIfAvailable(Collections::emptyList),
                 authSubscribers.getIfAvailable(Collections::emptyList),
+                
proxySelectorDataSubscribers.getIfAvailable(Collections::emptyList),
+                
discoveryUpstreamDataSubscribers.getIfAvailable(Collections::emptyList),
                 Objects.requireNonNull(accessTokenManager.getIfAvailable())
         );
     }
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
 
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
index 7a360b399..dadada80b 100644
--- 
a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
+++ 
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
@@ -33,6 +33,8 @@ import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
 import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
 import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
 import org.apache.shenyu.sync.data.api.SyncDataService;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
 import org.apache.shenyu.sync.data.http.config.HttpConfig;
 import org.apache.shenyu.sync.data.http.refresh.DataRefreshFactory;
 import org.slf4j.Logger;
@@ -85,9 +87,11 @@ public class HttpSyncDataService implements SyncDataService {
                                final RestTemplate restTemplate,
                                final List<MetaDataSubscriber> 
metaDataSubscribers,
                                final List<AuthDataSubscriber> 
authDataSubscribers,
+                               final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers,
+                               final List<DiscoveryUpstreamDataSubscriber> 
discoveryUpstreamDataSubscribers,
                                final AccessTokenManager accessTokenManager) {
         this.accessTokenManager = accessTokenManager;
-        this.factory = new DataRefreshFactory(pluginDataSubscriber, 
metaDataSubscribers, authDataSubscribers);
+        this.factory = new DataRefreshFactory(pluginDataSubscriber, 
metaDataSubscribers, authDataSubscribers, proxySelectorDataSubscribers, 
discoveryUpstreamDataSubscribers);
         this.serverList = 
Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
         this.restTemplate = restTemplate;
         this.start();
@@ -155,7 +159,6 @@ public class HttpSyncDataService implements SyncDataService 
{
     }
 
 
-
     /**
      * update local cache.
      *
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/DataRefreshFactory.java
 
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/DataRefreshFactory.java
index f3c898311..7d0b51a4e 100644
--- 
a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/DataRefreshFactory.java
+++ 
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/DataRefreshFactory.java
@@ -23,6 +23,8 @@ import org.apache.shenyu.common.enums.ConfigGroupEnum;
 import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
 import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
 import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
 
 import java.util.EnumMap;
 import java.util.List;
@@ -44,12 +46,17 @@ public final class DataRefreshFactory {
      */
     public DataRefreshFactory(final PluginDataSubscriber pluginDataSubscriber,
                               final List<MetaDataSubscriber> 
metaDataSubscribers,
-                              final List<AuthDataSubscriber> 
authDataSubscribers) {
+                              final List<AuthDataSubscriber> 
authDataSubscribers,
+                              final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers,
+                              final List<DiscoveryUpstreamDataSubscriber> 
discoveryUpstreamDataSubscribers
+                              ) {
         ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new 
PluginDataRefresh(pluginDataSubscriber));
         ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new 
SelectorDataRefresh(pluginDataSubscriber));
         ENUM_MAP.put(ConfigGroupEnum.RULE, new 
RuleDataRefresh(pluginDataSubscriber));
         ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new 
AppAuthDataRefresh(authDataSubscribers));
         ENUM_MAP.put(ConfigGroupEnum.META_DATA, new 
MetaDataRefresh(metaDataSubscribers));
+        ENUM_MAP.put(ConfigGroupEnum.PROXY_SELECTOR, new 
ProxySelectorRefresh(proxySelectorDataSubscribers));
+        ENUM_MAP.put(ConfigGroupEnum.DISCOVER_UPSTREAM, new 
DiscoveryUpstreamDataRefresh(discoveryUpstreamDataSubscribers));
     }
 
     /**
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/DiscoveryUpstreamDataRefresh.java
 
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/DiscoveryUpstreamDataRefresh.java
new file mode 100644
index 000000000..a64bcf8a2
--- /dev/null
+++ 
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/DiscoveryUpstreamDataRefresh.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.sync.data.http.refresh;
+
+import com.google.gson.JsonObject;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.shenyu.common.dto.ConfigData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
+import org.apache.shenyu.common.enums.ConfigGroupEnum;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Data refresh for the {@code DISCOVER_UPSTREAM} config group.
+ *
+ * <p>Extracts the {@code DISCOVER_UPSTREAM} section from the received JSON, deserializes it
+ * into {@link DiscoverySyncData} entries and hands every entry to each registered
+ * {@link DiscoveryUpstreamDataSubscriber}.
+ */
+public class DiscoveryUpstreamDataRefresh extends AbstractDataRefresh<DiscoverySyncData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DiscoveryUpstreamDataRefresh.class);
+
+    private final List<DiscoveryUpstreamDataSubscriber> discoveryUpstreamDataSubscribers;
+
+    /**
+     * Instantiates a new DiscoveryUpstream data refresh.
+     *
+     * @param discoveryUpstreamDataSubscribers the subscribers notified for every refreshed entry
+     */
+    public DiscoveryUpstreamDataRefresh(final List<DiscoveryUpstreamDataSubscriber> discoveryUpstreamDataSubscribers) {
+        this.discoveryUpstreamDataSubscribers = discoveryUpstreamDataSubscribers;
+    }
+
+    /**
+     * Selects the {@code DISCOVER_UPSTREAM} member of the received JSON object.
+     *
+     * @param data the JSON object holding all config groups
+     * @return the JSON object stored under the {@code DISCOVER_UPSTREAM} key
+     */
+    @Override
+    protected JsonObject convert(final JsonObject data) {
+        return data.getAsJsonObject(ConfigGroupEnum.DISCOVER_UPSTREAM.name());
+    }
+
+    /**
+     * Deserializes the group JSON into a {@code ConfigData<DiscoverySyncData>}.
+     *
+     * @param data the JSON object of the {@code DISCOVER_UPSTREAM} group
+     * @return the deserialized config data
+     */
+    @Override
+    protected ConfigData<DiscoverySyncData> fromJson(final JsonObject data) {
+        return GsonUtils.getGson().fromJson(data, new TypeToken<ConfigData<DiscoverySyncData>>() {
+        }.getType());
+    }
+
+    /**
+     * Passes every entry of {@code data} to every subscriber; empty or null data is only logged.
+     *
+     * @param data the discovery sync data to publish
+     */
+    @Override
+    protected void refresh(final List<DiscoverySyncData> data) {
+        if (CollectionUtils.isEmpty(data)) {
+            // The message previously said "plugin data", a leftover from the class this was copied from.
+            LOG.info("clear all DiscoveryUpstream data cache");
+            return;
+        }
+        data.forEach(d -> discoveryUpstreamDataSubscribers.forEach(dus -> dus.onSubscribe(d)));
+    }
+
+    /**
+     * Delegates to the group-aware overload using the {@code DISCOVER_UPSTREAM} group.
+     *
+     * @param result the freshly received config data
+     * @return the result of the group-aware {@code updateCacheIfNeed} overload
+     */
+    @Override
+    protected boolean updateCacheIfNeed(final ConfigData<DiscoverySyncData> result) {
+        return updateCacheIfNeed(result, ConfigGroupEnum.DISCOVER_UPSTREAM);
+    }
+
+    /**
+     * Returns the cached config data of this refresh's own group.
+     *
+     * @return the {@code GROUP_CACHE} entry for {@code DISCOVER_UPSTREAM}
+     */
+    @Override
+    public ConfigData<?> cacheConfigData() {
+        // Must be the same group used by convert/updateCacheIfNeed; this used to read PROXY_SELECTOR.
+        return GROUP_CACHE.get(ConfigGroupEnum.DISCOVER_UPSTREAM);
+    }
+
+}
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/ProxySelectorRefresh.java
 
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/ProxySelectorRefresh.java
new file mode 100644
index 000000000..f2387dde2
--- /dev/null
+++ 
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/ProxySelectorRefresh.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.sync.data.http.refresh;
+
+import com.google.gson.JsonObject;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.shenyu.common.dto.ConfigData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.enums.ConfigGroupEnum;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Data refresh for the {@code PROXY_SELECTOR} config group.
+ *
+ * <p>Every received {@link ProxySelectorData} entry is forwarded to each registered
+ * {@link ProxySelectorDataSubscriber}.
+ */
+public class ProxySelectorRefresh extends AbstractDataRefresh<ProxySelectorData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProxySelectorRefresh.class);
+
+    private final List<ProxySelectorDataSubscriber> proxySelectorDataSubscribers;
+
+    /**
+     * Instantiates a new ProxySelector refresh.
+     *
+     * @param proxySelectorDataSubscribers the subscribers notified for every refreshed entry
+     */
+    public ProxySelectorRefresh(final List<ProxySelectorDataSubscriber> proxySelectorDataSubscribers) {
+        this.proxySelectorDataSubscribers = proxySelectorDataSubscribers;
+    }
+
+    /**
+     * Selects the {@code PROXY_SELECTOR} member of the received JSON object.
+     *
+     * @param data the JSON object holding all config groups
+     * @return the JSON object stored under the {@code PROXY_SELECTOR} key
+     */
+    @Override
+    protected JsonObject convert(final JsonObject data) {
+        final String groupKey = ConfigGroupEnum.PROXY_SELECTOR.name();
+        return data.getAsJsonObject(groupKey);
+    }
+
+    /**
+     * Deserializes the group JSON into a {@code ConfigData<ProxySelectorData>}.
+     *
+     * @param data the JSON object of the {@code PROXY_SELECTOR} group
+     * @return the deserialized config data
+     */
+    @Override
+    protected ConfigData<ProxySelectorData> fromJson(final JsonObject data) {
+        final TypeToken<ConfigData<ProxySelectorData>> configType = new TypeToken<ConfigData<ProxySelectorData>>() {
+        };
+        return GsonUtils.getGson().fromJson(data, configType.getType());
+    }
+
+    /**
+     * Passes every entry of {@code data} to every subscriber; empty or null data is only logged.
+     *
+     * @param data the ProxySelector data to publish
+     */
+    @Override
+    protected void refresh(final List<ProxySelectorData> data) {
+        if (CollectionUtils.isEmpty(data)) {
+            LOG.info("clear all ProxySelector data cache");
+        } else {
+            for (ProxySelectorData selectorData : data) {
+                for (ProxySelectorDataSubscriber subscriber : proxySelectorDataSubscribers) {
+                    subscriber.onSubscribe(selectorData);
+                }
+            }
+        }
+    }
+
+    /**
+     * Delegates to the group-aware overload using the {@code PROXY_SELECTOR} group.
+     *
+     * @param result the freshly received config data
+     * @return the result of the group-aware {@code updateCacheIfNeed} overload
+     */
+    @Override
+    protected boolean updateCacheIfNeed(final ConfigData<ProxySelectorData> result) {
+        return updateCacheIfNeed(result, ConfigGroupEnum.PROXY_SELECTOR);
+    }
+
+    /**
+     * Returns the cached config data of the {@code PROXY_SELECTOR} group.
+     *
+     * @return the {@code GROUP_CACHE} entry for {@code PROXY_SELECTOR}
+     */
+    @Override
+    public ConfigData<?> cacheConfigData() {
+        return GROUP_CACHE.get(ConfigGroupEnum.PROXY_SELECTOR);
+    }
+
+}
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-http/src/test/java/org/apache/shenyu/sync/data/http/HttpSyncDataServiceTest.java
 
b/shenyu-sync-data-center/shenyu-sync-data-http/src/test/java/org/apache/shenyu/sync/data/http/HttpSyncDataServiceTest.java
index f5ee4cae4..6c1930e68 100644
--- 
a/shenyu-sync-data-center/shenyu-sync-data-http/src/test/java/org/apache/shenyu/sync/data/http/HttpSyncDataServiceTest.java
+++ 
b/shenyu-sync-data-center/shenyu-sync-data-http/src/test/java/org/apache/shenyu/sync/data/http/HttpSyncDataServiceTest.java
@@ -28,6 +28,8 @@ import org.apache.shenyu.common.utils.GsonUtils;
 import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
 import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
 import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
 import org.apache.shenyu.sync.data.http.config.HttpConfig;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -75,6 +77,10 @@ public final class HttpSyncDataServiceTest {
 
     private AuthDataSubscriber authDataSubscriber;
 
+    private ProxySelectorDataSubscriber proxySelectorDataSubscriber;
+
+    private DiscoveryUpstreamDataSubscriber discoveryUpstreamDataSubscriber;
+
     private HttpSyncDataService httpSyncDataService;
 
     @BeforeEach
@@ -114,6 +120,8 @@ public final class HttpSyncDataServiceTest {
         this.pluginDataSubscriber = mock(PluginDataSubscriber.class);
         this.metaDataSubscriber = mock(MetaDataSubscriber.class);
         this.authDataSubscriber = mock(AuthDataSubscriber.class);
+        this.proxySelectorDataSubscriber = 
mock(ProxySelectorDataSubscriber.class);
+        this.discoveryUpstreamDataSubscriber = 
mock(DiscoveryUpstreamDataSubscriber.class);
 
         OkHttp3ClientHttpRequestFactory factory = new 
OkHttp3ClientHttpRequestFactory();
         
factory.setConnectTimeout(Objects.isNull(httpConfig.getConnectionTimeout()) ? 
(int) HttpConstants.CLIENT_POLLING_CONNECT_TIMEOUT : 
httpConfig.getConnectionTimeout());
@@ -123,7 +131,8 @@ public final class HttpSyncDataServiceTest {
 
         AccessTokenManager accessTokenManager = new 
AccessTokenManager(restTemplate, httpConfig);
         this.httpSyncDataService = new HttpSyncDataService(httpConfig, 
pluginDataSubscriber, restTemplate,
-                Collections.singletonList(metaDataSubscriber), 
Collections.singletonList(authDataSubscriber), accessTokenManager);
+                Collections.singletonList(metaDataSubscriber), 
Collections.singletonList(authDataSubscriber), 
Collections.singletonList(proxySelectorDataSubscriber),
+                Collections.singletonList(discoveryUpstreamDataSubscriber), 
accessTokenManager);
     }
 
     @AfterEach

Reply via email to