This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new a63a91c refactor shenyu-data-sync-center: code polish. (#2340)
a63a91c is described below
commit a63a91cc7a776c0b8f4a87a2678d2561147da42f
Author: midnight2104 <[email protected]>
AuthorDate: Thu Nov 11 14:36:42 2021 +0800
refactor shenyu-data-sync-center: code polish. (#2340)
* refactor shenyu-common: code polish
* refactor shenyu-common: code polish
* refactor shenyu-data-sync-center: code polish
* refactor shenyu-data-sync-center: code polish
---
.../sync/data/consul/ConsulSyncDataService.java | 25 +++++---
.../apache/shenyu/sync/data/etcd/EtcdClient.java | 72 ++++++++++------------
.../shenyu/sync/data/etcd/EtcdSyncDataService.java | 22 ++++---
.../shenyu/sync/data/http/HttpSyncDataService.java | 21 +++++--
.../data/http/refresh/AbstractDataRefresh.java | 18 +++---
.../sync/data/nacos/handler/NacosCacheHandler.java | 5 +-
.../data/websocket/WebsocketSyncDataService.java | 2 +
.../data/zookeeper/ZookeeperSyncDataService.java | 14 ++++-
8 files changed, 105 insertions(+), 74 deletions(-)
diff --git a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
index e11e055..1458047 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -45,7 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class ConsulSyncDataService extends ConsulCacheHandler implements
AutoCloseable, SyncDataService {
/**
- * logger.
+ * logger.
*/
private static final Logger LOG =
LoggerFactory.getLogger(ConsulSyncDataService.class);
@@ -57,9 +58,9 @@ public class ConsulSyncDataService extends ConsulCacheHandler
implements AutoClo
private ScheduledFuture<?> watchFuture;
- private ConsulConfig consulConfig;
+ private final ConsulConfig consulConfig;
- private ConsulClient consulClient;
+ private final ConsulClient consulClient;
private final AtomicBoolean running = new AtomicBoolean(false);
@@ -70,12 +71,16 @@ public class ConsulSyncDataService extends
ConsulCacheHandler implements AutoClo
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
- public ConsulSyncDataService(final ConsulClient consulClient, final
ConsulConfig consulConfig, final PluginDataSubscriber pluginDataSubscriber,
- final List<MetaDataSubscriber>
metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
+ public ConsulSyncDataService(final ConsulClient consulClient,
+ final ConsulConfig consulConfig,
+ final PluginDataSubscriber
pluginDataSubscriber,
+ final List<MetaDataSubscriber>
metaDataSubscribers,
+ final List<AuthDataSubscriber>
authDataSubscribers) {
super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
this.consulClient = consulClient;
this.consulConfig = consulConfig;
- this.executor = new ScheduledThreadPoolExecutor(1,
ShenyuThreadFactory.create("consul-config-watch", true));
+ this.executor = new ScheduledThreadPoolExecutor(1,
+ ShenyuThreadFactory.create("consul-config-watch", true));
consulIndexes.put(ConsulConstants.SYNC_PRE_FIX, 0L);
initUpdateMap();
start();
@@ -97,19 +102,19 @@ public class ConsulSyncDataService extends
ConsulCacheHandler implements AutoClo
for (String context : this.consulIndexes.keySet()) {
try {
Long currentIndex = this.consulIndexes.get(context);
- if (currentIndex == null) {
+ if (Objects.isNull(currentIndex)) {
currentIndex =
ConsulConstants.INIT_CONFIG_VERSION_INDEX;
}
Response<List<GetValue>> response =
this.consulClient.getKVValues(context, null,
new QueryParams(consulConfig.getWaitTime(),
currentIndex));
- if (response.getValue() == null ||
response.getValue().isEmpty()) {
+ if (Objects.isNull(response.getValue()) ||
response.getValue().isEmpty()) {
if (LOG.isTraceEnabled()) {
LOG.trace("No value for context " + context);
}
continue;
}
Long newIndex = response.getConsulIndex();
- if (newIndex == null || newIndex.equals(currentIndex)) {
+ if (Objects.isNull(newIndex) || Objects.equals(newIndex,
currentIndex)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Same index for context " + context);
}
@@ -152,7 +157,7 @@ public class ConsulSyncDataService extends
ConsulCacheHandler implements AutoClo
@Override
public void close() {
- if (this.running.compareAndSet(true, false) && this.watchFuture !=
null) {
+ if (this.running.compareAndSet(true, false) &&
Objects.nonNull(this.watchFuture)) {
this.watchFuture.cancel(true);
}
}
diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java
index ab5f537..7e3a7c9 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java
@@ -21,10 +21,10 @@ import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.ByteSequence;
-import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;
+import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,7 +74,12 @@ public class EtcdClient {
} catch (InterruptedException | ExecutionException e) {
LOG.error(e.getMessage(), e);
}
- return keyValues.isEmpty() ? null :
keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
+
+ if (CollectionUtils.isEmpty(keyValues)) {
+ return null;
+ }
+
+ return
keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
}
/**
@@ -88,9 +93,21 @@ public class EtcdClient {
*/
public List<String> getChildrenKeys(final String prefix, final String
separator) throws ExecutionException, InterruptedException {
ByteSequence prefixByteSequence = ByteSequence.from(prefix,
StandardCharsets.UTF_8);
- GetOption getOption =
GetOption.newBuilder().withPrefix(prefixByteSequence).withSortField(GetOption.SortTarget.KEY).withSortOrder(GetOption.SortOrder.ASCEND).build();
- List<KeyValue> keyValues =
client.getKVClient().get(prefixByteSequence, getOption).get().getKvs();
- return keyValues.stream().map(e -> getSubNodeKeyName(prefix,
e.getKey().toString(StandardCharsets.UTF_8),
separator)).distinct().collect(Collectors.toList());
+ GetOption getOption = GetOption.newBuilder()
+ .withPrefix(prefixByteSequence)
+ .withSortField(GetOption.SortTarget.KEY)
+ .withSortOrder(GetOption.SortOrder.ASCEND)
+ .build();
+
+ List<KeyValue> keyValues = client.getKVClient()
+ .get(prefixByteSequence, getOption)
+ .get()
+ .getKvs();
+
+ return keyValues.stream()
+ .map(e -> getSubNodeKeyName(prefix,
e.getKey().toString(StandardCharsets.UTF_8), separator))
+ .distinct()
+ .collect(Collectors.toList());
}
private String getSubNodeKeyName(final String prefix, final String
fullPath, final String separator) {
@@ -99,46 +116,15 @@ public class EtcdClient {
}
/**
- * update value of node.
- *
- * @param key node name
- * @param value node value
- * @throws ExecutionException the exception
- * @throws InterruptedException the exception
- */
- public void put(final String key, final String value) throws
ExecutionException, InterruptedException {
- client.getKVClient().put(ByteSequence.from(key,
StandardCharsets.UTF_8), ByteSequence.from(value,
StandardCharsets.UTF_8)).get();
- }
-
- /**
- * delete node.
- *
- * @param key node name
- */
- public void delete(final String key) {
- client.getKVClient().delete(ByteSequence.from(key,
StandardCharsets.UTF_8));
- }
-
- /**
- * delete node of recursive.
- *
- * @param key parent node name
- */
- public void deleteRecursive(final String key) {
- DeleteOption option = DeleteOption.newBuilder()
- .withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8))
- .build();
- client.getKVClient().delete(ByteSequence.from(key,
StandardCharsets.UTF_8), option);
- }
-
- /**
* subscribe data change.
*
* @param key node name
* @param updateHandler node value handler of update
* @param deleteHandler node value handler of delete
*/
- public void watchDataChange(final String key, final BiConsumer<String,
String> updateHandler, final Consumer<String> deleteHandler) {
+ public void watchDataChange(final String key,
+ final BiConsumer<String, String> updateHandler,
+ final Consumer<String> deleteHandler) {
Watch.Listener listener = watch(updateHandler, deleteHandler);
Watch.Watcher watch =
client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8),
listener);
watchCache.put(key, watch);
@@ -151,7 +137,9 @@ public class EtcdClient {
* @param updateHandler sub node handler of update
* @param deleteHandler sub node delete of delete
*/
- public void watchChildChange(final String key, final BiConsumer<String,
String> updateHandler, final Consumer<String> deleteHandler) {
+ public void watchChildChange(final String key,
+ final BiConsumer<String, String>
updateHandler,
+ final Consumer<String> deleteHandler) {
Watch.Listener listener = watch(updateHandler, deleteHandler);
WatchOption option = WatchOption.newBuilder()
.withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8))
@@ -160,7 +148,8 @@ public class EtcdClient {
watchCache.put(key, watch);
}
- private Watch.Listener watch(final BiConsumer<String, String>
updateHandler, final Consumer<String> deleteHandler) {
+ private Watch.Listener watch(final BiConsumer<String, String>
updateHandler,
+ final Consumer<String> deleteHandler) {
return Watch.listener(response -> {
for (WatchEvent event : response.getEvents()) {
String path =
event.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
@@ -180,6 +169,7 @@ public class EtcdClient {
/**
* cancel subscribe.
+ *
* @param key node name
*/
public void watchClose(final String key) {
diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
index 27e61f0..4c2a8b1 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
@@ -40,6 +40,7 @@ import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -69,8 +70,10 @@ public class EtcdSyncDataService implements SyncDataService,
AutoCloseable {
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
- public EtcdSyncDataService(final EtcdClient etcdClient, final
PluginDataSubscriber pluginDataSubscriber,
- final List<MetaDataSubscriber>
metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
+ public EtcdSyncDataService(final EtcdClient etcdClient,
+ final PluginDataSubscriber pluginDataSubscriber,
+ final List<MetaDataSubscriber>
metaDataSubscribers,
+ final List<AuthDataSubscriber>
authDataSubscribers) {
this.etcdClient = etcdClient;
this.pluginDataSubscriber = pluginDataSubscriber;
this.metaDataSubscribers = metaDataSubscribers;
@@ -234,7 +237,7 @@ public class EtcdSyncDataService implements
SyncDataService, AutoCloseable {
unCacheMetaData(metaData);
etcdClient.watchClose(path);
} catch (UnsupportedEncodingException e) {
- e.printStackTrace();
+ LOG.error("delete meta data error.", e);
}
}
@@ -272,17 +275,20 @@ public class EtcdSyncDataService implements
SyncDataService, AutoCloseable {
final String str =
dataPath.substring(DefaultPathConstants.RULE_PARENT.length());
final String pluginName = str.substring(1, str.length() -
substring.length() - 1);
final List<String> list =
Lists.newArrayList(Splitter.on(DefaultPathConstants.SELECTOR_JOIN_RULE).split(substring));
+
RuleData ruleData = new RuleData();
ruleData.setPluginName(pluginName);
ruleData.setSelectorId(list.get(0));
ruleData.setId(list.get(1));
+
Optional.ofNullable(pluginDataSubscriber).ifPresent(e ->
e.unRuleSubscribe(ruleData));
etcdClient.watchClose(dataPath);
}
private void cacheAuthData(final String dataString) {
final AppAuthData appAuthData =
GsonUtils.getInstance().fromJson(dataString, AppAuthData.class);
- Optional.ofNullable(appAuthData).ifPresent(data ->
authDataSubscribers.forEach(e -> e.onSubscribe(data)));
+ Optional.ofNullable(appAuthData)
+ .ifPresent(data -> authDataSubscribers.forEach(e ->
e.onSubscribe(data)));
}
private void unCacheAuthData(final String dataPath) {
@@ -295,11 +301,13 @@ public class EtcdSyncDataService implements
SyncDataService, AutoCloseable {
private void cacheMetaData(final String dataString) {
final MetaData metaData = GsonUtils.getInstance().fromJson(dataString,
MetaData.class);
- Optional.ofNullable(metaData).ifPresent(data ->
metaDataSubscribers.forEach(e -> e.onSubscribe(metaData)));
+ Optional.ofNullable(metaData)
+ .ifPresent(data -> metaDataSubscribers.forEach(e ->
e.onSubscribe(metaData)));
}
private void unCacheMetaData(final MetaData metaData) {
- Optional.ofNullable(metaData).ifPresent(data ->
metaDataSubscribers.forEach(e -> e.unSubscribe(metaData)));
+ Optional.ofNullable(metaData)
+ .ifPresent(data -> metaDataSubscribers.forEach(e ->
e.unSubscribe(metaData)));
}
private String buildRealPath(final String parent, final String children) {
@@ -317,7 +325,7 @@ public class EtcdSyncDataService implements
SyncDataService, AutoCloseable {
@Override
public void close() {
- if (null != etcdClient) {
+ if (Objects.nonNull(etcdClient)) {
etcdClient.close();
}
}
diff --git a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
index 2f1668d..e6009b1 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
@@ -50,6 +50,7 @@ import org.springframework.web.client.RestTemplate;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -72,6 +73,16 @@ public class HttpSyncDataService implements SyncDataService,
AutoCloseable {
private static final Gson GSON = new Gson();
/**
+ * shenyu admin path configs fetch.
+ */
+ private static final String SHENYU_ADMIN_PATH_CONFIGS_FETCH =
"/configs/fetch";
+
+ /**
+ * shenyu admin path configs listener.
+ */
+ private static final String SHENYU_ADMIN_PATH_CONFIGS_LISTENER =
"/configs/listener";
+
+ /**
* default: 10s.
*/
private Duration connectionTimeout = Duration.ofSeconds(10);
@@ -142,7 +153,7 @@ public class HttpSyncDataService implements
SyncDataService, AutoCloseable {
for (ConfigGroupEnum groupKey : groups) {
params.append("groupKeys").append("=").append(groupKey.name()).append("&");
}
- String url = server + "/configs/fetch?" +
StringUtils.removeEnd(params.toString(), "&");
+ String url = server + SHENYU_ADMIN_PATH_CONFIGS_FETCH + "?" +
StringUtils.removeEnd(params.toString(), "&");
LOG.info("request configs: [{}]", url);
String json = null;
try {
@@ -189,8 +200,9 @@ public class HttpSyncDataService implements
SyncDataService, AutoCloseable {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
HttpEntity httpEntity = new HttpEntity(params, headers);
- String listenerUrl = server + "/configs/listener";
+ String listenerUrl = server + SHENYU_ADMIN_PATH_CONFIGS_LISTENER;
LOG.debug("request listener configs: [{}]", listenerUrl);
+
JsonArray groupJson = null;
try {
String json = this.httpClient.postForEntity(listenerUrl,
httpEntity, String.class).getBody();
@@ -200,7 +212,8 @@ public class HttpSyncDataService implements
SyncDataService, AutoCloseable {
String message = String.format("listener configs fail,
server:[%s], %s", server, e.getMessage());
throw new ShenyuException(message, e);
}
- if (groupJson != null) {
+
+ if (Objects.nonNull(groupJson)) {
// fetch group configuration async.
ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson,
ConfigGroupEnum[].class);
if (ArrayUtils.isNotEmpty(changedGroups)) {
@@ -213,7 +226,7 @@ public class HttpSyncDataService implements
SyncDataService, AutoCloseable {
@Override
public void close() throws Exception {
RUNNING.set(false);
- if (executor != null) {
+ if (Objects.nonNull(executor)) {
executor.shutdownNow();
// help gc
executor = null;
diff --git a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/AbstractDataRefresh.java b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/AbstractDataRefresh.java
index 26d3252..6ba66dc 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/AbstractDataRefresh.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/AbstractDataRefresh.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -76,15 +77,18 @@ public abstract class AbstractDataRefresh<T> implements
DataRefresh {
@Override
public Boolean refresh(final JsonObject data) {
- boolean updated = false;
JsonObject jsonObject = convert(data);
- if (null != jsonObject) {
- ConfigData<T> result = fromJson(jsonObject);
- if (this.updateCacheIfNeed(result)) {
- updated = true;
- refresh(result.getData());
- }
+ if (Objects.isNull(jsonObject)) {
+ return false;
+ }
+
+ boolean updated = false;
+ ConfigData<T> result = fromJson(jsonObject);
+ if (this.updateCacheIfNeed(result)) {
+ updated = true;
+ refresh(result.getData());
}
+
return updated;
}
diff --git a/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java b/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java
index b10864c..eb9d123 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java
@@ -35,10 +35,11 @@ import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.ArrayList;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
@@ -151,7 +152,7 @@ public class NacosCacheHandler {
} catch (NacosException e) {
LOG.error(e.getMessage(), e);
}
- if (config == null) {
+ if (Objects.isNull(config)) {
config = "{}";
}
return config;
diff --git a/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/WebsocketSyncDataService.java b/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/WebsocketSyncDataService.java
index 331adfe..72a324b 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/WebsocketSyncDataService.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/WebsocketSyncDataService.java
@@ -65,6 +65,7 @@ public class WebsocketSyncDataService implements
SyncDataService, AutoCloseable
final List<AuthDataSubscriber>
authDataSubscribers) {
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
executor = new ScheduledThreadPoolExecutor(urls.length,
ShenyuThreadFactory.create("websocket-connect", true));
+
for (String url : urls) {
try {
clients.add(new ShenyuWebsocketClient(new URI(url),
Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers,
authDataSubscribers));
@@ -72,6 +73,7 @@ public class WebsocketSyncDataService implements
SyncDataService, AutoCloseable
LOG.error("websocket url({}) is error", url, e);
}
}
+
try {
for (WebSocketClient client : clients) {
boolean success = client.connectBlocking(3000,
TimeUnit.MILLISECONDS);
diff --git a/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java b/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
index 9123b60..f7a1b60 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
@@ -39,6 +39,7 @@ import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -322,7 +323,9 @@ public class ZookeeperSyncDataService implements
SyncDataService, AutoCloseable
}
private void cachePluginData(final PluginData pluginData) {
- Optional.ofNullable(pluginData).flatMap(data ->
Optional.ofNullable(pluginDataSubscriber)).ifPresent(e ->
e.onSubscribe(pluginData));
+ Optional.ofNullable(pluginData)
+ .flatMap(data -> Optional.ofNullable(pluginDataSubscriber))
+ .ifPresent(e -> e.onSubscribe(pluginData));
}
private void cacheSelectorData(final SelectorData selectorData) {
@@ -338,6 +341,7 @@ public class ZookeeperSyncDataService implements
SyncDataService, AutoCloseable
final String pluginName = str.substring(1, str.length() -
selectorId.length() - 1);
selectorData.setPluginName(pluginName);
selectorData.setId(selectorId);
+
Optional.ofNullable(pluginDataSubscriber)
.ifPresent(e -> e.unSelectorSubscribe(selectorData));
}
@@ -353,10 +357,12 @@ public class ZookeeperSyncDataService implements
SyncDataService, AutoCloseable
final String str =
dataPath.substring(DefaultPathConstants.RULE_PARENT.length());
final String pluginName = str.substring(1, str.length() -
substring.length() - 1);
final List<String> list =
Lists.newArrayList(Splitter.on(DefaultPathConstants.SELECTOR_JOIN_RULE).split(substring));
+
RuleData ruleData = new RuleData();
ruleData.setPluginName(pluginName);
ruleData.setSelectorId(list.get(0));
ruleData.setId(list.get(1));
+
Optional.ofNullable(pluginDataSubscriber)
.ifPresent(e -> e.unRuleSubscribe(ruleData));
}
@@ -383,10 +389,12 @@ public class ZookeeperSyncDataService implements
SyncDataService, AutoCloseable
.ifPresent(data -> metaDataSubscribers.forEach(e ->
e.unSubscribe(metaData)));
}
- private List<String> addSubscribePath(final List<String> alreadyChildren,
final List<String> currentChildren) {
+ private List<String> addSubscribePath(final List<String> alreadyChildren,
+ final List<String> currentChildren) {
if (CollectionUtils.isEmpty(alreadyChildren)) {
return currentChildren;
}
+
return currentChildren.stream()
.filter(current ->
alreadyChildren.stream().noneMatch(current::equals))
.collect(Collectors.toList());
@@ -405,7 +413,7 @@ public class ZookeeperSyncDataService implements
SyncDataService, AutoCloseable
@Override
public void close() {
- if (null != zkClient) {
+ if (Objects.nonNull(zkClient)) {
zkClient.close();
}
}