This is an automated email from the ASF dual-hosted git repository.
hefengen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 1cd6444f5 feature: sync consul (#4802)
1cd6444f5 is described below
commit 1cd6444f5ab9efe1b803d56055cb61582a8af42d
Author: zhengpeng <[email protected]>
AuthorDate: Fri Jul 7 14:19:13 2023 +0800
feature: sync consul (#4802)
Co-authored-by: xiaoyu <[email protected]>
Co-authored-by: moremind <[email protected]>
---
.../data/consul/ConsulSyncDataConfiguration.java | 11 ++++--
.../sync/data/consul/ConsulSyncDataService.java | 10 ++++--
.../data/consul/handler/ConsulCacheHandler.java | 39 +++++++++++++++++++++-
.../data/consul/ConsulSyncDataServiceTest.java | 2 +-
.../consul/handler/ConsulCacheHandlerTest.java | 12 +++----
5 files changed, 62 insertions(+), 12 deletions(-)
diff --git
a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-consul/src/main/java/org/apache/shenyu/springboot/sync/data/consul/ConsulSyncDataConfiguration.java
b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-consul/src/main/java/org/apache/shenyu/springboot/sync/data/consul/ConsulSyncDataConfiguration.java
index 7531e522f..290ff6d3a 100644
---
a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-consul/src/main/java/org/apache/shenyu/springboot/sync/data/consul/ConsulSyncDataConfiguration.java
+++
b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-consul/src/main/java/org/apache/shenyu/springboot/sync/data/consul/ConsulSyncDataConfiguration.java
@@ -19,8 +19,10 @@ package org.apache.shenyu.springboot.sync.data.consul;
import com.ecwid.consul.v1.ConsulClient;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
import org.apache.shenyu.sync.data.api.SyncDataService;
import org.apache.shenyu.sync.data.consul.ConsulSyncDataService;
import org.apache.shenyu.sync.data.consul.config.ConsulConfig;
@@ -54,6 +56,8 @@ public class ConsulSyncDataConfiguration {
* @param pluginSubscriber the plugin subscriber
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
+ * @param proxySelectorSubscribers the proxySelectorSubscribers
+ * @param discoveryUpstreamSubscribers the discoveryUpstreamSubscribers
* @return the sync data service
*/
@Bean
@@ -61,10 +65,13 @@ public class ConsulSyncDataConfiguration {
final ObjectProvider<ConsulConfig>
consulConfig,
final
ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final
ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
- final
ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
+ final
ObjectProvider<List<AuthDataSubscriber>> authSubscribers,
+ final
ObjectProvider<List<ProxySelectorDataSubscriber>> proxySelectorSubscribers,
+ final
ObjectProvider<List<DiscoveryUpstreamDataSubscriber>>
discoveryUpstreamSubscribers) {
LOGGER.info("you use consul sync shenyu data.......");
return new ConsulSyncDataService(consulClient.getIfAvailable(),
consulConfig.getIfAvailable(), pluginSubscriber.getIfAvailable(),
- metaSubscribers.getIfAvailable(Collections::emptyList),
authSubscribers.getIfAvailable(Collections::emptyList));
+ metaSubscribers.getIfAvailable(Collections::emptyList),
authSubscribers.getIfAvailable(Collections::emptyList),
+
proxySelectorSubscribers.getIfAvailable(Collections::emptyList),
discoveryUpstreamSubscribers.getIfAvailable(Collections::emptyList));
}
/**
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
index be390ac82..57b1e547c 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
@@ -24,8 +24,10 @@ import com.ecwid.consul.v1.kv.model.GetValue;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.constant.ConsulConstants;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
import org.apache.shenyu.sync.data.api.SyncDataService;
import org.apache.shenyu.sync.data.consul.config.ConsulConfig;
import org.apache.shenyu.sync.data.consul.handler.ConsulCacheHandler;
@@ -77,8 +79,10 @@ public class ConsulSyncDataService extends
ConsulCacheHandler implements SyncDat
final ConsulConfig consulConfig,
final PluginDataSubscriber
pluginDataSubscriber,
final List<MetaDataSubscriber>
metaDataSubscribers,
- final List<AuthDataSubscriber>
authDataSubscribers) {
- super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
+ final List<AuthDataSubscriber>
authDataSubscribers,
+ final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers,
+ final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers) {
+ super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers,
proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers);
this.consulClient = consulClient;
this.consulConfig = consulConfig;
this.executor = new ScheduledThreadPoolExecutor(1,
@@ -97,6 +101,8 @@ public class ConsulSyncDataService extends
ConsulCacheHandler implements SyncDat
groupMap.put(ConsulConstants.RULE_DATA, this::updateRuleMap);
groupMap.put(ConsulConstants.META_DATA, this::updateMetaDataMap);
groupMap.put(ConsulConstants.AUTH_DATA, this::updateAuthMap);
+ groupMap.put(ConsulConstants.PROXY_SELECTOR_DATA_ID,
this::updateSelectorDataMap);
+ groupMap.put(ConsulConstants.DISCOVERY_UPSTREAM,
this::updateDiscoveryUpstreamMap);
}
private void watchConfigKeyValues() {
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandler.java
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandler.java
index d593448fe..296da3ff0 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandler.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandler.java
@@ -19,14 +19,18 @@ package org.apache.shenyu.sync.data.consul.handler;
import com.google.gson.JsonParseException;
import org.apache.shenyu.common.dto.AppAuthData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
import org.apache.shenyu.common.dto.RuleData;
import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,12 +55,21 @@ public class ConsulCacheHandler {
private final List<AuthDataSubscriber> authDataSubscribers;
+ private final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers;
+
+ private final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers;
+
public ConsulCacheHandler(final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber>
metaDataSubscribers,
- final List<AuthDataSubscriber>
authDataSubscribers) {
+ final List<AuthDataSubscriber>
authDataSubscribers,
+ final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers,
+ final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers
+ ) {
this.pluginDataSubscriber = pluginDataSubscriber;
this.metaDataSubscribers = metaDataSubscribers;
this.authDataSubscribers = authDataSubscribers;
+ this.proxySelectorDataSubscribers = proxySelectorDataSubscribers;
+ this.discoveryUpstreamDataSubscribers =
discoveryUpstreamDataSubscribers;
}
protected void updatePluginData(final String configInfo) {
@@ -121,6 +134,30 @@ public class ConsulCacheHandler {
}
}
+ protected void updateSelectorDataMap(final String configInfo) {
+ try {
+ List<ProxySelectorData> proxySelectorDataList = new
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo,
ProxySelectorData.class).values());
+ proxySelectorDataList.forEach(proxySelectorData ->
proxySelectorDataSubscribers.forEach(subscriber -> {
+ subscriber.onSubscribe(proxySelectorData);
+ subscriber.unSubscribe(proxySelectorData);
+ }));
+ } catch (JsonParseException e) {
+ LOG.error("sync proxy selector data have error:", e);
+ }
+ }
+
+ protected void updateDiscoveryUpstreamMap(final String configInfo) {
+ try {
+ List<DiscoverySyncData> discoverySyncDataList = new
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo,
DiscoverySyncData.class).values());
+ discoverySyncDataList.forEach(discoverySyncData ->
discoveryUpstreamDataSubscribers.forEach(subscriber -> {
+ subscriber.onSubscribe(discoverySyncData);
+ subscriber.unSubscribe(discoverySyncData);
+ }));
+ } catch (JsonParseException e) {
+ LOG.error("sync discovery data have error:", e);
+ }
+ }
+
protected interface OnChange {
void change(String changeData);
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java
index 37e87a163..89c76c716 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java
@@ -89,7 +89,7 @@ public final class ConsulSyncDataServiceTest {
when(response.getValue()).thenReturn(list);
when(response.getConsulIndex()).thenReturn(INDEX);
consulSyncDataService = new ConsulSyncDataService(consulClient,
consulConfig, null,
- Collections.emptyList(), Collections.emptyList());
+ Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
}
@Test
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandlerTest.java
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandlerTest.java
index 024e142c2..c1b5e4030 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandlerTest.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandlerTest.java
@@ -66,7 +66,7 @@ public final class ConsulCacheHandlerTest {
public void unSubscribe(final PluginData pluginData) {
unsubscribeList.add(pluginData);
}
- }, Collections.emptyList(), Collections.emptyList());
+ }, Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
consulCacheHandler.updatePluginData(pluginData);
assertEquals(2, onSubscribeList.size());
assertEquals(2, unsubscribeList.size());
@@ -104,7 +104,7 @@ public final class ConsulCacheHandlerTest {
public void unSelectorSubscribe(final SelectorData selectorData) {
unsubscribeList.add(selectorData);
}
- }, Collections.emptyList(), Collections.emptyList());
+ }, Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
consulCacheHandler.updateSelectorMap(selectorDataParam);
assertEquals(2, subscribeList.size());
assertEquals(2, unsubscribeList.size());
@@ -137,7 +137,7 @@ public final class ConsulCacheHandlerTest {
public void unRuleSubscribe(final RuleData ruleData) {
unsubscribeList.add(ruleData);
}
- }, Collections.emptyList(), Collections.emptyList());
+ }, Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
consulCacheHandler.updateRuleMap(ruleDataParam);
assertEquals(2, subscribeList.size());
assertEquals(2, unsubscribeList.size());
@@ -166,7 +166,7 @@ public final class ConsulCacheHandlerTest {
}
};
ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(null,
Lists.newArrayList(metaDataSubscriber),
- Collections.emptyList());
+ Collections.emptyList(), Collections.emptyList(),
Collections.emptyList());
consulCacheHandler.updateMetaDataMap(metaDataParam);
assertEquals(2, subscribeList.size());
assertEquals(2, unsubscribeList.size());
@@ -199,7 +199,7 @@ public final class ConsulCacheHandlerTest {
}
};
ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(null,
- Collections.emptyList(),
Lists.newArrayList(authDataSubscriber));
+ Collections.emptyList(),
Lists.newArrayList(authDataSubscriber), Collections.emptyList(),
Collections.emptyList());
consulCacheHandler.updateAuthMap(appAuthDataParam);
assertEquals(2, subscribeList.size());
@@ -209,7 +209,7 @@ public final class ConsulCacheHandlerTest {
@Test
public void testError() {
ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(null,
- Collections.emptyList(), Collections.emptyList());
+ Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
assertDoesNotThrow(() ->
consulCacheHandler.updateAuthMap("errorJson"));
assertDoesNotThrow(() ->
consulCacheHandler.updateMetaDataMap("errorJson"));
assertDoesNotThrow(() ->
consulCacheHandler.updateRuleMap("errorJson"));