This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new a63a91c  refactor shenyu-data-sync-center: code polish. (#2340)
a63a91c is described below

commit a63a91cc7a776c0b8f4a87a2678d2561147da42f
Author: midnight2104 <[email protected]>
AuthorDate: Thu Nov 11 14:36:42 2021 +0800

    refactor shenyu-data-sync-center: code polish. (#2340)
    
    * refactor shenyu-common: code polish
    
    * refactor shenyu-common: code polish
    
    * refactor shenyu-data-sync-center: code polish
    
    * refactor shenyu-data-sync-center: code polish
---
 .../sync/data/consul/ConsulSyncDataService.java    | 25 +++++---
 .../apache/shenyu/sync/data/etcd/EtcdClient.java   | 72 ++++++++++------------
 .../shenyu/sync/data/etcd/EtcdSyncDataService.java | 22 ++++---
 .../shenyu/sync/data/http/HttpSyncDataService.java | 21 +++++--
 .../data/http/refresh/AbstractDataRefresh.java     | 18 +++---
 .../sync/data/nacos/handler/NacosCacheHandler.java |  5 +-
 .../data/websocket/WebsocketSyncDataService.java   |  2 +
 .../data/zookeeper/ZookeeperSyncDataService.java   | 14 ++++-
 8 files changed, 105 insertions(+), 74 deletions(-)

diff --git a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
index e11e055..1458047 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -45,7 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class ConsulSyncDataService extends ConsulCacheHandler implements 
AutoCloseable, SyncDataService {
     /**
-     * logger. 
+     * logger.
      */
     private static final Logger LOG = 
LoggerFactory.getLogger(ConsulSyncDataService.class);
 
@@ -57,9 +58,9 @@ public class ConsulSyncDataService extends ConsulCacheHandler 
implements AutoClo
 
     private ScheduledFuture<?> watchFuture;
 
-    private ConsulConfig consulConfig;
+    private final ConsulConfig consulConfig;
 
-    private ConsulClient consulClient;
+    private final ConsulClient consulClient;
 
     private final AtomicBoolean running = new AtomicBoolean(false);
 
@@ -70,12 +71,16 @@ public class ConsulSyncDataService extends 
ConsulCacheHandler implements AutoClo
      * @param metaDataSubscribers  the meta data subscribers
      * @param authDataSubscribers  the auth data subscribers
      */
-    public ConsulSyncDataService(final ConsulClient consulClient, final 
ConsulConfig consulConfig, final PluginDataSubscriber pluginDataSubscriber,
-                                 final List<MetaDataSubscriber> 
metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
+    public ConsulSyncDataService(final ConsulClient consulClient,
+                                 final ConsulConfig consulConfig,
+                                 final PluginDataSubscriber 
pluginDataSubscriber,
+                                 final List<MetaDataSubscriber> 
metaDataSubscribers,
+                                 final List<AuthDataSubscriber> 
authDataSubscribers) {
         super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
         this.consulClient = consulClient;
         this.consulConfig = consulConfig;
-        this.executor = new ScheduledThreadPoolExecutor(1, 
ShenyuThreadFactory.create("consul-config-watch", true));
+        this.executor = new ScheduledThreadPoolExecutor(1,
+                ShenyuThreadFactory.create("consul-config-watch", true));
         consulIndexes.put(ConsulConstants.SYNC_PRE_FIX, 0L);
         initUpdateMap();
         start();
@@ -97,19 +102,19 @@ public class ConsulSyncDataService extends 
ConsulCacheHandler implements AutoClo
             for (String context : this.consulIndexes.keySet()) {
                 try {
                     Long currentIndex = this.consulIndexes.get(context);
-                    if (currentIndex == null) {
+                    if (Objects.isNull(currentIndex)) {
                         currentIndex = 
ConsulConstants.INIT_CONFIG_VERSION_INDEX;
                     }
                     Response<List<GetValue>> response = 
this.consulClient.getKVValues(context, null,
                             new QueryParams(consulConfig.getWaitTime(), 
currentIndex));
-                    if (response.getValue() == null || 
response.getValue().isEmpty()) {
+                    if (Objects.isNull(response.getValue()) || 
response.getValue().isEmpty()) {
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("No value for context " + context);
                         }
                         continue;
                     }
                     Long newIndex = response.getConsulIndex();
-                    if (newIndex == null || newIndex.equals(currentIndex)) {
+                    if (Objects.isNull(newIndex) || Objects.equals(newIndex, 
currentIndex)) {
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("Same index for context " + context);
                         }
@@ -152,7 +157,7 @@ public class ConsulSyncDataService extends 
ConsulCacheHandler implements AutoClo
 
     @Override
     public void close() {
-        if (this.running.compareAndSet(true, false) && this.watchFuture != 
null) {
+        if (this.running.compareAndSet(true, false) && 
Objects.nonNull(this.watchFuture)) {
             this.watchFuture.cancel(true);
         }
     }
diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java
index ab5f537..7e3a7c9 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdClient.java
@@ -21,10 +21,10 @@ import io.etcd.jetcd.Client;
 import io.etcd.jetcd.KeyValue;
 import io.etcd.jetcd.Watch;
 import io.etcd.jetcd.ByteSequence;
-import io.etcd.jetcd.options.DeleteOption;
 import io.etcd.jetcd.options.GetOption;
 import io.etcd.jetcd.options.WatchOption;
 import io.etcd.jetcd.watch.WatchEvent;
+import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,7 +74,12 @@ public class EtcdClient {
         } catch (InterruptedException | ExecutionException e) {
             LOG.error(e.getMessage(), e);
         }
-        return keyValues.isEmpty() ? null : 
keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
+
+        if (CollectionUtils.isEmpty(keyValues)) {
+            return null;
+        }
+
+        return 
keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
     }
 
     /**
@@ -88,9 +93,21 @@ public class EtcdClient {
      */
     public List<String> getChildrenKeys(final String prefix, final String 
separator) throws ExecutionException, InterruptedException {
         ByteSequence prefixByteSequence = ByteSequence.from(prefix, 
StandardCharsets.UTF_8);
-        GetOption getOption = 
GetOption.newBuilder().withPrefix(prefixByteSequence).withSortField(GetOption.SortTarget.KEY).withSortOrder(GetOption.SortOrder.ASCEND).build();
-        List<KeyValue> keyValues = 
client.getKVClient().get(prefixByteSequence, getOption).get().getKvs();
-        return keyValues.stream().map(e -> getSubNodeKeyName(prefix, 
e.getKey().toString(StandardCharsets.UTF_8), 
separator)).distinct().collect(Collectors.toList());
+        GetOption getOption = GetOption.newBuilder()
+                .withPrefix(prefixByteSequence)
+                .withSortField(GetOption.SortTarget.KEY)
+                .withSortOrder(GetOption.SortOrder.ASCEND)
+                .build();
+
+        List<KeyValue> keyValues = client.getKVClient()
+                .get(prefixByteSequence, getOption)
+                .get()
+                .getKvs();
+
+        return keyValues.stream()
+                .map(e -> getSubNodeKeyName(prefix, 
e.getKey().toString(StandardCharsets.UTF_8), separator))
+                .distinct()
+                .collect(Collectors.toList());
     }
 
     private String getSubNodeKeyName(final String prefix, final String 
fullPath, final String separator) {
@@ -99,46 +116,15 @@ public class EtcdClient {
     }
 
     /**
-     * update value of node.
-     *
-     * @param key   node name
-     * @param value node value
-     * @throws ExecutionException   the exception
-     * @throws InterruptedException the exception
-     */
-    public void put(final String key, final String value) throws 
ExecutionException, InterruptedException {
-        client.getKVClient().put(ByteSequence.from(key, 
StandardCharsets.UTF_8), ByteSequence.from(value, 
StandardCharsets.UTF_8)).get();
-    }
-
-    /**
-     * delete node.
-     *
-     * @param key node name
-     */
-    public void delete(final String key) {
-        client.getKVClient().delete(ByteSequence.from(key, 
StandardCharsets.UTF_8));
-    }
-
-    /**
-     * delete node of recursive.
-     *
-     * @param key parent node name
-     */
-    public void deleteRecursive(final String key) {
-        DeleteOption option = DeleteOption.newBuilder()
-                .withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8))
-                .build();
-        client.getKVClient().delete(ByteSequence.from(key, 
StandardCharsets.UTF_8), option);
-    }
-
-    /**
      * subscribe data change.
      *
      * @param key           node name
      * @param updateHandler node value handler of update
      * @param deleteHandler node value handler of delete
      */
-    public void watchDataChange(final String key, final BiConsumer<String, 
String> updateHandler, final Consumer<String> deleteHandler) {
+    public void watchDataChange(final String key,
+                                final BiConsumer<String, String> updateHandler,
+                                final Consumer<String> deleteHandler) {
         Watch.Listener listener = watch(updateHandler, deleteHandler);
         Watch.Watcher watch = 
client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), 
listener);
         watchCache.put(key, watch);
@@ -151,7 +137,9 @@ public class EtcdClient {
      * @param updateHandler sub node handler of update
      * @param deleteHandler sub node delete of delete
      */
-    public void watchChildChange(final String key, final BiConsumer<String, 
String> updateHandler, final Consumer<String> deleteHandler) {
+    public void watchChildChange(final String key,
+                                 final BiConsumer<String, String> 
updateHandler,
+                                 final Consumer<String> deleteHandler) {
         Watch.Listener listener = watch(updateHandler, deleteHandler);
         WatchOption option = WatchOption.newBuilder()
                 .withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8))
@@ -160,7 +148,8 @@ public class EtcdClient {
         watchCache.put(key, watch);
     }
 
-    private Watch.Listener watch(final BiConsumer<String, String> 
updateHandler, final Consumer<String> deleteHandler) {
+    private Watch.Listener watch(final BiConsumer<String, String> 
updateHandler,
+                                 final Consumer<String> deleteHandler) {
         return Watch.listener(response -> {
             for (WatchEvent event : response.getEvents()) {
                 String path = 
event.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
@@ -180,6 +169,7 @@ public class EtcdClient {
 
     /**
      * cancel subscribe.
+     *
      * @param key node name
      */
     public void watchClose(final String key) {
diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
index 27e61f0..4c2a8b1 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
@@ -40,6 +40,7 @@ import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 
@@ -69,8 +70,10 @@ public class EtcdSyncDataService implements SyncDataService, 
AutoCloseable {
      * @param metaDataSubscribers  the meta data subscribers
      * @param authDataSubscribers  the auth data subscribers
      */
-    public EtcdSyncDataService(final EtcdClient etcdClient, final 
PluginDataSubscriber pluginDataSubscriber,
-                               final List<MetaDataSubscriber> 
metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
+    public EtcdSyncDataService(final EtcdClient etcdClient,
+                               final PluginDataSubscriber pluginDataSubscriber,
+                               final List<MetaDataSubscriber> 
metaDataSubscribers,
+                               final List<AuthDataSubscriber> 
authDataSubscribers) {
         this.etcdClient = etcdClient;
         this.pluginDataSubscriber = pluginDataSubscriber;
         this.metaDataSubscribers = metaDataSubscribers;
@@ -234,7 +237,7 @@ public class EtcdSyncDataService implements 
SyncDataService, AutoCloseable {
             unCacheMetaData(metaData);
             etcdClient.watchClose(path);
         } catch (UnsupportedEncodingException e) {
-            e.printStackTrace();
+            LOG.error("delete meta data error.", e);
         }
     }
 
@@ -272,17 +275,20 @@ public class EtcdSyncDataService implements 
SyncDataService, AutoCloseable {
         final String str = 
dataPath.substring(DefaultPathConstants.RULE_PARENT.length());
         final String pluginName = str.substring(1, str.length() - 
substring.length() - 1);
         final List<String> list = 
Lists.newArrayList(Splitter.on(DefaultPathConstants.SELECTOR_JOIN_RULE).split(substring));
+
         RuleData ruleData = new RuleData();
         ruleData.setPluginName(pluginName);
         ruleData.setSelectorId(list.get(0));
         ruleData.setId(list.get(1));
+
         Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> 
e.unRuleSubscribe(ruleData));
         etcdClient.watchClose(dataPath);
     }
 
     private void cacheAuthData(final String dataString) {
         final AppAuthData appAuthData = 
GsonUtils.getInstance().fromJson(dataString, AppAuthData.class);
-        Optional.ofNullable(appAuthData).ifPresent(data -> 
authDataSubscribers.forEach(e -> e.onSubscribe(data)));
+        Optional.ofNullable(appAuthData)
+                .ifPresent(data -> authDataSubscribers.forEach(e -> 
e.onSubscribe(data)));
     }
 
     private void unCacheAuthData(final String dataPath) {
@@ -295,11 +301,13 @@ public class EtcdSyncDataService implements 
SyncDataService, AutoCloseable {
 
     private void cacheMetaData(final String dataString) {
         final MetaData metaData = GsonUtils.getInstance().fromJson(dataString, 
MetaData.class);
-        Optional.ofNullable(metaData).ifPresent(data -> 
metaDataSubscribers.forEach(e -> e.onSubscribe(metaData)));
+        Optional.ofNullable(metaData)
+                .ifPresent(data -> metaDataSubscribers.forEach(e -> 
e.onSubscribe(metaData)));
     }
 
     private void unCacheMetaData(final MetaData metaData) {
-        Optional.ofNullable(metaData).ifPresent(data -> 
metaDataSubscribers.forEach(e -> e.unSubscribe(metaData)));
+        Optional.ofNullable(metaData)
+                .ifPresent(data -> metaDataSubscribers.forEach(e -> 
e.unSubscribe(metaData)));
     }
 
     private String buildRealPath(final String parent, final String children) {
@@ -317,7 +325,7 @@ public class EtcdSyncDataService implements 
SyncDataService, AutoCloseable {
 
     @Override
     public void close() {
-        if (null != etcdClient) {
+        if (Objects.nonNull(etcdClient)) {
             etcdClient.close();
         }
     }
diff --git a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
index 2f1668d..e6009b1 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/HttpSyncDataService.java
@@ -50,6 +50,7 @@ import org.springframework.web.client.RestTemplate;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -72,6 +73,16 @@ public class HttpSyncDataService implements SyncDataService, 
AutoCloseable {
     private static final Gson GSON = new Gson();
 
     /**
+     * shenyu admin path configs fetch.
+     */
+    private static final String SHENYU_ADMIN_PATH_CONFIGS_FETCH = 
"/configs/fetch";
+
+    /**
+     * shenyu admin path configs listener.
+     */
+    private static final String SHENYU_ADMIN_PATH_CONFIGS_LISTENER = 
"/configs/listener";
+
+    /**
      * default: 10s.
      */
     private Duration connectionTimeout = Duration.ofSeconds(10);
@@ -142,7 +153,7 @@ public class HttpSyncDataService implements 
SyncDataService, AutoCloseable {
         for (ConfigGroupEnum groupKey : groups) {
             
params.append("groupKeys").append("=").append(groupKey.name()).append("&");
         }
-        String url = server + "/configs/fetch?" + 
StringUtils.removeEnd(params.toString(), "&");
+        String url = server + SHENYU_ADMIN_PATH_CONFIGS_FETCH + "?" + 
StringUtils.removeEnd(params.toString(), "&");
         LOG.info("request configs: [{}]", url);
         String json = null;
         try {
@@ -189,8 +200,9 @@ public class HttpSyncDataService implements 
SyncDataService, AutoCloseable {
         HttpHeaders headers = new HttpHeaders();
         headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
         HttpEntity httpEntity = new HttpEntity(params, headers);
-        String listenerUrl = server + "/configs/listener";
+        String listenerUrl = server + SHENYU_ADMIN_PATH_CONFIGS_LISTENER;
         LOG.debug("request listener configs: [{}]", listenerUrl);
+
         JsonArray groupJson = null;
         try {
             String json = this.httpClient.postForEntity(listenerUrl, 
httpEntity, String.class).getBody();
@@ -200,7 +212,8 @@ public class HttpSyncDataService implements 
SyncDataService, AutoCloseable {
             String message = String.format("listener configs fail, 
server:[%s], %s", server, e.getMessage());
             throw new ShenyuException(message, e);
         }
-        if (groupJson != null) {
+
+        if (Objects.nonNull(groupJson)) {
             // fetch group configuration async.
             ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, 
ConfigGroupEnum[].class);
             if (ArrayUtils.isNotEmpty(changedGroups)) {
@@ -213,7 +226,7 @@ public class HttpSyncDataService implements 
SyncDataService, AutoCloseable {
     @Override
     public void close() throws Exception {
         RUNNING.set(false);
-        if (executor != null) {
+        if (Objects.nonNull(executor)) {
             executor.shutdownNow();
             // help gc
             executor = null;
diff --git a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/AbstractDataRefresh.java b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/AbstractDataRefresh.java
index 26d3252..6ba66dc 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/AbstractDataRefresh.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-http/src/main/java/org/apache/shenyu/sync/data/http/refresh/AbstractDataRefresh.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -76,15 +77,18 @@ public abstract class AbstractDataRefresh<T> implements 
DataRefresh {
 
     @Override
     public Boolean refresh(final JsonObject data) {
-        boolean updated = false;
         JsonObject jsonObject = convert(data);
-        if (null != jsonObject) {
-            ConfigData<T> result = fromJson(jsonObject);
-            if (this.updateCacheIfNeed(result)) {
-                updated = true;
-                refresh(result.getData());
-            }
+        if (Objects.isNull(jsonObject)) {
+            return false;
+        }
+
+        boolean updated = false;
+        ConfigData<T> result = fromJson(jsonObject);
+        if (this.updateCacheIfNeed(result)) {
+            updated = true;
+            refresh(result.getData());
         }
+
         return updated;
     }
 
diff --git a/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java b/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java
index b10864c..eb9d123 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java
@@ -35,10 +35,11 @@ import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.ArrayList;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
@@ -151,7 +152,7 @@ public class NacosCacheHandler {
         } catch (NacosException e) {
             LOG.error(e.getMessage(), e);
         }
-        if (config == null) {
+        if (Objects.isNull(config)) {
             config = "{}";
         }
         return config;
diff --git a/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/WebsocketSyncDataService.java b/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/WebsocketSyncDataService.java
index 331adfe..72a324b 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/WebsocketSyncDataService.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-websocket/src/main/java/org/apache/shenyu/plugin/sync/data/websocket/WebsocketSyncDataService.java
@@ -65,6 +65,7 @@ public class WebsocketSyncDataService implements 
SyncDataService, AutoCloseable
                                     final List<AuthDataSubscriber> 
authDataSubscribers) {
         String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
         executor = new ScheduledThreadPoolExecutor(urls.length, 
ShenyuThreadFactory.create("websocket-connect", true));
+
         for (String url : urls) {
             try {
                 clients.add(new ShenyuWebsocketClient(new URI(url), 
Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, 
authDataSubscribers));
@@ -72,6 +73,7 @@ public class WebsocketSyncDataService implements 
SyncDataService, AutoCloseable
                 LOG.error("websocket url({}) is error", url, e);
             }
         }
+
         try {
             for (WebSocketClient client : clients) {
                 boolean success = client.connectBlocking(3000, 
TimeUnit.MILLISECONDS);
diff --git a/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java b/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
index 9123b60..f7a1b60 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
+++ b/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
@@ -39,6 +39,7 @@ import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -322,7 +323,9 @@ public class ZookeeperSyncDataService implements 
SyncDataService, AutoCloseable
     }
 
     private void cachePluginData(final PluginData pluginData) {
-        Optional.ofNullable(pluginData).flatMap(data -> 
Optional.ofNullable(pluginDataSubscriber)).ifPresent(e -> 
e.onSubscribe(pluginData));
+        Optional.ofNullable(pluginData)
+                .flatMap(data -> Optional.ofNullable(pluginDataSubscriber))
+                .ifPresent(e -> e.onSubscribe(pluginData));
     }
 
     private void cacheSelectorData(final SelectorData selectorData) {
@@ -338,6 +341,7 @@ public class ZookeeperSyncDataService implements 
SyncDataService, AutoCloseable
         final String pluginName = str.substring(1, str.length() - 
selectorId.length() - 1);
         selectorData.setPluginName(pluginName);
         selectorData.setId(selectorId);
+
         Optional.ofNullable(pluginDataSubscriber)
                 .ifPresent(e -> e.unSelectorSubscribe(selectorData));
     }
@@ -353,10 +357,12 @@ public class ZookeeperSyncDataService implements 
SyncDataService, AutoCloseable
         final String str = 
dataPath.substring(DefaultPathConstants.RULE_PARENT.length());
         final String pluginName = str.substring(1, str.length() - 
substring.length() - 1);
         final List<String> list = 
Lists.newArrayList(Splitter.on(DefaultPathConstants.SELECTOR_JOIN_RULE).split(substring));
+
         RuleData ruleData = new RuleData();
         ruleData.setPluginName(pluginName);
         ruleData.setSelectorId(list.get(0));
         ruleData.setId(list.get(1));
+
         Optional.ofNullable(pluginDataSubscriber)
                 .ifPresent(e -> e.unRuleSubscribe(ruleData));
     }
@@ -383,10 +389,12 @@ public class ZookeeperSyncDataService implements 
SyncDataService, AutoCloseable
                 .ifPresent(data -> metaDataSubscribers.forEach(e -> 
e.unSubscribe(metaData)));
     }
 
-    private List<String> addSubscribePath(final List<String> alreadyChildren, 
final List<String> currentChildren) {
+    private List<String> addSubscribePath(final List<String> alreadyChildren,
+                                          final List<String> currentChildren) {
         if (CollectionUtils.isEmpty(alreadyChildren)) {
             return currentChildren;
         }
+
         return currentChildren.stream()
                 .filter(current -> 
alreadyChildren.stream().noneMatch(current::equals))
                 .collect(Collectors.toList());
@@ -405,7 +413,7 @@ public class ZookeeperSyncDataService implements 
SyncDataService, AutoCloseable
 
     @Override
     public void close() {
-        if (null != zkClient) {
+        if (Objects.nonNull(zkClient)) {
             zkClient.close();
         }
     }

Reply via email to