This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new ff17ae6c6 [Discovery] HttpLongPolling Sync DiscoveryData (#4795)
ff17ae6c6 is described below
commit ff17ae6c696ad0f3c78938f2dc1ca0abcbb84b61
Author: 杨文杰 <[email protected]>
AuthorDate: Tue Jul 4 18:10:20 2023 +0800
[Discovery] HttpLongPolling Sync DiscoveryData (#4795)
* Integration module
* Integration module
* fix some bugs
* fix some bug
* fix some bugs
* fix some bugs
* fix some bugs
* fix some bugs
* fix some bugs
* fix some bugs
* fix some bugs
* fix some bugs
* fix some bug
* fix some bug
* fix some bugs
* fix some bug
* fix some bug
* HttpLongPolling Sync DiscoverData
* merge master
---
.../listener/AbstractDataChangedListener.java | 60 +++++++++++++++---
.../http/HttpLongPollingDataChangedListener.java | 20 +++++-
.../admin/model/dto/ProxySelectorAddDTO.java | 25 ++++++++
.../service/impl/ProxySelectorServiceImpl.java | 2 +-
.../sync/data/http/HttpSyncDataConfiguration.java | 30 +++++----
.../shenyu/sync/data/http/HttpSyncDataService.java | 7 ++-
.../sync/data/http/refresh/DataRefreshFactory.java | 9 ++-
.../http/refresh/DiscoveryUpstreamDataRefresh.java | 73 ++++++++++++++++++++++
.../data/http/refresh/ProxySelectorRefresh.java | 73 ++++++++++++++++++++++
.../sync/data/http/HttpSyncDataServiceTest.java | 11 +++-
10 files changed, 285 insertions(+), 25 deletions(-)
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractDataChangedListener.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractDataChangedListener.java
index 7c28a5cc0..a302e9604 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractDataChangedListener.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractDataChangedListener.java
@@ -169,7 +169,7 @@ public abstract class AbstractDataChangedListener
implements DataChangedListener
this.updatePluginCache();
this.afterPluginChanged(changed, eventType);
}
-
+
/**
* After plugin changed.
*
@@ -178,7 +178,7 @@ public abstract class AbstractDataChangedListener
implements DataChangedListener
*/
protected void afterPluginChanged(final List<PluginData> changed, final
DataEventTypeEnum eventType) {
}
-
+
@Override
public void onRuleChanged(final List<RuleData> changed, final
DataEventTypeEnum eventType) {
if (CollectionUtils.isEmpty(changed)) {
@@ -187,7 +187,7 @@ public abstract class AbstractDataChangedListener
implements DataChangedListener
this.updateRuleCache();
this.afterRuleChanged(changed, eventType);
}
-
+
/**
* After rule changed.
*
@@ -196,7 +196,7 @@ public abstract class AbstractDataChangedListener
implements DataChangedListener
*/
protected void afterRuleChanged(final List<RuleData> changed, final
DataEventTypeEnum eventType) {
}
-
+
@Override
public void onSelectorChanged(final List<SelectorData> changed, final
DataEventTypeEnum eventType) {
if (CollectionUtils.isEmpty(changed)) {
@@ -206,6 +206,52 @@ public abstract class AbstractDataChangedListener
implements DataChangedListener
this.afterSelectorChanged(changed, eventType);
}
+ /**
+ * invoke this method when ProxySelector was changed.
+ *
+ * @param changed the changed
+ * @param eventType the event type
+ */
+ public void onProxySelectorChanged(final List<ProxySelectorData> changed,
final DataEventTypeEnum eventType) {
+ if (CollectionUtils.isEmpty(changed)) {
+ return;
+ }
+ this.updateProxySelectorDataCache();
+ this.afterProxySelectorChanged(changed, eventType);
+ }
+
+ /**
+ * After proxySelector changed.
+ *
+ * @param changed the changed
+ * @param eventType the event type
+ */
+ protected void afterProxySelectorChanged(final List<ProxySelectorData>
changed, final DataEventTypeEnum eventType) {
+ }
+
+ /**
+ * invoke this method when DiscoveryUpstream was changed.
+ *
+ * @param changed the changed
+ * @param eventType the event type
+ */
+ public void onDiscoveryUpstreamChanged(final List<DiscoverySyncData>
changed, final DataEventTypeEnum eventType) {
+ if (CollectionUtils.isEmpty(changed)) {
+ return;
+ }
+ this.updateDiscoveryUpstreamDataCache();
+ this.afterDiscoveryUpstreamDataChanged(changed, eventType);
+ }
+
+ /**
+ * After DiscoveryUpstreamData changed.
+ *
+ * @param changed the changed
+ * @param eventType the event type
+ */
+ protected void afterDiscoveryUpstreamDataChanged(final
List<DiscoverySyncData> changed, final DataEventTypeEnum eventType) {
+ }
+
/**
* After selector changed.
*
@@ -236,7 +282,7 @@ public abstract class AbstractDataChangedListener
implements DataChangedListener
ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
LOG.info("update config cache[{}], old: {}, updated: {}", group,
oldVal, newVal);
}
-
+
/**
* refresh local cache.
*/
@@ -270,14 +316,14 @@ public abstract class AbstractDataChangedListener
implements DataChangedListener
protected void updatePluginCache() {
this.updateCache(ConfigGroupEnum.PLUGIN, pluginService.listAll());
}
-
+
/**
* Update app auth cache.
*/
protected void updateAppAuthCache() {
this.updateCache(ConfigGroupEnum.APP_AUTH, appAuthService.listAll());
}
-
+
/**
* Update meta data cache.
*/
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/http/HttpLongPollingDataChangedListener.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/http/HttpLongPollingDataChangedListener.java
index 3fe87c639..b98f880b1 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/http/HttpLongPollingDataChangedListener.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/http/HttpLongPollingDataChangedListener.java
@@ -18,7 +18,9 @@
package org.apache.shenyu.admin.listener.http;
import com.google.common.collect.Lists;
+
import java.util.Objects;
+
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
@@ -34,6 +36,8 @@ import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.dto.RuleData;
import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
import org.apache.shenyu.common.enums.ConfigGroupEnum;
import org.apache.shenyu.common.enums.DataEventTypeEnum;
import org.apache.shenyu.common.exception.ShenyuException;
@@ -88,6 +92,7 @@ public class HttpLongPollingDataChangedListener extends
AbstractDataChangedListe
/**
* Instantiates a new Http long polling data changed listener.
+ *
* @param httpSyncProperties the HttpSyncProperties
*/
public HttpLongPollingDataChangedListener(final HttpSyncProperties
httpSyncProperties) {
@@ -163,6 +168,16 @@ public class HttpLongPollingDataChangedListener extends
AbstractDataChangedListe
scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));
}
+ @Override
+ protected void afterProxySelectorChanged(final List<ProxySelectorData>
changed, final DataEventTypeEnum eventType) {
+ scheduler.execute(new DataChangeTask(ConfigGroupEnum.PROXY_SELECTOR));
+ }
+
+ @Override
+ protected void afterDiscoveryUpstreamDataChanged(final
List<DiscoverySyncData> changed, final DataEventTypeEnum eventType) {
+ scheduler.execute(new
DataChangeTask(ConfigGroupEnum.DISCOVER_UPSTREAM));
+ }
+
private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest
request) {
List<ConfigGroupEnum> changedGroup = new
ArrayList<>(ConfigGroupEnum.values().length);
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
@@ -184,8 +199,9 @@ public class HttpLongPollingDataChangedListener extends
AbstractDataChangedListe
/**
* check whether the client needs to update the cache.
- * @param serverCache the admin local cache
- * @param clientMd5 the client md5 value
+ *
+ * @param serverCache the admin local cache
+ * @param clientMd5 the client md5 value
* @param clientModifyTime the client last modify time
* @return true: the client needs to be updated, false: not need.
*/
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/dto/ProxySelectorAddDTO.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/dto/ProxySelectorAddDTO.java
index e9b727b4b..80e23ef51 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/dto/ProxySelectorAddDTO.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/dto/ProxySelectorAddDTO.java
@@ -44,6 +44,13 @@ public class ProxySelectorAddDTO implements Serializable {
@NotBlank
private String name;
+
+ /**
+ * pluginName.
+ */
+ @NotBlank
+ private String pluginName;
+
/**
* proxy forward port.
*/
@@ -242,6 +249,24 @@ public class ProxySelectorAddDTO implements Serializable {
this.discoveryUpstreams = discoveryUpstreams;
}
+ /**
+ * getPluginName.
+ *
+ * @return pluginName
+ */
+ public String getPluginName() {
+ return pluginName;
+ }
+
+ /**
+ * setPluginName.
+ *
+ * @param pluginName pluginName
+ */
+ public void setPluginName(final String pluginName) {
+ this.pluginName = pluginName;
+ }
+
/**
* get discovery.
*/
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/ProxySelectorServiceImpl.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/ProxySelectorServiceImpl.java
index 8727507fa..67c5c7500 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/ProxySelectorServiceImpl.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/ProxySelectorServiceImpl.java
@@ -226,7 +226,7 @@ public class ProxySelectorServiceImpl implements
ProxySelectorService {
discoveryHandlerMapper.insertSelective(discoveryHandlerDO);
DiscoveryRelDO discoveryRelDO = DiscoveryRelDO.builder()
.id(UUIDUtils.getInstance().generateShortUuid())
- .pluginName(proxySelectorAddDTO.getName())
+ .pluginName(proxySelectorAddDTO.getPluginName())
.discoveryHandlerId(discoveryHandlerId)
.proxySelectorId(proxySelectorId)
.selectorId("")
diff --git
a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-http/src/main/java/org/apache/shenyu/springboot/starter/sync/data/http/HttpSyncDataConfiguration.java
b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-http/src/main/java/org/apache/shenyu/springboot/starter/sync/data/http/HttpSyncDataConfiguration.java
index 5d219a634..81c8db151 100644
---
a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-http/src/main/java/org/apache/shenyu/springboot/starter/sync/data/http/HttpSyncDataConfiguration.java
+++
b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-http/src/main/java/org/apache/shenyu/springboot/starter/sync/data/http/HttpSyncDataConfiguration.java
@@ -21,6 +21,8 @@ import org.apache.shenyu.common.constant.HttpConstants;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
import org.apache.shenyu.sync.data.api.SyncDataService;
import org.apache.shenyu.sync.data.http.AccessTokenManager;
import org.apache.shenyu.sync.data.http.HttpSyncDataService;
@@ -49,7 +51,7 @@ import java.util.Objects;
public class HttpSyncDataConfiguration {
private static final Logger LOGGER =
LoggerFactory.getLogger(HttpSyncDataConfiguration.class);
-
+
/**
* Http config http config.
*
@@ -60,7 +62,7 @@ public class HttpSyncDataConfiguration {
public HttpConfig httpConfig() {
return new HttpConfig();
}
-
+
/**
* Rest template.
*
@@ -75,11 +77,11 @@ public class HttpSyncDataConfiguration {
factory.setWriteTimeout(Objects.isNull(httpConfig.getWriteTimeout()) ?
(int) HttpConstants.CLIENT_POLLING_WRITE_TIMEOUT :
httpConfig.getWriteTimeout());
return new RestTemplate(factory);
}
-
+
/**
* AccessTokenManager.
*
- * @param httpConfig the http config.
+ * @param httpConfig the http config.
* @param restTemplate the rest template.
* @return the access token manager.
*/
@@ -87,16 +89,18 @@ public class HttpSyncDataConfiguration {
public AccessTokenManager accessTokenManager(final HttpConfig httpConfig,
final RestTemplate restTemplate) {
return new AccessTokenManager(restTemplate, httpConfig);
}
-
+
/**
* Http sync data service.
*
- * @param httpConfig the http config
- * @param pluginSubscriber the plugin subscriber
- * @param restTemplate the rest template
- * @param metaSubscribers the meta subscribers
- * @param authSubscribers the auth subscribers
+ * @param httpConfig the http config
+ * @param pluginSubscriber the plugin subscriber
+ * @param restTemplate the rest template
+ * @param metaSubscribers the meta subscribers
+ * @param authSubscribers the auth subscribers
* @param accessTokenManager the access token manager
+ * @param proxySelectorDataSubscribers the proxySelectorData subscribers
+ * @param discoveryUpstreamDataSubscribers the discoveryUpstreamData
subscribers
* @return the sync data service
*/
@Bean
@@ -105,7 +109,9 @@ public class HttpSyncDataConfiguration {
final
ObjectProvider<RestTemplate> restTemplate,
final
ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
final
ObjectProvider<List<AuthDataSubscriber>> authSubscribers,
- final
ObjectProvider<AccessTokenManager> accessTokenManager) {
+ final
ObjectProvider<AccessTokenManager> accessTokenManager,
+ final
ObjectProvider<List<ProxySelectorDataSubscriber>> proxySelectorDataSubscribers,
+ final
ObjectProvider<List<DiscoveryUpstreamDataSubscriber>>
discoveryUpstreamDataSubscribers) {
LOGGER.info("you use http long pull sync shenyu data");
return new HttpSyncDataService(
Objects.requireNonNull(httpConfig.getIfAvailable()),
@@ -113,6 +119,8 @@ public class HttpSyncDataConfiguration {
Objects.requireNonNull(restTemplate.getIfAvailable()),
metaSubscribers.getIfAvailable(Collections::emptyList),
authSubscribers.getIfAvailable(Collections::emptyList),
+
proxySelectorDataSubscribers.getIfAvailable(Collections::emptyList),
+
discoveryUpstreamDataSubscribers.getIfAvailable(Collections::emptyList),
Objects.requireNonNull(accessTokenManager.getIfAvailable())
);
}
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
index 7a360b399..dadada80b 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
@@ -33,6 +33,8 @@ import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
import org.apache.shenyu.sync.data.api.SyncDataService;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
import org.apache.shenyu.sync.data.http.config.HttpConfig;
import org.apache.shenyu.sync.data.http.refresh.DataRefreshFactory;
import org.slf4j.Logger;
@@ -85,9 +87,11 @@ public class HttpSyncDataService implements SyncDataService {
final RestTemplate restTemplate,
final List<MetaDataSubscriber>
metaDataSubscribers,
final List<AuthDataSubscriber>
authDataSubscribers,
+ final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers,
+ final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers,
final AccessTokenManager accessTokenManager) {
this.accessTokenManager = accessTokenManager;
- this.factory = new DataRefreshFactory(pluginDataSubscriber,
metaDataSubscribers, authDataSubscribers);
+ this.factory = new DataRefreshFactory(pluginDataSubscriber,
metaDataSubscribers, authDataSubscribers, proxySelectorDataSubscribers,
discoveryUpstreamDataSubscribers);
this.serverList =
Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
this.restTemplate = restTemplate;
this.start();
@@ -155,7 +159,6 @@ public class HttpSyncDataService implements SyncDataService
{
}
-
/**
* update local cache.
*
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/DataRefreshFactory.java
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/DataRefreshFactory.java
index f3c898311..7d0b51a4e 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/DataRefreshFactory.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/DataRefreshFactory.java
@@ -23,6 +23,8 @@ import org.apache.shenyu.common.enums.ConfigGroupEnum;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
import java.util.EnumMap;
import java.util.List;
@@ -44,12 +46,17 @@ public final class DataRefreshFactory {
*/
public DataRefreshFactory(final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber>
metaDataSubscribers,
- final List<AuthDataSubscriber>
authDataSubscribers) {
+ final List<AuthDataSubscriber>
authDataSubscribers,
+ final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers,
+ final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers
+ ) {
ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new
PluginDataRefresh(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new
SelectorDataRefresh(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.RULE, new
RuleDataRefresh(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new
AppAuthDataRefresh(authDataSubscribers));
ENUM_MAP.put(ConfigGroupEnum.META_DATA, new
MetaDataRefresh(metaDataSubscribers));
+ ENUM_MAP.put(ConfigGroupEnum.PROXY_SELECTOR, new
ProxySelectorRefresh(proxySelectorDataSubscribers));
+ ENUM_MAP.put(ConfigGroupEnum.DISCOVER_UPSTREAM, new
DiscoveryUpstreamDataRefresh(discoveryUpstreamDataSubscribers));
}
/**
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/DiscoveryUpstreamDataRefresh.java
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/DiscoveryUpstreamDataRefresh.java
new file mode 100644
index 000000000..a64bcf8a2
--- /dev/null
+++
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/DiscoveryUpstreamDataRefresh.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.sync.data.http.refresh;
+
+import com.google.gson.JsonObject;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.shenyu.common.dto.ConfigData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
+import org.apache.shenyu.common.enums.ConfigGroupEnum;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class DiscoveryUpstreamDataRefresh extends
AbstractDataRefresh<DiscoverySyncData> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DiscoveryUpstreamDataRefresh.class);
+
+ private final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers;
+
+ public DiscoveryUpstreamDataRefresh(final
List<DiscoveryUpstreamDataSubscriber> discoveryUpstreamDataSubscribers) {
+ this.discoveryUpstreamDataSubscribers =
discoveryUpstreamDataSubscribers;
+ }
+
+ @Override
+ protected JsonObject convert(final JsonObject data) {
+ return data.getAsJsonObject(ConfigGroupEnum.DISCOVER_UPSTREAM.name());
+ }
+
+ @Override
+ protected ConfigData<DiscoverySyncData> fromJson(final JsonObject data) {
+ return GsonUtils.getGson().fromJson(data, new
TypeToken<ConfigData<DiscoverySyncData>>() {
+ }.getType());
+ }
+
+ @Override
+ protected void refresh(final List<DiscoverySyncData> data) {
+ if (CollectionUtils.isEmpty(data)) {
+ LOG.info("clear all plugin data cache");
+ return;
+ }
+ data.forEach(d -> discoveryUpstreamDataSubscribers.forEach(dus ->
dus.onSubscribe(d)));
+ }
+
+ @Override
+ protected boolean updateCacheIfNeed(final ConfigData<DiscoverySyncData>
result) {
+ return updateCacheIfNeed(result, ConfigGroupEnum.DISCOVER_UPSTREAM);
+ }
+
+ @Override
+ public ConfigData<?> cacheConfigData() {
+ return GROUP_CACHE.get(ConfigGroupEnum.PROXY_SELECTOR);
+ }
+
+}
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/ProxySelectorRefresh.java
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/ProxySelectorRefresh.java
new file mode 100644
index 000000000..f2387dde2
--- /dev/null
+++
b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/ProxySelectorRefresh.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.sync.data.http.refresh;
+
+import com.google.gson.JsonObject;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.shenyu.common.dto.ConfigData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.enums.ConfigGroupEnum;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class ProxySelectorRefresh extends
AbstractDataRefresh<ProxySelectorData> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ProxySelectorRefresh.class);
+
+ private final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers;
+
+ public ProxySelectorRefresh(final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers) {
+ this.proxySelectorDataSubscribers = proxySelectorDataSubscribers;
+ }
+
+ @Override
+ protected JsonObject convert(final JsonObject data) {
+ return data.getAsJsonObject(ConfigGroupEnum.PROXY_SELECTOR.name());
+ }
+
+ @Override
+ protected ConfigData<ProxySelectorData> fromJson(final JsonObject data) {
+ return GsonUtils.getGson().fromJson(data, new
TypeToken<ConfigData<ProxySelectorData>>() {
+ }.getType());
+ }
+
+ @Override
+ protected void refresh(final List<ProxySelectorData> data) {
+ if (CollectionUtils.isEmpty(data)) {
+ LOG.info("clear all ProxySelector data cache");
+ return;
+ }
+ data.forEach(d -> proxySelectorDataSubscribers.forEach(pss ->
pss.onSubscribe(d)));
+ }
+
+ @Override
+ protected boolean updateCacheIfNeed(final ConfigData<ProxySelectorData>
result) {
+ return updateCacheIfNeed(result, ConfigGroupEnum.PROXY_SELECTOR);
+ }
+
+ @Override
+ public ConfigData<?> cacheConfigData() {
+ return GROUP_CACHE.get(ConfigGroupEnum.PROXY_SELECTOR);
+ }
+
+}
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-http/src/test/java/org/apache/shenyu/sync/data/http/HttpSyncDataServiceTest.java
b/shenyu-sync-data-center/shenyu-sync-data-http/src/test/java/org/apache/shenyu/sync/data/http/HttpSyncDataServiceTest.java
index f5ee4cae4..6c1930e68 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-http/src/test/java/org/apache/shenyu/sync/data/http/HttpSyncDataServiceTest.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-http/src/test/java/org/apache/shenyu/sync/data/http/HttpSyncDataServiceTest.java
@@ -28,6 +28,8 @@ import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
import org.apache.shenyu.sync.data.http.config.HttpConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -75,6 +77,10 @@ public final class HttpSyncDataServiceTest {
private AuthDataSubscriber authDataSubscriber;
+ private ProxySelectorDataSubscriber proxySelectorDataSubscriber;
+
+ private DiscoveryUpstreamDataSubscriber discoveryUpstreamDataSubscriber;
+
private HttpSyncDataService httpSyncDataService;
@BeforeEach
@@ -114,6 +120,8 @@ public final class HttpSyncDataServiceTest {
this.pluginDataSubscriber = mock(PluginDataSubscriber.class);
this.metaDataSubscriber = mock(MetaDataSubscriber.class);
this.authDataSubscriber = mock(AuthDataSubscriber.class);
+ this.proxySelectorDataSubscriber =
mock(ProxySelectorDataSubscriber.class);
+ this.discoveryUpstreamDataSubscriber =
mock(DiscoveryUpstreamDataSubscriber.class);
OkHttp3ClientHttpRequestFactory factory = new
OkHttp3ClientHttpRequestFactory();
factory.setConnectTimeout(Objects.isNull(httpConfig.getConnectionTimeout()) ?
(int) HttpConstants.CLIENT_POLLING_CONNECT_TIMEOUT :
httpConfig.getConnectionTimeout());
@@ -123,7 +131,8 @@ public final class HttpSyncDataServiceTest {
AccessTokenManager accessTokenManager = new
AccessTokenManager(restTemplate, httpConfig);
this.httpSyncDataService = new HttpSyncDataService(httpConfig,
pluginDataSubscriber, restTemplate,
- Collections.singletonList(metaDataSubscriber),
Collections.singletonList(authDataSubscriber), accessTokenManager);
+ Collections.singletonList(metaDataSubscriber),
Collections.singletonList(authDataSubscriber),
Collections.singletonList(proxySelectorDataSubscriber),
+ Collections.singletonList(discoveryUpstreamDataSubscriber),
accessTokenManager);
}
@AfterEach