This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 04f650db6 [ISSUE #4088] fix sync data by etcd
io.vertx.core.VertxException (#4141)
04f650db6 is described below
commit 04f650db6c070979ba98f395c641dad4ca4a4079
Author: Misaya295 <[email protected]>
AuthorDate: Mon Oct 31 14:51:35 2022 +0800
[ISSUE #4088] fix sync data by etcd io.vertx.core.VertxException (#4141)
* fix #4088
* fix sync data
* fix TU
* refactor code
Co-authored-by: xiaoyu <[email protected]>
---
.../apache/shenyu/sync/data/etcd/EtcdClient.java | 71 +++++++++++++++++++---
.../shenyu/sync/data/etcd/EtcdSyncDataService.java | 55 +++++++++++------
.../sync/data/etcd/EtcdSyncDataServiceTest.java | 1 -
3 files changed, 99 insertions(+), 28 deletions(-)
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java
b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java
index b334b9a93..596bbad4c 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java
@@ -25,11 +25,12 @@ import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.shenyu.common.exception.ShenyuException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -37,6 +38,8 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
/**
* Etcd client of Bootstrap.
*/
@@ -71,7 +74,7 @@ public class EtcdClient {
public String get(final String key) {
List<KeyValue> keyValues = null;
try {
- keyValues = client.getKVClient().get(ByteSequence.from(key,
StandardCharsets.UTF_8)).get().getKvs();
+ keyValues = client.getKVClient().get(bytesOf(key)).get().getKvs();
} catch (InterruptedException | ExecutionException e) {
LOG.error(e.getMessage(), e);
}
@@ -80,7 +83,37 @@ public class EtcdClient {
return null;
}
- return
keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
+ return keyValues.iterator().next().getValue().toString(UTF_8);
+ }
+
+ /**
+ * Get all key-value pairs whose key starts with the given prefix.
+ *
+ * @param prefix key prefix.
+ * @return map of full key to value.
+ */
+ public Map<String, String> getKeysMapByPrefix(final String prefix) {
+ GetOption getOption = GetOption.newBuilder()
+ .isPrefix(true)
+ .build();
+ try {
+ return this.client.getKVClient().get(bytesOf(prefix), getOption)
+ .get().getKvs().stream()
+ .collect(Collectors.toMap(e -> e.getKey().toString(UTF_8),
e -> e.getValue().toString(UTF_8)));
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.error("etcd getKeysMapByPrefix key {} error {}", prefix, e);
+ throw new ShenyuException(e);
+ }
+
+ }
+
+ /**
+ * Convert a string to a UTF-8 encoded ByteSequence.
+ * @param val the string to convert.
+ * @return the ByteSequence of val.
+ */
+ public ByteSequence bytesOf(final String val) {
+ return ByteSequence.from(val, UTF_8);
}
/**
@@ -93,7 +126,7 @@ public class EtcdClient {
* @throws InterruptedException the exception
*/
public List<String> getChildrenKeys(final String prefix, final String
separator) throws ExecutionException, InterruptedException {
- ByteSequence prefixByteSequence = ByteSequence.from(prefix,
StandardCharsets.UTF_8);
+ ByteSequence prefixByteSequence = bytesOf(prefix);
GetOption getOption = GetOption.newBuilder()
.withPrefix(prefixByteSequence)
.withSortField(GetOption.SortTarget.KEY)
@@ -106,7 +139,25 @@ public class EtcdClient {
.getKvs();
return keyValues.stream()
- .map(e -> getSubNodeKeyName(prefix,
e.getKey().toString(StandardCharsets.UTF_8), separator))
+ .map(e -> getSubNodeKeyName(prefix,
e.getKey().toString(UTF_8), separator))
+ .distinct()
+ .filter(e -> Objects.nonNull(e))
+ .collect(Collectors.toList());
+ }
+
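+ /**
+ * Get the distinct sub-node key names for a prefix from an already-fetched key map.
+ *
+ * @param prefix key prefix.
+ * @param separator separator char
+ * @param map map of full key to value to search
+ * @return list of sub-node key names
+ */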
+ public List<String> getChildrenKeysByMap(final String prefix, final String
separator, final Map<String, String> map) {
+
+ return map.entrySet().stream()
+ .filter(e -> e.getKey().contains(prefix))
+ .map(e -> getSubNodeKeyName(prefix, e.getKey(), separator))
.distinct()
.filter(e -> Objects.nonNull(e))
.collect(Collectors.toList());
@@ -131,7 +182,7 @@ public class EtcdClient {
final BiConsumer<String, String> updateHandler,
final Consumer<String> deleteHandler) {
Watch.Listener listener = watch(updateHandler, deleteHandler);
- Watch.Watcher watch =
client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8),
listener);
+ Watch.Watcher watch =
client.getWatchClient().watch(ByteSequence.from(key, UTF_8), listener);
watchCache.put(key, watch);
}
@@ -147,9 +198,9 @@ public class EtcdClient {
final Consumer<String> deleteHandler) {
Watch.Listener listener = watch(updateHandler, deleteHandler);
WatchOption option = WatchOption.newBuilder()
- .withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8))
+ .withPrefix(ByteSequence.from(key, UTF_8))
.build();
- Watch.Watcher watch =
client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8),
option, listener);
+ Watch.Watcher watch =
client.getWatchClient().watch(ByteSequence.from(key, UTF_8), option, listener);
watchCache.put(key, watch);
}
@@ -157,8 +208,8 @@ public class EtcdClient {
final Consumer<String> deleteHandler) {
return Watch.listener(response -> {
for (WatchEvent event : response.getEvents()) {
- String path =
event.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
- String value =
event.getKeyValue().getValue().toString(StandardCharsets.UTF_8);
+ String path = event.getKeyValue().getKey().toString(UTF_8);
+ String value = event.getKeyValue().getValue().toString(UTF_8);
switch (event.getEventType()) {
case PUT:
updateHandler.accept(path, value);
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
index cfb6c5409..889e2cbfe 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
@@ -38,10 +38,13 @@ import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
-import java.util.Collections;
+
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
/**
@@ -54,6 +57,8 @@ public class EtcdSyncDataService implements SyncDataService {
*/
private static final Logger LOG =
LoggerFactory.getLogger(EtcdSyncDataService.class);
+ private static final String PRE_FIX = "/shenyu";
+
private final EtcdClient etcdClient;
private final PluginDataSubscriber pluginDataSubscriber;
@@ -62,6 +67,8 @@ public class EtcdSyncDataService implements SyncDataService {
private final List<AuthDataSubscriber> authDataSubscribers;
+ private Map<String, String> keysMap = new ConcurrentHashMap<>();
+
/**
* Instantiates a new Zookeeper cache manager.
*
@@ -78,18 +85,28 @@ public class EtcdSyncDataService implements SyncDataService
{
this.pluginDataSubscriber = pluginDataSubscriber;
this.metaDataSubscribers = metaDataSubscribers;
this.authDataSubscribers = authDataSubscribers;
+ watchAllKeys();
watcherData();
watchAppAuth();
watchMetaData();
}
+ private void watchAllKeys() {
+ keysMap = etcdClient.getKeysMapByPrefix(PRE_FIX);
+ etcdClient.watchDataChange(PRE_FIX, (updateKey, updateValue) -> {
+ keysMap.put(updateKey, updateValue);
+ }, deleteKey -> {
+ keysMap.remove(deleteKey);
+ });
+
+ }
+
private void watcherData() {
final String pluginParent = DefaultPathConstants.PLUGIN_PARENT;
- List<String> pluginChildren = etcdClientGetChildren(pluginParent);
+ List<String> pluginChildren = etcdClientGetChildrenByMap(pluginParent,
keysMap);
for (String pluginName : pluginChildren) {
watcherAll(pluginName);
}
-
etcdClient.watchChildChange(pluginParent, (updateNode, updateValue) ->
{
if (!updateNode.isEmpty()) {
watcherAll(updateNode);
@@ -105,17 +122,17 @@ public class EtcdSyncDataService implements
SyncDataService {
private void watcherPlugin(final String pluginName) {
String pluginPath = DefaultPathConstants.buildPluginPath(pluginName);
- cachePluginData(etcdClient.get(pluginPath));
+ cachePluginData(keysMap.get(pluginPath));
subscribePluginDataChanges(pluginPath, pluginName);
}
private void watcherSelector(final String pluginName) {
String selectorParentPath =
DefaultPathConstants.buildSelectorParentPath(pluginName);
- List<String> childrenList = etcdClientGetChildren(selectorParentPath);
+ List<String> childrenList =
etcdClientGetChildrenByMap(selectorParentPath, keysMap);
if (CollectionUtils.isNotEmpty(childrenList)) {
childrenList.forEach(children -> {
String realPath = buildRealPath(selectorParentPath, children);
- cacheSelectorData(etcdClient.get(realPath));
+ cacheSelectorData(keysMap.get(realPath));
subscribeSelectorDataChanges(realPath);
});
}
@@ -124,11 +141,11 @@ public class EtcdSyncDataService implements
SyncDataService {
private void watcherRule(final String pluginName) {
String ruleParent =
DefaultPathConstants.buildRuleParentPath(pluginName);
- List<String> childrenList = etcdClientGetChildren(ruleParent);
+ List<String> childrenList = etcdClientGetChildrenByMap(ruleParent,
keysMap);
if (CollectionUtils.isNotEmpty(childrenList)) {
childrenList.forEach(children -> {
String realPath = buildRealPath(ruleParent, children);
- cacheRuleData(etcdClient.get(realPath));
+ cacheRuleData(keysMap.get(realPath));
subscribeRuleDataChanges(realPath);
});
}
@@ -137,11 +154,11 @@ public class EtcdSyncDataService implements
SyncDataService {
private void watchAppAuth() {
final String appAuthParent = DefaultPathConstants.APP_AUTH_PARENT;
- List<String> childrenList = etcdClientGetChildren(appAuthParent);
+ List<String> childrenList = etcdClientGetChildrenByMap(appAuthParent,
keysMap);
if (CollectionUtils.isNotEmpty(childrenList)) {
childrenList.forEach(children -> {
String realPath = buildRealPath(appAuthParent, children);
- cacheAuthData(etcdClient.get(realPath));
+ cacheAuthData(keysMap.get(realPath));
subscribeAppAuthDataChanges(realPath);
});
}
@@ -150,11 +167,11 @@ public class EtcdSyncDataService implements
SyncDataService {
private void watchMetaData() {
final String metaDataPath = DefaultPathConstants.META_DATA;
- List<String> childrenList = etcdClientGetChildren(metaDataPath);
+ List<String> childrenList = etcdClientGetChildrenByMap(metaDataPath,
keysMap);
if (CollectionUtils.isNotEmpty(childrenList)) {
childrenList.forEach(children -> {
String realPath = buildRealPath(metaDataPath, children);
- cacheMetaData(etcdClient.get(realPath));
+ cacheMetaData(keysMap.get(realPath));
subscribeMetaDataChanges(realPath);
});
}
@@ -165,25 +182,25 @@ public class EtcdSyncDataService implements
SyncDataService {
switch (groupKey) {
case SELECTOR:
etcdClient.watchChildChange(groupParentPath, (updatePath,
updateValue) -> {
- cacheSelectorData(etcdClient.get(updatePath));
+ cacheSelectorData(keysMap.get(updatePath));
subscribeSelectorDataChanges(updatePath);
}, null);
break;
case RULE:
etcdClient.watchChildChange(groupParentPath, (updatePath,
updateValue) -> {
- cacheRuleData(etcdClient.get(updatePath));
+ cacheRuleData(keysMap.get(updatePath));
subscribeRuleDataChanges(updatePath);
}, null);
break;
case APP_AUTH:
etcdClient.watchChildChange(groupParentPath, (updatePath,
updateValue) -> {
- cacheAuthData(etcdClient.get(updatePath));
+ cacheAuthData(keysMap.get(updatePath));
subscribeAppAuthDataChanges(updatePath);
}, null);
break;
case META_DATA:
etcdClient.watchChildChange(groupParentPath, (updatePath,
updateValue) -> {
- cacheMetaData(etcdClient.get(updatePath));
+ cacheMetaData(keysMap.get(updatePath));
subscribeMetaDataChanges(updatePath);
}, null);
break;
@@ -195,7 +212,7 @@ public class EtcdSyncDataService implements SyncDataService
{
private void subscribePluginDataChanges(final String pluginPath, final
String pluginName) {
etcdClient.watchDataChange(pluginPath, (updatePath, updateValue) -> {
final String dataPath = buildRealPath(pluginPath, updatePath);
- final String dataStr = etcdClient.get(dataPath);
+ final String dataStr = keysMap.get(dataPath);
final PluginData data = GsonUtils.getInstance().fromJson(dataStr,
PluginData.class);
Optional.ofNullable(data)
.ifPresent(d ->
Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSubscribe(d)));
@@ -323,6 +340,10 @@ public class EtcdSyncDataService implements
SyncDataService {
return Collections.emptyList();
}
+ private List<String> etcdClientGetChildrenByMap(final String parent, final
Map<String, String> map) {
+ return etcdClient.getChildrenKeysByMap(parent, "/", map);
+ }
+
@Override
public void close() {
if (Objects.nonNull(etcdClient)) {
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
index 3d5824cf4..e1cd37c0a 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
@@ -101,7 +101,6 @@ public class EtcdSyncDataServiceTest {
* mock get method.
*/
when(client.getKVClient()).thenReturn(kv);
- when(kv.get(any())).thenReturn(future);
try {
when(future.get()).thenReturn(getResponse);
} catch (Exception e) {