This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 8a290d275 etcd sync discoveryData (#4800)
8a290d275 is described below
commit 8a290d2754e9956e540d5f280e5eda4037fefc03
Author: Misaya295 <[email protected]>
AuthorDate: Wed Jul 5 16:56:04 2023 +0800
etcd sync discoveryData (#4800)
---
.../admin/listener/etcd/EtcdDataChangedInit.java | 6 +-
.../sync/data/etcd/EtcdSyncDataConfiguration.java | 16 ++++-
.../shenyu/sync/data/etcd/EtcdSyncDataService.java | 84 +++++++++++++++++++---
.../sync/data/etcd/EtcdSyncDataServiceTest.java | 12 ++--
4 files changed, 100 insertions(+), 18 deletions(-)
diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/etcd/EtcdDataChangedInit.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/etcd/EtcdDataChangedInit.java
index 7110cd38d..81cde311b 100644
--- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/etcd/EtcdDataChangedInit.java
+++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/etcd/EtcdDataChangedInit.java
@@ -42,6 +42,10 @@ public class EtcdDataChangedInit extends AbstractDataChangedInit {
@Override
protected boolean notExist() {
- return Stream.of(DefaultPathConstants.PLUGIN_PARENT,
DefaultPathConstants.APP_AUTH_PARENT,
DefaultPathConstants.META_DATA).noneMatch(etcdClient::exists);
+ return Stream.of(DefaultPathConstants.PLUGIN_PARENT,
+ DefaultPathConstants.APP_AUTH_PARENT,
+ DefaultPathConstants.META_DATA,
+ DefaultPathConstants.PROXY_SELECTOR,
+ DefaultPathConstants.DISCOVERY_UPSTREAM).noneMatch(etcdClient::exists);
}
}
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-etcd/src/main/java/org/apache/shenyu/springboot/sync/data/etcd/EtcdSyncDataConfiguration.java b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-etcd/src/main/java/org/apache/shenyu/springboot/sync/data/etcd/EtcdSyncDataConfiguration.java
index 5c1a64b79..2698cc78c 100644
--- a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-etcd/src/main/java/org/apache/shenyu/springboot/sync/data/etcd/EtcdSyncDataConfiguration.java
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-sync-data-center/shenyu-spring-boot-starter-sync-data-etcd/src/main/java/org/apache/shenyu/springboot/sync/data/etcd/EtcdSyncDataConfiguration.java
@@ -22,6 +22,8 @@ import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
import org.apache.shenyu.sync.data.api.SyncDataService;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
import org.apache.shenyu.sync.data.etcd.EtcdClient;
import org.apache.shenyu.sync.data.etcd.EtcdSyncDataService;
import org.slf4j.Logger;
@@ -54,16 +56,24 @@ public class EtcdSyncDataConfiguration {
* @param pluginSubscriber the plugin subscriber
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
+ * @param proxySelectorDataSubscribers the proxy selector data subscribers
+ * @param discoveryUpstreamDataSubscribers the discovery upstream data
subscribers
* @return the sync data service
*/
@Bean
public SyncDataService syncDataService(final ObjectProvider<EtcdClient>
etcdClients,
final
ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final
ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
- final
ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
+ final
ObjectProvider<List<AuthDataSubscriber>> authSubscribers,
+ final
ObjectProvider<List<ProxySelectorDataSubscriber>> proxySelectorDataSubscribers,
+ final
ObjectProvider<List<DiscoveryUpstreamDataSubscriber>>
discoveryUpstreamDataSubscribers) {
LOGGER.info("you use etcd sync shenyu data.......");
- return new EtcdSyncDataService(etcdClients.getIfAvailable(),
pluginSubscriber.getIfAvailable(),
- metaSubscribers.getIfAvailable(Collections::emptyList),
authSubscribers.getIfAvailable(Collections::emptyList));
+ return new EtcdSyncDataService(etcdClients.getIfAvailable(),
+ pluginSubscriber.getIfAvailable(),
+ metaSubscribers.getIfAvailable(Collections::emptyList),
+ authSubscribers.getIfAvailable(Collections::emptyList),
+ proxySelectorDataSubscribers.getIfAvailable(Collections::emptyList),
+ discoveryUpstreamDataSubscribers.getIfAvailable(Collections::emptyList));
}
/**
diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
index 889e2cbfe..6406834e9 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
@@ -25,13 +25,17 @@ import org.apache.shenyu.common.dto.AppAuthData;
import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.dto.RuleData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
import org.apache.shenyu.common.enums.ConfigGroupEnum;
import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
import org.apache.shenyu.sync.data.api.SyncDataService;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,24 +71,34 @@ public class EtcdSyncDataService implements SyncDataService {
private final List<AuthDataSubscriber> authDataSubscribers;
+ private final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers;
+
+ private final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers;
+
private Map<String, String> keysMap = new ConcurrentHashMap<>();
/**
* Instantiates a new Zookeeper cache manager.
*
- * @param etcdClient the etcd client
- * @param pluginDataSubscriber the plugin data subscriber
- * @param metaDataSubscribers the meta data subscribers
- * @param authDataSubscribers the auth data subscribers
+ * @param etcdClient the etcd client
+ * @param pluginDataSubscriber the plugin data subscriber
+ * @param metaDataSubscribers the meta data subscribers
+ * @param authDataSubscribers the auth data subscribers
+ * @param proxySelectorDataSubscribers the proxy selector data
subscribers
+ * @param discoveryUpstreamDataSubscribers the discovery upstream data
subscribers
*/
public EtcdSyncDataService(final EtcdClient etcdClient,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber>
metaDataSubscribers,
- final List<AuthDataSubscriber>
authDataSubscribers) {
+ final List<AuthDataSubscriber>
authDataSubscribers,
+ final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers,
+ final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers) {
this.etcdClient = etcdClient;
this.pluginDataSubscriber = pluginDataSubscriber;
this.metaDataSubscribers = metaDataSubscribers;
this.authDataSubscribers = authDataSubscribers;
+ this.proxySelectorDataSubscribers = proxySelectorDataSubscribers;
+ this.discoveryUpstreamDataSubscribers =
discoveryUpstreamDataSubscribers;
watchAllKeys();
watcherData();
watchAppAuth();
@@ -95,9 +109,7 @@ public class EtcdSyncDataService implements SyncDataService {
keysMap = etcdClient.getKeysMapByPrefix(PRE_FIX);
etcdClient.watchDataChange(PRE_FIX, (updateKey, updateValue) -> {
keysMap.put(updateKey, updateValue);
- }, deleteKey -> {
- keysMap.remove(deleteKey);
- });
+ }, deleteKey -> keysMap.remove(deleteKey));
}
@@ -204,6 +216,18 @@ public class EtcdSyncDataService implements SyncDataService {
subscribeMetaDataChanges(updatePath);
}, null);
break;
+ case DISCOVER_UPSTREAM:
+ etcdClient.watchChildChange(groupParentPath, (updatePath,
updateValue) -> {
+ cacheDiscoveryUpstreamData(keysMap.get(updatePath));
+ subscribeDiscoveryUpstreamDataChanges(updatePath);
+ }, null);
+ break;
+ case PROXY_SELECTOR:
+ etcdClient.watchChildChange(groupParentPath, (updatePath,
updateValue) -> {
+ cacheProxySelectorData(keysMap.get(updatePath));
+ subscribeProxySelectorDataChanges(updatePath);
+ }, null);
+ break;
default:
throw new IllegalStateException("Unexpected groupKey: " +
groupKey);
}
@@ -327,6 +351,50 @@ public class EtcdSyncDataService implements SyncDataService {
.ifPresent(data -> metaDataSubscribers.forEach(e ->
e.unSubscribe(metaData)));
}
+ private void cacheDiscoveryUpstreamData(final String dataString) {
+ final DiscoverySyncData discoveryUpstream =
GsonUtils.getInstance().fromJson(dataString, DiscoverySyncData.class);
+ Optional.ofNullable(discoveryUpstream)
+ .ifPresent(data -> discoveryUpstreamDataSubscribers.forEach(e
-> e.onSubscribe(data)));
+ }
+
+ private void unCacheDiscoveryUpstreamData(final String dataPath) {
+ DiscoverySyncData discoverySyncData = new DiscoverySyncData();
+ final String selectorId = dataPath.substring(dataPath.lastIndexOf("/")
+ 1);
+ final String str =
dataPath.substring(DefaultPathConstants.DISCOVERY_UPSTREAM.length());
+ final String pluginName = str.substring(1, str.length() -
selectorId.length() - 1);
+ discoverySyncData.setPluginName(pluginName);
+ discoverySyncData.setSelectorId(selectorId);
+ discoveryUpstreamDataSubscribers.forEach(e ->
e.unSubscribe(discoverySyncData));
+ etcdClient.watchClose(dataPath);
+ }
+
+ private void subscribeDiscoveryUpstreamDataChanges(final String realPath) {
+ etcdClient.watchDataChange(realPath, (updatePath, updateValue) ->
cacheDiscoveryUpstreamData(updateValue),
+ this::unCacheDiscoveryUpstreamData);
+ }
+
+ private void cacheProxySelectorData(final String dataString) {
+ final ProxySelectorData proxySelectorData =
GsonUtils.getInstance().fromJson(dataString, ProxySelectorData.class);
+ Optional.ofNullable(proxySelectorData)
+ .ifPresent(data -> proxySelectorDataSubscribers.forEach(e ->
e.onSubscribe(data)));
+ }
+
+ private void unCacheProxySelectorData(final String dataPath) {
+ ProxySelectorData proxySelectorData = new ProxySelectorData();
+ final String selectorId = dataPath.substring(dataPath.lastIndexOf("/")
+ 1);
+ final String str =
dataPath.substring(DefaultPathConstants.PROXY_SELECTOR.length());
+ final String pluginName = str.substring(1, str.length() -
selectorId.length() - 1);
+ proxySelectorData.setPluginName(pluginName);
+ proxySelectorData.setId(selectorId);
+ proxySelectorDataSubscribers.forEach(e ->
e.unSubscribe(proxySelectorData));
+ etcdClient.watchClose(dataPath);
+ }
+
+ private void subscribeProxySelectorDataChanges(final String realPath) {
+ etcdClient.watchDataChange(realPath, (updatePath, updateValue) ->
cacheProxySelectorData(updateValue),
+ this::unCacheProxySelectorData);
+ }
+
private String buildRealPath(final String parent, final String children) {
return String.join("/", parent, children);
}
diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
index e1cd37c0a..8e36b6ff3 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
@@ -133,7 +133,7 @@ public class EtcdSyncDataServiceTest {
public void onSubscribe(final PluginData pluginData) {
subscribeList.add(pluginData);
}
- }, Collections.emptyList(), Collections.emptyList());
+ }, Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
assertThat(subscribeList.size(), is(1));
assertThat(subscribeList.get(0).getName(), is("divide"));
}
@@ -154,7 +154,7 @@ public class EtcdSyncDataServiceTest {
.findFirst().orElse(null);
subscribeList.remove(pluginDataDel);
}
- }, Collections.emptyList(), Collections.emptyList());
+ }, Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
subscribeList.clear();
final Method deletePlugin =
EtcdSyncDataService.class.getDeclaredMethod("deletePlugin", String.class);
final Method cachePluginData =
EtcdSyncDataService.class.getDeclaredMethod("cachePluginData", String.class);
@@ -185,7 +185,7 @@ public class EtcdSyncDataServiceTest {
.findFirst().orElse(null);
subscribeList.remove(metaDataDel);
}
- }), Collections.emptyList());
+ }), Collections.emptyList(), Collections.emptyList(),
Collections.emptyList());
subscribeList.clear();
final Method cacheMetaData =
EtcdSyncDataService.class.getDeclaredMethod("cacheMetaData", String.class);
final Method deleteMetaData =
EtcdSyncDataService.class.getDeclaredMethod("unCacheMetaData", MetaData.class);
@@ -216,7 +216,7 @@ public class EtcdSyncDataServiceTest {
.findFirst().orElse(null);
subscribeList.remove(appAuthDataOld);
}
- }));
+ }), Collections.emptyList(), Collections.emptyList());
subscribeList.clear();
final Method cacheAuthData =
EtcdSyncDataService.class.getDeclaredMethod("cacheAuthData", String.class);
final Method unCacheAuthData =
EtcdSyncDataService.class.getDeclaredMethod("unCacheAuthData", String.class);
@@ -233,7 +233,7 @@ public class EtcdSyncDataServiceTest {
@Test
public void closeTest() {
etcdSyncDataService = new EtcdSyncDataService(etcdClient,
mock(PluginDataSubscriber.class), Collections.emptyList(),
- Collections.emptyList());
+ Collections.emptyList(), Collections.emptyList(),
Collections.emptyList());
assertDoesNotThrow(() -> etcdSyncDataService.close());
}
@@ -271,7 +271,7 @@ public class EtcdSyncDataServiceTest {
etcdSyncDataService = new EtcdSyncDataService(etcdClient,
mock(PluginDataSubscriber.class),
Collections.emptyList(),
- Collections.emptyList());
+ Collections.emptyList(), Collections.emptyList(),
Collections.emptyList());
final Field etcdClient =
EtcdSyncDataService.class.getDeclaredField("etcdClient");
etcdClient.setAccessible(true);
etcdClient.set(etcdSyncDataService, mockEtcdClient);