This is an automated email from the ASF dual-hosted git repository.

hefengen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f69000220 [type:refactor] refactor sync data, abstract the template 
method of zk and etcd, reimplement consul data sync. (#5001)
2f69000220 is described below

commit 2f6900022035b97aaf2aa52b6701ae115cbf6f8f
Author: yunlongn <[email protected]>
AuthorDate: Fri Aug 18 12:04:03 2023 +0800

    [type:refactor] refactor sync data, abstract the template method of zk and 
etcd, reimplement consul data sync. (#5001)
    
    * [type:bug] fix etcd sync error.
    
    * [type:bug] fix etcd sync error.
    
    * upgrade version
    
    * [type:refactor] refactor zookeeper sync.
    
    * [type:bug] fix etcd sync error.
    
    * [type:refactor] refactor etcd sync.
    
    * [type:refactor] refactor zookeeper sync.
    
    * [type:refactor] refactor admin sync data.
    
    * [type:refactor] refactor admin sync data.
    
    * [type:refactor] refactor admin sync data.
    
    * [type:refactor] refactor consul sync data.
    
    * [type:refactor] refactor admin sync data.
    
    * [type:refactor] refactor admin sync data.
    
    * [type:refactor] refactor sync data.
    
    * [type:refactor] refactor sync data.
    
    * [type:refactor] refactor sync data.
    
    ---------
    
    Co-authored-by: Misaya295 <[email protected]>
    Co-authored-by: misaya295 <[email protected]>
    Co-authored-by: moremind <[email protected]>
---
 .../listener/consul/ConsulDataChangedListener.java |  25 +-
 .../consul/ConsulDataChangedListenerTest.java      |  77 ++---
 .../src/main/resources/application.yml             |   4 +-
 .../data/core/AbstractNodeDataSyncService.java     | 295 ++++++++++++++++
 .../sync/data/consul/ConsulSyncDataService.java    | 225 ++++++++-----
 .../data/consul/handler/ConsulCacheHandler.java    | 165 ---------
 .../data/consul/ConsulSyncDataServiceTest.java     |  78 ++---
 .../consul/handler/ConsulCacheHandlerTest.java     | 219 ------------
 .../shenyu/sync/data/etcd/EtcdSyncDataService.java | 370 ++-------------------
 .../sync/data/etcd/EtcdSyncDataServiceTest.java    | 261 ++-------------
 .../data/zookeeper/ZookeeperSyncDataService.java   | 344 ++-----------------
 11 files changed, 589 insertions(+), 1474 deletions(-)

diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListener.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListener.java
index a35bdc1519..4be0bb5854 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListener.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListener.java
@@ -18,34 +18,31 @@
 package org.apache.shenyu.admin.listener.consul;
 
 import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.Response;
-import com.ecwid.consul.v1.kv.model.GetValue;
-import org.apache.shenyu.admin.listener.AbstractListDataChangedListener;
-import org.apache.shenyu.common.constant.ConsulConstants;
+import org.apache.shenyu.admin.listener.AbstractNodeDataChangedListener;
 import org.apache.shenyu.common.utils.GsonUtils;
 
-import java.util.Objects;
-
 /**
  *  Use Consul to push data changes.
  */
-public class ConsulDataChangedListener extends AbstractListDataChangedListener 
{
+public class ConsulDataChangedListener extends AbstractNodeDataChangedListener 
{
     private final ConsulClient consulClient;
 
     public ConsulDataChangedListener(final ConsulClient consulClient) {
-        super(new ChangeData(ConsulConstants.PLUGIN_DATA, 
ConsulConstants.SELECTOR_DATA,
-                ConsulConstants.RULE_DATA, ConsulConstants.AUTH_DATA, 
ConsulConstants.META_DATA, ConsulConstants.PROXY_SELECTOR_DATA_ID, 
ConsulConstants.DISCOVERY_UPSTREAM));
         this.consulClient = consulClient;
     }
 
     @Override
-    public void publishConfig(final String dataKey, final Object data) {
-        consulClient.setKVValue(dataKey, GsonUtils.getInstance().toJson(data));
+    public void createOrUpdate(final String pluginPath, final Object data) {
+        consulClient.setKVValue(pluginPath, 
GsonUtils.getInstance().toJson(data));
+    }
+
+    @Override
+    public void deleteNode(final String pluginPath) {
+        consulClient.deleteKVValue(pluginPath);
     }
 
     @Override
-    public String getConfig(final String dataKey) {
-        Response<GetValue> kvValue = consulClient.getKVValue(dataKey);
-        return Objects.nonNull(kvValue.getValue()) ? 
kvValue.getValue().getDecodedValue() : 
ConsulConstants.EMPTY_CONFIG_DEFAULT_VALUE;
+    public void deletePathRecursive(final String selectorParentPath) {
+        consulClient.deleteKVValues(selectorParentPath);
     }
 }
diff --git 
a/shenyu-admin/src/test/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListenerTest.java
 
b/shenyu-admin/src/test/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListenerTest.java
index 73fbb2d18d..11708c82ec 100644
--- 
a/shenyu-admin/src/test/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListenerTest.java
+++ 
b/shenyu-admin/src/test/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListenerTest.java
@@ -18,10 +18,7 @@
 package org.apache.shenyu.admin.listener.consul;
 
 import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.Response;
-import com.ecwid.consul.v1.kv.model.GetValue;
 import com.google.common.collect.ImmutableList;
-import org.apache.shenyu.common.constant.ConsulConstants;
 import org.apache.shenyu.common.dto.AppAuthData;
 import org.apache.shenyu.common.dto.MetaData;
 import org.apache.shenyu.common.dto.PluginData;
@@ -35,11 +32,8 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 /**
  * The testCase for {@link ConsulDataChangedListener}.
@@ -73,106 +67,83 @@ public class ConsulDataChangedListenerTest {
 
     @Test
     public void testOnAppAuthChanged() {
-        String config = 
"{\"divide\":{\"appKey\":\"appKey\",\"appSecret\":\"appSecret\",\"open\":true}}";
         final AppAuthData appAuthData = 
AppAuthData.builder().appKey(MOCK_APP_KEY).appSecret(MOCK_APP_SECRET).build();
 
-        Response<GetValue> response = mock(Response.class);
-        GetValue getValueModel = mock(GetValue.class);
-        when(consulClient.getKVValue(anyString())).thenReturn(response);
-        when(response.getValue()).thenReturn(getValueModel);
-        when(getValueModel.getDecodedValue()).thenReturn(config);
-
         
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData), 
DataEventTypeEnum.DELETE);
         
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData), 
DataEventTypeEnum.REFRESH);
         
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData), 
DataEventTypeEnum.MYSELF);
         
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData), 
DataEventTypeEnum.CREATE);
-        verify(consulClient, times(4)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(3)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(1)).deleteKVValue(any(String.class));
 
-        getValueModel.setValue(ConsulConstants.EMPTY_CONFIG_DEFAULT_VALUE);
         
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData), 
DataEventTypeEnum.DELETE);
         
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData), 
DataEventTypeEnum.REFRESH);
         
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData), 
DataEventTypeEnum.MYSELF);
         
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData), 
DataEventTypeEnum.CREATE);
-        verify(consulClient, times(8)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(6)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(2)).deleteKVValue(any(String.class));
     }
 
     @Test
     public void testOnPluginChanged() {
-        String config = 
"{\"divide\":{\"id\":\"id\",\"name\":\"name\",\"enabled\":true}}";
         final PluginData pluginData = 
PluginData.builder().id(MOCK_ID).name(MOCK_NAME).config(MOCK_CONFIG).build();
-        Response<GetValue> response = mock(Response.class);
-        GetValue getValueModel = mock(GetValue.class);
-        when(consulClient.getKVValue(anyString())).thenReturn(response);
-        when(response.getValue()).thenReturn(getValueModel);
-        when(getValueModel.getDecodedValue()).thenReturn(config);
 
         
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData), 
DataEventTypeEnum.DELETE);
         
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData), 
DataEventTypeEnum.REFRESH);
         
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData), 
DataEventTypeEnum.MYSELF);
         
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData), 
DataEventTypeEnum.CREATE);
-        verify(consulClient, times(4)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(3)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(3)).deleteKVValues(any(String.class));
 
-        getValueModel.setValue(ConsulConstants.EMPTY_CONFIG_DEFAULT_VALUE);
         
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData), 
DataEventTypeEnum.DELETE);
         
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData), 
DataEventTypeEnum.REFRESH);
         
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData), 
DataEventTypeEnum.MYSELF);
         
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData), 
DataEventTypeEnum.CREATE);
-        verify(consulClient, times(8)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(6)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(6)).deleteKVValues(any(String.class));
     }
 
     @Test
     public void testOnSelectorChanged() {
-        String config = 
"{\"divide\":[{\"id\":\"id\",\"name\":\"name\",\"enabled\":true}]}";
         final SelectorData selectorData = 
SelectorData.builder().id(MOCK_ID).name(MOCK_NAME).pluginName(MOCK_PLUGIN_NAME).build();
 
-        Response<GetValue> response = mock(Response.class);
-        GetValue getValueModel = mock(GetValue.class);
-        when(consulClient.getKVValue(anyString())).thenReturn(response);
-        when(response.getValue()).thenReturn(getValueModel);
-        when(getValueModel.getDecodedValue()).thenReturn(config);
-
         
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData), 
DataEventTypeEnum.DELETE);
         
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData), 
DataEventTypeEnum.REFRESH);
         
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData), 
DataEventTypeEnum.MYSELF);
         
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData), 
DataEventTypeEnum.CREATE);
-        verify(consulClient, times(4)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(3)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(1)).deleteKVValue(any(String.class));
 
-        getValueModel.setValue(ConsulConstants.EMPTY_CONFIG_DEFAULT_VALUE);
         
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData), 
DataEventTypeEnum.DELETE);
         
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData), 
DataEventTypeEnum.REFRESH);
         
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData), 
DataEventTypeEnum.MYSELF);
         
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData), 
DataEventTypeEnum.CREATE);
-        verify(consulClient, times(8)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(6)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(2)).deleteKVValue(any(String.class));
+        verify(consulClient, times(2)).deleteKVValues(any(String.class));
     }
 
     @Test
     public void testOnMetaDataChanged() {
-        String config = 
"{\"divide\":{\"id\":\"id\",\"appName\":\"appName\",\"enabled\":true}}";
         final MetaData metaData = 
MetaData.builder().id(MOCK_ID).path(MOCK_PATH).appName(MOCK_APP_NAME).build();
 
-        Response<GetValue> response = mock(Response.class);
-        GetValue getValueModel = mock(GetValue.class);
-        when(consulClient.getKVValue(anyString())).thenReturn(response);
-        when(response.getValue()).thenReturn(getValueModel);
-        when(getValueModel.getDecodedValue()).thenReturn(config);
-
         
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData), 
DataEventTypeEnum.DELETE);
         
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData), 
DataEventTypeEnum.REFRESH);
         
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData), 
DataEventTypeEnum.MYSELF);
         
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData), 
DataEventTypeEnum.CREATE);
-        verify(consulClient, times(4)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(3)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(1)).deleteKVValue(any(String.class));
 
-        getValueModel.setValue(ConsulConstants.EMPTY_CONFIG_DEFAULT_VALUE);
         
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData), 
DataEventTypeEnum.DELETE);
         
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData), 
DataEventTypeEnum.REFRESH);
         
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData), 
DataEventTypeEnum.MYSELF);
         
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData), 
DataEventTypeEnum.CREATE);
-        verify(consulClient, times(8)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(6)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(2)).deleteKVValue(any(String.class));
     }
 
     @Test
     public void testOnRuleChanged() {
-        String config = 
"{\"divide\":[{\"id\":\"id\",\"appName\":\"appName\",\"enabled\":true}]}";
         final RuleData ruleData = RuleData.builder()
                 .id(MOCK_ID)
                 .name(MOCK_NAME)
@@ -180,23 +151,19 @@ public class ConsulDataChangedListenerTest {
                 .selectorId(MOCK_SELECTOR_ID)
                 .build();
 
-        Response<GetValue> response = mock(Response.class);
-        GetValue getValueModel = mock(GetValue.class);
-        when(consulClient.getKVValue(anyString())).thenReturn(response);
-        when(response.getValue()).thenReturn(getValueModel);
-        when(getValueModel.getDecodedValue()).thenReturn(config);
-
         consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData), 
DataEventTypeEnum.DELETE);
         consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData), 
DataEventTypeEnum.REFRESH);
         consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData), 
DataEventTypeEnum.MYSELF);
         consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData), 
DataEventTypeEnum.CREATE);
-        verify(consulClient, times(4)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(3)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(1)).deleteKVValue(any(String.class));
 
-        getValueModel.setValue(ConsulConstants.EMPTY_CONFIG_DEFAULT_VALUE);
         consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData), 
DataEventTypeEnum.DELETE);
         consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData), 
DataEventTypeEnum.REFRESH);
         consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData), 
DataEventTypeEnum.MYSELF);
         consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData), 
DataEventTypeEnum.CREATE);
-        verify(consulClient, times(8)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(6)).setKVValue(any(String.class), 
any(String.class));
+        verify(consulClient, times(2)).deleteKVValue(any(String.class));
+        verify(consulClient, times(2)).deleteKVValues(any(String.class));
     }
 }
diff --git a/shenyu-bootstrap/src/main/resources/application.yml 
b/shenyu-bootstrap/src/main/resources/application.yml
index 47d8571416..35e279e540 100644
--- a/shenyu-bootstrap/src/main/resources/application.yml
+++ b/shenyu-bootstrap/src/main/resources/application.yml
@@ -262,8 +262,8 @@ shenyu:
 #      url: http://localhost:2379
 #    consul:
 #      url: http://localhost:8500
-#      waitTime: 1000
-#      watchDelay: 1000
+#      waitTime: 10000
+#      watchDelay: 10000
   exclude:
     enabled: false
     paths:
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/core/AbstractNodeDataSyncService.java
 
b/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/core/AbstractNodeDataSyncService.java
new file mode 100644
index 0000000000..9231876f2f
--- /dev/null
+++ 
b/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/core/AbstractNodeDataSyncService.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.sync.data.core;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import org.apache.shenyu.common.constant.DefaultPathConstants;
+import org.apache.shenyu.common.dto.AppAuthData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
+import org.apache.shenyu.common.dto.MetaData;
+import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.RuleData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.exception.ShenyuException;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
+import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
+import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
+import org.apache.shenyu.sync.data.api.SyncDataService;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * AbstractNodeDataSyncService.
+ * Abstract method to monitor child node changes.
+ */
+public abstract class AbstractNodeDataSyncService implements SyncDataService {
+
+    private final PluginDataSubscriber pluginDataSubscriber;
+
+    private final List<MetaDataSubscriber> metaDataSubscribers;
+
+    private final List<AuthDataSubscriber> authDataSubscribers;
+
+    private final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers;
+
+    private final List<DiscoveryUpstreamDataSubscriber> 
discoveryUpstreamDataSubscribers;
+
+    public AbstractNodeDataSyncService(final PluginDataSubscriber 
pluginDataSubscriber,
+                                       final List<MetaDataSubscriber> 
metaDataSubscribers,
+                                       final List<AuthDataSubscriber> 
authDataSubscribers,
+                                       final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers,
+                                       final 
List<DiscoveryUpstreamDataSubscriber> discoveryUpstreamDataSubscribers) {
+        this.pluginDataSubscriber = pluginDataSubscriber;
+        this.metaDataSubscribers = metaDataSubscribers;
+        this.authDataSubscribers = authDataSubscribers;
+        this.proxySelectorDataSubscribers = proxySelectorDataSubscribers;
+        this.discoveryUpstreamDataSubscribers = 
discoveryUpstreamDataSubscribers;
+    }
+
+
+    /**
+     * event.
+     *
+     * @param updatePath updatePath
+     * @param updateData updateData
+     * @param registerPath registerPath
+     * @param eventType eventType
+     */
+    public void event(final String updatePath, final String updateData, final 
String registerPath, final EventType eventType) {
+        switch (registerPath) {
+            case DefaultPathConstants.PLUGIN_PARENT:
+                pluginHandlerEvent(updatePath, updateData, eventType);
+                break;
+            case DefaultPathConstants.SELECTOR_PARENT:
+                selectorHandlerEvent(updatePath, updateData, eventType);
+                break;
+            case DefaultPathConstants.META_DATA:
+                metaDataHandlerEvent(updatePath, updateData, eventType);
+                break;
+            case DefaultPathConstants.APP_AUTH_PARENT:
+                appAuthHandlerEvent(updatePath, updateData, eventType);
+                break;
+            case DefaultPathConstants.RULE_PARENT:
+                ruleHandlerEvent(updatePath, updateData, eventType);
+                break;
+            case DefaultPathConstants.DISCOVERY_UPSTREAM:
+                discoveryUpstreamHandlerEvent(updatePath, updateData, 
eventType);
+                break;
+            case DefaultPathConstants.PROXY_SELECTOR:
+                proxyHandlerEvent(updatePath, updateData, eventType);
+                break;
+            default:
+                break;
+        }
+    }
+
+    private void proxyHandlerEvent(final String updatePath, final String 
updateData, final EventType eventType) {
+        String[] pathInfoArray = updatePath.split("/");
+        if (pathInfoArray.length != 5) {
+            return;
+        }
+        String pluginName = pathInfoArray[pathInfoArray.length - 2];
+        String proxySelectorName = pathInfoArray[pathInfoArray.length - 1];
+        if (EventType.DELETE.equals(eventType)) {
+            ProxySelectorData proxySelectorData = new ProxySelectorData();
+            proxySelectorData.setPluginName(pluginName);
+            proxySelectorData.setName(proxySelectorName);
+            unCacheProxySelectorData(proxySelectorData);
+            return;
+        }
+        ProxySelectorData proxySelectorData = 
GsonUtils.getInstance().fromJson(updateData, ProxySelectorData.class);
+        proxySelectorData.setName(proxySelectorName);
+        proxySelectorData.setPluginName(pluginName);
+        Optional.ofNullable(updateData)
+                .ifPresent(e -> cacheProxySelectorData(proxySelectorData));
+    }
+
+    private void discoveryUpstreamHandlerEvent(final String updatePath, final 
String updateData, final EventType eventType) {
+        String[] pathInfoArray2 = updatePath.split("/");
+        if (pathInfoArray2.length != 5) {
+            return;
+        }
+        if (!EventType.DELETE.equals(eventType)) {
+            Optional.ofNullable(updateData)
+                    .ifPresent(e -> 
cacheDiscoveryUpstreamData(GsonUtils.getInstance().fromJson(updateData, 
DiscoverySyncData.class)));
+        }
+    }
+
+    private void ruleHandlerEvent(final String updatePath, final String 
updateData, final EventType eventType) {
+        if (EventType.DELETE.equals(eventType)) {
+            unCacheRuleData(updatePath);
+            return;
+        }
+        Optional.ofNullable(updateData)
+                .ifPresent(e -> 
cacheRuleData(GsonUtils.getInstance().fromJson(updateData, RuleData.class)));
+    }
+
+    private void appAuthHandlerEvent(final String updatePath, final String 
updateData, final EventType eventType) {
+        if (EventType.DELETE.equals(eventType)) {
+            unCacheAuthData(updatePath);
+            return;
+        }
+        Optional.ofNullable(updateData)
+                .ifPresent(e -> 
cacheAuthData(GsonUtils.getInstance().fromJson(updateData, AppAuthData.class)));
+    }
+
+    private void metaDataHandlerEvent(final String updatePath, final String 
updateData, final EventType eventType) {
+        if (EventType.DELETE.equals(eventType)) {
+            final String realPath = 
updatePath.substring(DefaultPathConstants.META_DATA.length() + 1);
+            MetaData metaData = new MetaData();
+            try {
+                metaData.setPath(URLDecoder.decode(realPath, 
StandardCharsets.UTF_8.name()));
+            } catch (UnsupportedEncodingException e) {
+                throw new ShenyuException(e);
+            }
+            unCacheMetaData(metaData);
+            return;
+        }
+        Optional.ofNullable(updateData)
+                .ifPresent(e -> 
cacheMetaData(GsonUtils.getInstance().fromJson(updateData, MetaData.class)));
+    }
+
+    private void selectorHandlerEvent(final String updatePath, final String 
updateData, final EventType eventType) {
+        if (EventType.DELETE.equals(eventType)) {
+            unCacheSelectorData(updatePath);
+            return;
+        }
+        Optional.ofNullable(updateData)
+                .ifPresent(e -> 
cacheSelectorData(GsonUtils.getInstance().fromJson(updateData, 
SelectorData.class)));
+    }
+
+    private void pluginHandlerEvent(final String updatePath, final String 
updateData, final EventType eventType) {
+        if (EventType.DELETE.equals(eventType)) {
+            String pluginName = 
updatePath.substring(updatePath.lastIndexOf("/") + 1);
+            unCachePluginName(pluginName);
+            return;
+        }
+        Optional.ofNullable(updateData)
+                .ifPresent(e -> 
cachePluginData(GsonUtils.getInstance().fromJson(updateData, 
PluginData.class)));
+    }
+
+    public enum EventType {
+        PUT,
+        DELETE,
+    }
+
+    protected void cachePluginData(final PluginData pluginData) {
+        Optional.ofNullable(pluginData)
+                .flatMap(data -> Optional.ofNullable(pluginDataSubscriber))
+                .ifPresent(e -> e.onSubscribe(pluginData));
+    }
+
+    protected void cacheSelectorData(final SelectorData selectorData) {
+        Optional.ofNullable(selectorData)
+                .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber)
+                        .ifPresent(e -> e.onSelectorSubscribe(data)));
+    }
+
+    protected void unCacheSelectorData(final String dataPath) {
+        SelectorData selectorData = new SelectorData();
+        final String selectorId = dataPath.substring(dataPath.lastIndexOf("/") 
+ 1);
+        final String str = 
dataPath.substring(DefaultPathConstants.SELECTOR_PARENT.length());
+        final int pluginNameIndex = str.length() - selectorId.length() - 1;
+        if (pluginNameIndex <= 0) {
+            return;
+        }
+        final String pluginName = str.substring(1, pluginNameIndex);
+        selectorData.setPluginName(pluginName);
+        selectorData.setId(selectorId);
+
+        Optional.ofNullable(pluginDataSubscriber)
+                .ifPresent(e -> e.unSelectorSubscribe(selectorData));
+    }
+
+    protected void unCachePluginName(final String pluginName) {
+        final PluginData pluginData = new PluginData();
+        pluginData.setName(pluginName);
+        Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> 
e.unSubscribe(pluginData));
+    }
+
+    protected void cacheRuleData(final RuleData ruleData) {
+        Optional.ofNullable(ruleData)
+                .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber)
+                        .ifPresent(e -> e.onRuleSubscribe(data)));
+    }
+
+    protected void unCacheRuleData(final String dataPath) {
+        String ruleDataId = dataPath.substring(dataPath.lastIndexOf("/") + 1);
+        final String str = 
dataPath.substring(DefaultPathConstants.RULE_PARENT.length());
+        final int pluginNameIndex = str.length() - ruleDataId.length() - 1;
+        if (pluginNameIndex <= 0) {
+            return;
+        }
+        final String pluginName = str.substring(1, pluginNameIndex);
+        final List<String> list = 
Lists.newArrayList(Splitter.on(DefaultPathConstants.SELECTOR_JOIN_RULE).split(ruleDataId));
+
+        RuleData ruleData = new RuleData();
+        ruleData.setPluginName(pluginName);
+        ruleData.setSelectorId(list.get(0));
+        ruleData.setId(list.get(1));
+
+        Optional.ofNullable(pluginDataSubscriber)
+                .ifPresent(e -> e.unRuleSubscribe(ruleData));
+    }
+
+    protected void cacheAuthData(final AppAuthData appAuthData) {
+        Optional.ofNullable(appAuthData)
+                .ifPresent(data -> authDataSubscribers.forEach(e -> 
e.onSubscribe(data)));
+    }
+
+    protected void unCacheAuthData(final String dataPath) {
+        final String key = 
dataPath.substring(DefaultPathConstants.APP_AUTH_PARENT.length() + 1);
+        AppAuthData appAuthData = new AppAuthData();
+        appAuthData.setAppKey(key);
+        authDataSubscribers.forEach(e -> e.unSubscribe(appAuthData));
+    }
+
+    protected void cacheMetaData(final MetaData metaData) {
+        Optional.ofNullable(metaData)
+                .ifPresent(data -> metaDataSubscribers.forEach(e -> 
e.onSubscribe(metaData)));
+    }
+
+    protected void cacheProxySelectorData(final ProxySelectorData 
proxySelectorData) {
+        Optional.ofNullable(proxySelectorData)
+                .ifPresent(data -> proxySelectorDataSubscribers.forEach(e -> 
e.onSubscribe(proxySelectorData)));
+    }
+
+    protected void cacheDiscoveryUpstreamData(final DiscoverySyncData 
upstreamDataList) {
+        Optional.ofNullable(discoveryUpstreamDataSubscribers)
+                .ifPresent(data -> discoveryUpstreamDataSubscribers.forEach(e 
-> e.onSubscribe(upstreamDataList)));
+    }
+
+    protected void unCacheMetaData(final MetaData metaData) {
+        Optional.ofNullable(metaData)
+                .ifPresent(data -> metaDataSubscribers.forEach(e -> 
e.unSubscribe(metaData)));
+    }
+
+    protected void unCacheProxySelectorData(final ProxySelectorData 
proxySelectorData) {
+        Optional.ofNullable(proxySelectorData)
+                .ifPresent(data -> proxySelectorDataSubscribers.forEach(e -> 
e.unSubscribe(proxySelectorData)));
+    }
+}
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
 
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
index 57b1e547ce..4fbd99fe15 100644
--- 
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
+++ 
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
@@ -21,16 +21,19 @@ import com.ecwid.consul.v1.ConsulClient;
 import com.ecwid.consul.v1.QueryParams;
 import com.ecwid.consul.v1.Response;
 import com.ecwid.consul.v1.kv.model.GetValue;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
 import org.apache.shenyu.common.constant.ConsulConstants;
+import org.apache.shenyu.common.constant.DefaultPathConstants;
 import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
 import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
 import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
 import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
 import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
-import org.apache.shenyu.sync.data.api.SyncDataService;
 import org.apache.shenyu.sync.data.consul.config.ConsulConfig;
-import org.apache.shenyu.sync.data.consul.handler.ConsulCacheHandler;
+import org.apache.shenyu.sync.data.core.AbstractNodeDataSyncService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,34 +41,31 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /**
  * Consul sync data service.
  */
-public class ConsulSyncDataService extends ConsulCacheHandler implements 
SyncDataService {
+public class ConsulSyncDataService extends AbstractNodeDataSyncService {
     /**
      * logger.
      */
     private static final Logger LOG = 
LoggerFactory.getLogger(ConsulSyncDataService.class);
 
-    private final Map<String, OnChange> groupMap = new HashMap<>();
-
     private final Map<String, Long> consulIndexes = new HashMap<>();
 
-    private final ScheduledThreadPoolExecutor executor;
+    private final Map<String, List<ConsulData>> cacheConsulDataKeyMap = new 
HashMap<>();
 
-    private ScheduledFuture<?> watchFuture;
+    private final ScheduledThreadPoolExecutor executor;
 
     private final ConsulConfig consulConfig;
 
     private final ConsulClient consulClient;
 
-    private final AtomicBoolean running = new AtomicBoolean(false);
-
     /**
      * Instantiates a new Consul sync data service.
      *
@@ -85,88 +85,159 @@ public class ConsulSyncDataService extends 
ConsulCacheHandler implements SyncDat
         super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers, 
proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers);
         this.consulClient = consulClient;
         this.consulConfig = consulConfig;
-        this.executor = new ScheduledThreadPoolExecutor(1,
+        // corePool is the total number of watcher nodes
+        this.executor = new ScheduledThreadPoolExecutor(7,
                 ShenyuThreadFactory.create("consul-config-watch", true));
-        consulIndexes.put(ConsulConstants.SYNC_PRE_FIX, 0L);
-        initUpdateMap();
-        start();
+        watcherData();
     }
 
-    /**
-     * init config key and update method mapping.
-     */
-    private void initUpdateMap() {
-        groupMap.put(ConsulConstants.PLUGIN_DATA, this::updatePluginData);
-        groupMap.put(ConsulConstants.SELECTOR_DATA, this::updateSelectorMap);
-        groupMap.put(ConsulConstants.RULE_DATA, this::updateRuleMap);
-        groupMap.put(ConsulConstants.META_DATA, this::updateMetaDataMap);
-        groupMap.put(ConsulConstants.AUTH_DATA, this::updateAuthMap);
-        groupMap.put(ConsulConstants.PROXY_SELECTOR_DATA_ID, 
this::updateSelectorDataMap);
-        groupMap.put(ConsulConstants.DISCOVERY_UPSTREAM, 
this::updateDiscoveryUpstreamMap);
+    private void watcherData() {
+        watcherData0(DefaultPathConstants.PLUGIN_PARENT);
+        watcherData0(DefaultPathConstants.SELECTOR_PARENT);
+        watcherData0(DefaultPathConstants.RULE_PARENT);
+        watcherData0(DefaultPathConstants.PROXY_SELECTOR);
+        watcherData0(DefaultPathConstants.DISCOVERY_UPSTREAM);
+        watcherData0(DefaultPathConstants.APP_AUTH_PARENT);
+        watcherData0(DefaultPathConstants.META_DATA);
     }
 
-    private void watchConfigKeyValues() {
-        if (this.running.get()) {
-            for (String context : this.consulIndexes.keySet()) {
-                try {
-                    Long currentIndex = this.consulIndexes.get(context);
-                    if (Objects.isNull(currentIndex)) {
-                        currentIndex = 
ConsulConstants.INIT_CONFIG_VERSION_INDEX;
-                    }
-                    Response<List<GetValue>> response = 
this.consulClient.getKVValues(context, null,
-                            new QueryParams(consulConfig.getWaitTime(), 
currentIndex));
-                    if (Objects.isNull(response.getValue()) || 
response.getValue().isEmpty()) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("No value for context " + context);
-                        }
-                        continue;
-                    }
-                    Long newIndex = response.getConsulIndex();
-                    if (Objects.isNull(newIndex) || Objects.equals(newIndex, 
currentIndex)) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Same index for context " + context);
-                        }
-                        continue;
+    private void watcherData0(final String registerPath) {
+        consulIndexes.put(registerPath, 0L);
+        BiConsumer<String, String> updateHandler = (changeData, decodedValue) 
-> this.event(changeData, decodedValue, registerPath, EventType.PUT);
+        Consumer<String> deleteHandler = removeKey -> this.event(removeKey, 
null, registerPath, EventType.DELETE);
+        this.executor.schedule(() -> watchConfigKeyValues(registerPath, 
updateHandler, deleteHandler), -1, TimeUnit.MILLISECONDS);
+    }
+
+    private void watchConfigKeyValues(final String watchPathRoot,
+                                      final BiConsumer<String, String> 
updateHandler,
+                                      final Consumer<String> deleteHandler) {
+        try {
+            Long currentIndex = this.consulIndexes.get(watchPathRoot);
+            if (Objects.isNull(currentIndex)) {
+                currentIndex = ConsulConstants.INIT_CONFIG_VERSION_INDEX;
+            }
+            Response<List<GetValue>> response = 
this.consulClient.getKVValues(watchPathRoot, null,
+                    new 
QueryParams(TimeUnit.MILLISECONDS.toSeconds(consulConfig.getWaitTime()), 
currentIndex));
+            if (Objects.isNull(response.getValue()) || 
response.getValue().isEmpty()) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("No value for watchPathRoot " + watchPathRoot);
+                }
+                this.executor.schedule(() -> 
watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
+                        consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
+                return;
+            }
+            Long newIndex = response.getConsulIndex();
+            if (Objects.isNull(newIndex)) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Same index for watchPathRoot " + watchPathRoot);
+                }
+                this.executor.schedule(() -> 
watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
+                        consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
+                return;
+            }
+            if (Objects.equals(newIndex, currentIndex)) {
+                this.executor.schedule(() -> 
watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
+                        -1, TimeUnit.MILLISECONDS);
+                return;
+            }
+            if (!this.consulIndexes.containsValue(newIndex)
+                    && 
!currentIndex.equals(ConsulConstants.INIT_CONFIG_VERSION_INDEX)) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("watchPathRoot " + watchPathRoot + " has new 
index " + newIndex);
+                }
+                final Long lastIndex = currentIndex;
+                final List<ConsulData> lastDatas = 
cacheConsulDataKeyMap.get(watchPathRoot);
+                response.getValue().forEach(data -> {
+                    if (data.getModifyIndex() == lastIndex) {
+                        //data has not changed
+                        return;
                     }
-                    if (!this.consulIndexes.containsValue(newIndex)
-                            && 
!currentIndex.equals(ConsulConstants.INIT_CONFIG_VERSION_INDEX)) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Context " + context + " has new index " 
+ newIndex);
+                    if (!Objects.isNull(lastDatas)) {
+                        final ConsulData consulData = lastDatas.stream()
+                                .filter(lastData -> 
data.getKey().equals(lastData.getConsulKey())).findFirst().orElse(null);
+                        if (!Objects.isNull(consulData) && 
!StringUtils.isBlank(consulData.getConsulDataMd5())
+                                && 
consulData.getConsulDataMd5().equals(DigestUtils.md5Hex(data.getValue()))) {
+                            return;
                         }
-                        final Long lastIndex = currentIndex;
-                        response.getValue().forEach(data -> {
-                            if (data.getModifyIndex() == lastIndex) {
-                                //data has not changed
-                                return;
-                            }
-                            
groupMap.get(data.getKey()).change(data.getDecodedValue());
-                        });
-
-                    } else if (LOG.isTraceEnabled()) {
-                        LOG.info("Event for index already published for 
context " + context);
                     }
-                    this.consulIndexes.put(context, newIndex);
-                } catch (Exception e) {
-                    LOG.warn("Error querying consul Key/Values for context '" 
+ context + "'. Message: " + e.getMessage());
+                    updateHandler.accept(data.getKey(), 
data.getDecodedValue());
+                });
+                final List<String> currentKeys = 
response.getValue().stream().map(GetValue::getKey).collect(Collectors.toList());
+                if (!ObjectUtils.isEmpty(lastDatas)) {
+                    // handler delete event
+                    lastDatas.stream()
+                            .map(ConsulData::getConsulKey)
+                            .filter(lastKey -> !currentKeys.contains(lastKey))
+                            .forEach(deleteHandler);
                 }
+
+                // save last Keys
+                cacheConsulDataKeyMap.put(watchPathRoot, 
response.getValue().stream().map(data -> {
+                    final ConsulData consulData = new ConsulData();
+                    consulData.setConsulKey(data.getKey());
+                    
consulData.setConsulDataMd5(DigestUtils.md5Hex(data.getValue()));
+                    return consulData;
+                }).collect(Collectors.toList()));
+            } else if (LOG.isTraceEnabled()) {
+                LOG.info("Event for index already published for watchPathRoot 
" + watchPathRoot);
             }
+            this.consulIndexes.put(watchPathRoot, newIndex);
+            this.executor.schedule(() -> watchConfigKeyValues(watchPathRoot, 
updateHandler, deleteHandler),
+                    -1, TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            LOG.warn("Error querying consul Key/Values for watchPathRoot '" + 
watchPathRoot + "'. Message: ", e);
+            this.executor.schedule(() -> watchConfigKeyValues(watchPathRoot, 
updateHandler, deleteHandler),
+                    consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
         }
     }
 
-    /**
-     * Start.
-     */
-    public void start() {
-        if (this.running.compareAndSet(false, true)) {
-            this.watchFuture = 
this.executor.scheduleWithFixedDelay(this::watchConfigKeyValues,
-                    5, consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
+    @Override
+    public void close() {
+        if (!ObjectUtils.isEmpty(executor)) {
+            executor.shutdown();
         }
     }
 
-    @Override
-    public void close() {
-        if (this.running.compareAndSet(true, false) && 
Objects.nonNull(this.watchFuture)) {
-            this.watchFuture.cancel(true);
+    private static class ConsulData {
+
+        private String consulKey;
+
+        private String consulDataMd5;
+
+        /**
+         * consulKey.
+         *
+         * @return ConsulKey
+         */
+        public String getConsulKey() {
+            return consulKey;
+        }
+
+        /**
+         * set consulKey.
+         *
+         * @param consulKey consulKey
+         */
+        public void setConsulKey(final String consulKey) {
+            this.consulKey = consulKey;
+        }
+
+        /**
+         * consulDataMd5.
+         *
+         * @return ConsulDataMd5
+         */
+        public String getConsulDataMd5() {
+            return consulDataMd5;
+        }
+
+        /**
+         * set consulDataMd5.
+         *
+         * @param consulDataMd5 consulDataMd5
+         */
+        public void setConsulDataMd5(final String consulDataMd5) {
+            this.consulDataMd5 = consulDataMd5;
         }
     }
 }
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandler.java
 
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandler.java
deleted file mode 100644
index 296da3ff05..0000000000
--- 
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandler.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shenyu.sync.data.consul.handler;
-
-import com.google.gson.JsonParseException;
-import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.DiscoverySyncData;
-import org.apache.shenyu.common.dto.MetaData;
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
-import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
-import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
-import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
-import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
-import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * Consul cache handler.
- */
-public class ConsulCacheHandler {
-    /**
-     * logger.
-     */
-    private static final Logger LOG = 
LoggerFactory.getLogger(ConsulCacheHandler.class);
-
-    private final PluginDataSubscriber pluginDataSubscriber;
-
-    private final List<MetaDataSubscriber> metaDataSubscribers;
-
-    private final List<AuthDataSubscriber> authDataSubscribers;
-
-    private final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers;
-
-    private final List<DiscoveryUpstreamDataSubscriber> 
discoveryUpstreamDataSubscribers;
-
-    public ConsulCacheHandler(final PluginDataSubscriber pluginDataSubscriber,
-                              final List<MetaDataSubscriber> 
metaDataSubscribers,
-                              final List<AuthDataSubscriber> 
authDataSubscribers,
-                              final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers,
-                              final List<DiscoveryUpstreamDataSubscriber> 
discoveryUpstreamDataSubscribers
-    ) {
-        this.pluginDataSubscriber = pluginDataSubscriber;
-        this.metaDataSubscribers = metaDataSubscribers;
-        this.authDataSubscribers = authDataSubscribers;
-        this.proxySelectorDataSubscribers = proxySelectorDataSubscribers;
-        this.discoveryUpstreamDataSubscribers = 
discoveryUpstreamDataSubscribers;
-    }
-
-    protected void updatePluginData(final String configInfo) {
-        try {
-            List<PluginData> pluginDataList = new 
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, 
PluginData.class).values());
-            pluginDataList.forEach(pluginData -> 
Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
-                subscriber.unSubscribe(pluginData);
-                subscriber.onSubscribe(pluginData);
-            }));
-        } catch (JsonParseException e) {
-            LOG.error("sync plugin data have error:", e);
-        }
-    }
-
-    protected void updateSelectorMap(final String configInfo) {
-        try {
-            List<SelectorData> selectorDataList = 
GsonUtils.getInstance().toObjectMapList(configInfo, 
SelectorData.class).values().stream().flatMap(Collection::stream).collect(Collectors.toList());
-            selectorDataList.forEach(selectorData -> 
Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
-                subscriber.unSelectorSubscribe(selectorData);
-                subscriber.onSelectorSubscribe(selectorData);
-            }));
-        } catch (JsonParseException e) {
-            LOG.error("sync selector data have error:", e);
-        }
-    }
-
-    protected void updateRuleMap(final String configInfo) {
-        try {
-            List<RuleData> ruleDataList = 
GsonUtils.getInstance().toObjectMapList(configInfo, RuleData.class).values()
-                    .stream().flatMap(Collection::stream)
-                    .collect(Collectors.toList());
-            ruleDataList.forEach(ruleData -> 
Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
-                subscriber.unRuleSubscribe(ruleData);
-                subscriber.onRuleSubscribe(ruleData);
-            }));
-        } catch (JsonParseException e) {
-            LOG.error("sync rule data have error:", e);
-        }
-    }
-
-    protected void updateMetaDataMap(final String configInfo) {
-        try {
-            List<MetaData> metaDataList = new 
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, 
MetaData.class).values());
-            metaDataList.forEach(metaData -> 
metaDataSubscribers.forEach(subscriber -> {
-                subscriber.unSubscribe(metaData);
-                subscriber.onSubscribe(metaData);
-            }));
-        } catch (JsonParseException e) {
-            LOG.error("sync meta data have error:", e);
-        }
-    }
-
-    protected void updateAuthMap(final String configInfo) {
-        try {
-            List<AppAuthData> appAuthDataList = new 
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, 
AppAuthData.class).values());
-            appAuthDataList.forEach(appAuthData -> 
authDataSubscribers.forEach(subscriber -> {
-                subscriber.unSubscribe(appAuthData);
-                subscriber.onSubscribe(appAuthData);
-            }));
-        } catch (JsonParseException e) {
-            LOG.error("sync auth data have error:", e);
-        }
-    }
-
-    protected void updateSelectorDataMap(final String configInfo) {
-        try {
-            List<ProxySelectorData> proxySelectorDataList = new 
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, 
ProxySelectorData.class).values());
-            proxySelectorDataList.forEach(proxySelectorData -> 
proxySelectorDataSubscribers.forEach(subscriber -> {
-                subscriber.onSubscribe(proxySelectorData);
-                subscriber.unSubscribe(proxySelectorData);
-            }));
-        } catch (JsonParseException e) {
-            LOG.error("sync proxy selector data have error:", e);
-        }
-    }
-
-    protected void updateDiscoveryUpstreamMap(final String configInfo) {
-        try {
-            List<DiscoverySyncData> discoverySyncDataList = new 
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, 
DiscoverySyncData.class).values());
-            discoverySyncDataList.forEach(discoverySyncData -> 
discoveryUpstreamDataSubscribers.forEach(subscriber -> {
-                subscriber.onSubscribe(discoverySyncData);
-                subscriber.unSubscribe(discoverySyncData);
-            }));
-        } catch (JsonParseException e) {
-            LOG.error("sync discovery data have error:", e);
-        }
-    }
-
-    protected interface OnChange {
-
-        void change(String changeData);
-    }
-}
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java
 
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java
index 89c76c7160..ae847a4764 100644
--- 
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java
+++ 
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java
@@ -18,16 +18,13 @@
 package org.apache.shenyu.sync.data.consul;
 
 import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.QueryParams;
 import com.ecwid.consul.v1.Response;
 import com.ecwid.consul.v1.kv.model.GetValue;
-import org.apache.shenyu.common.constant.ConsulConstants;
 import org.apache.shenyu.sync.data.consul.config.ConsulConfig;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
@@ -36,16 +33,14 @@ import org.mockito.quality.Strictness;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -67,6 +62,7 @@ public final class ConsulSyncDataServiceTest {
     @Mock
     private ConsulConfig consulConfig;
 
+    @InjectMocks
     private ConsulSyncDataService consulSyncDataService;
 
     @Mock
@@ -75,72 +71,58 @@ public final class ConsulSyncDataServiceTest {
     @Mock
     private Response response;
 
-    @BeforeEach
-    public void setup() {
-        when(consulConfig.getWaitTime()).thenReturn(WAIT_TIME);
-        when(consulConfig.getWatchDelay()).thenReturn(WATCH_DELAY);
-        final List<GetValue> list = new ArrayList<>(1);
-        when(getValue.getModifyIndex()).thenReturn(INDEX);
-        when(getValue.getKey()).thenReturn(ConsulConstants.PLUGIN_DATA);
-        when(getValue.getDecodedValue()).thenReturn("{}");
-        list.add(getValue);
-        when(consulClient.getKVValues(any(), any(), any()))
-                .thenReturn(response);
-        when(response.getValue()).thenReturn(list);
-        when(response.getConsulIndex()).thenReturn(INDEX);
-        consulSyncDataService = new ConsulSyncDataService(consulClient, 
consulConfig, null,
-                Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList());
-    }
-
-    @Test
-    public void testStart() throws Exception {
-        TimeUnit.SECONDS.sleep(2);
-        verify(consulClient, 
atLeast(1)).getKVValues(eq(ConsulConstants.SYNC_PRE_FIX),
-                eq(null), any(QueryParams.class));
-        verify(getValue, atLeast(1)).getModifyIndex();
-        verify(getValue, atLeast(1)).getDecodedValue();
-        verify(response, atLeast(3)).getValue();
-    }
-
-    @AfterEach
-    public void testStop() {
-        consulSyncDataService.close();
-    }
-
     @Test
     public void testWatchConfigKeyValues() throws NoSuchMethodException, 
IllegalAccessException, NoSuchFieldException {
-        final Method watchConfigKeyValues = 
ConsulSyncDataService.class.getDeclaredMethod("watchConfigKeyValues");
+        final Method watchConfigKeyValues = 
ConsulSyncDataService.class.getDeclaredMethod("watchConfigKeyValues",
+                String.class, BiConsumer.class, Consumer.class);
         watchConfigKeyValues.setAccessible(true);
 
         final Field consul = 
ConsulSyncDataService.class.getDeclaredField("consulClient");
         consul.setAccessible(true);
         final ConsulClient consulClient = mock(ConsulClient.class);
         consul.set(consulSyncDataService, consulClient);
+
+        final Field declaredField = 
ConsulSyncDataService.class.getDeclaredField("consulConfig");
+        declaredField.setAccessible(true);
+        final ConsulConfig consulConfig = mock(ConsulConfig.class);
+        declaredField.set(consulSyncDataService, consulConfig);
+
+        final Field executorField = 
ConsulSyncDataService.class.getDeclaredField("executor");
+        executorField.setAccessible(true);
+        executorField.set(consulSyncDataService, 
mock(ScheduledThreadPoolExecutor.class));
+
         final Response<List<GetValue>> response = mock(Response.class);
 
         when(consulClient.getKVValues(any(), any(), 
any())).thenReturn(response);
 
-        Assertions.assertDoesNotThrow(() -> 
watchConfigKeyValues.invoke(consulSyncDataService));
+        BiConsumer<String, String> updateHandler = (changeData, decodedValue) 
-> {
+
+        };
+        Consumer<String> deleteHandler = removeKey -> {
+
+        };
+        String watchPathRoot = "/shenyu";
+        Assertions.assertDoesNotThrow(() -> 
watchConfigKeyValues.invoke(consulSyncDataService, watchPathRoot, 
updateHandler, deleteHandler));
 
         List<GetValue> getValues = new ArrayList<>(1);
         getValues.add(mock(GetValue.class));
         when(response.getValue()).thenReturn(getValues);
-        Assertions.assertDoesNotThrow(() -> 
watchConfigKeyValues.invoke(consulSyncDataService));
+        Assertions.assertDoesNotThrow(() -> 
watchConfigKeyValues.invoke(consulSyncDataService, watchPathRoot, 
updateHandler, deleteHandler));
 
         when(response.getConsulIndex()).thenReturn(2L);
-        Assertions.assertDoesNotThrow(() -> 
watchConfigKeyValues.invoke(consulSyncDataService));
+        Assertions.assertDoesNotThrow(() -> 
watchConfigKeyValues.invoke(consulSyncDataService, watchPathRoot, 
updateHandler, deleteHandler));
 
         when(response.getConsulIndex()).thenReturn(null);
-        Assertions.assertDoesNotThrow(() -> 
watchConfigKeyValues.invoke(consulSyncDataService));
+        Assertions.assertDoesNotThrow(() -> 
watchConfigKeyValues.invoke(consulSyncDataService, watchPathRoot, 
updateHandler, deleteHandler));
 
         when(response.getConsulIndex()).thenReturn(0L);
-        Assertions.assertDoesNotThrow(() -> 
watchConfigKeyValues.invoke(consulSyncDataService));
+        Assertions.assertDoesNotThrow(() -> 
watchConfigKeyValues.invoke(consulSyncDataService, watchPathRoot, 
updateHandler, deleteHandler));
 
         final Field consulIndexes = 
ConsulSyncDataService.class.getDeclaredField("consulIndexes");
         consulIndexes.setAccessible(true);
         final Map<String, Long> consulIndexesSource = (Map<String, Long>) 
consulIndexes.get(consulSyncDataService);
         consulIndexesSource.put("/null", null);
         when(response.getConsulIndex()).thenReturn(2L);
-        Assertions.assertDoesNotThrow(() -> 
watchConfigKeyValues.invoke(consulSyncDataService));
+        Assertions.assertDoesNotThrow(() -> 
watchConfigKeyValues.invoke(consulSyncDataService, watchPathRoot, 
updateHandler, deleteHandler));
     }
 }
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandlerTest.java
 
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandlerTest.java
deleted file mode 100644
index c1b5e4030e..0000000000
--- 
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandlerTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shenyu.sync.data.consul.handler;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.MetaData;
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
-import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
-import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
-import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-/**
- * test case for {@link ConsulCacheHandler}.
- */
-public final class ConsulCacheHandlerTest {
-
-    @Test
-    public void testUpdatePluginMap() {
-        String pluginName1 = "PLUGIN_NAME_1";
-        String pluginName2 = "PLUGIN_NAME_2";
-        PluginData pluginData1 =
-                
PluginData.builder().name(pluginName1).id("plugin_1").config("config_1").build();
-        PluginData pluginData2 =
-                
PluginData.builder().name(pluginName2).id("plugin_2").config("config_2").build();
-        String pluginData = GsonUtils.getInstance()
-                .toJson(ImmutableMap.of(pluginName2, pluginData2, pluginName1, 
pluginData1));
-
-        final List<PluginData> onSubscribeList = new ArrayList<>();
-        final List<PluginData> unsubscribeList = new ArrayList<>();
-        ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(new 
PluginDataSubscriber() {
-            @Override
-            public void onSubscribe(final PluginData pluginData) {
-                onSubscribeList.add(pluginData);
-            }
-
-            @Override
-            public void unSubscribe(final PluginData pluginData) {
-                unsubscribeList.add(pluginData);
-            }
-        }, Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList());
-        consulCacheHandler.updatePluginData(pluginData);
-        assertEquals(2, onSubscribeList.size());
-        assertEquals(2, unsubscribeList.size());
-    }
-
-    @Test
-    public void testUpdateSelectorMap() {
-        String selectorDataPluginName1 = "SELECTOR_DATA_1";
-        String selectorDataPluginName2 = "SELECTOR_DATA_2";
-        SelectorData selectorData1 =
-                SelectorData.builder()
-                        .pluginName(selectorDataPluginName1)
-                        .id("select_1")
-                        .name("SELECT_DATA_NAME_1")
-                        .build();
-        SelectorData selectorData2 =
-                SelectorData.builder()
-                        .pluginName(selectorDataPluginName2)
-                        .id("select_2")
-                        .name("SELECT_DATA_NAME_2")
-                        .build();
-
-        String selectorDataParam = GsonUtils.getInstance()
-                .toJson(ImmutableMap.of(selectorDataPluginName2, 
ImmutableList.of(selectorData2),
-                        selectorDataPluginName1, 
ImmutableList.of(selectorData1)));
-        final List<SelectorData> subscribeList = new ArrayList<>();
-        final List<SelectorData> unsubscribeList = new ArrayList<>();
-        ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(new 
PluginDataSubscriber() {
-            @Override
-            public void onSelectorSubscribe(final SelectorData selectorData) {
-                subscribeList.add(selectorData);
-            }
-
-            @Override
-            public void unSelectorSubscribe(final SelectorData selectorData) {
-                unsubscribeList.add(selectorData);
-            }
-        }, Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList());
-        consulCacheHandler.updateSelectorMap(selectorDataParam);
-        assertEquals(2, subscribeList.size());
-        assertEquals(2, unsubscribeList.size());
-    }
-
-    @Test
-    public void testUpdateRuleMap() {
-        String ruleDataId1 = "RULE_DATA_1";
-        String ruleDataId2 = "RULE_DATA_2";
-        String selectorId1 = "ID_1";
-        String selectorId2 = "ID_2";
-        RuleData ruleData1 = 
RuleData.builder().selectorId(selectorId1).id(ruleDataId1).build();
-        RuleData ruleData2 = 
RuleData.builder().selectorId(selectorId2).id(ruleDataId2).build();
-        String ruleDataParam = GsonUtils.getInstance()
-                .toJson(
-                        ImmutableMap.of(
-                                selectorId2,
-                                ImmutableList.of(ruleData2),
-                                selectorId1,
-                                ImmutableList.of(ruleData1)));
-        final List<RuleData> subscribeList = new ArrayList<>();
-        final List<RuleData> unsubscribeList = new ArrayList<>();
-        ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(new 
PluginDataSubscriber() {
-            @Override
-            public void onRuleSubscribe(final RuleData ruleData) {
-                subscribeList.add(ruleData);
-            }
-
-            @Override
-            public void unRuleSubscribe(final RuleData ruleData) {
-                unsubscribeList.add(ruleData);
-            }
-        }, Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList());
-        consulCacheHandler.updateRuleMap(ruleDataParam);
-        assertEquals(2, subscribeList.size());
-        assertEquals(2, unsubscribeList.size());
-    }
-
-    @Test
-    public void testUpdateMetaDataMap() {
-        String metadataPath1 = "METADATA_PATH_1";
-        String metadataPath2 = "METADATA_PATH_2";
-        MetaData metaData1 = 
MetaData.builder().path(metadataPath1).id("meta_1").build();
-        MetaData metaData2 = 
MetaData.builder().path(metadataPath2).id("meta_2").build();
-
-        String metaDataParam = GsonUtils.getInstance()
-                .toJson(ImmutableMap.of(metadataPath1, metaData1, 
metadataPath2, metaData2));
-        final List<MetaData> subscribeList = new ArrayList<>();
-        final List<MetaData> unsubscribeList = new ArrayList<>();
-        MetaDataSubscriber metaDataSubscriber = new MetaDataSubscriber() {
-            @Override
-            public void onSubscribe(final MetaData metaData) {
-                subscribeList.add(metaData);
-            }
-
-            @Override
-            public void unSubscribe(final MetaData metaData) {
-                unsubscribeList.add(metaData);
-            }
-        };
-        ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(null, 
Lists.newArrayList(metaDataSubscriber),
-                Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList());
-        consulCacheHandler.updateMetaDataMap(metaDataParam);
-        assertEquals(2, subscribeList.size());
-        assertEquals(2, unsubscribeList.size());
-    }
-
-    @Test
-    public void testUpdateAuthMap() {
-        String mockAppKey = "MOCK_APP_KEY";
-        String mockAppKey2 = "MOCK_APP_KEY2";
-        String mockAppSecret = "MOCK_APP_SECRET";
-        AppAuthData appAuthData =
-                
AppAuthData.builder().appKey(mockAppKey).appSecret(mockAppSecret).enabled(true).build();
-        AppAuthData appAuthData2 =
-                
AppAuthData.builder().appKey(mockAppKey2).appSecret(mockAppSecret).enabled(true).build();
-
-        String appAuthDataParam = GsonUtils.getInstance()
-                .toJson(ImmutableMap.of(mockAppKey2, appAuthData2, mockAppKey, 
appAuthData));
-        final List<AppAuthData> subscribeList = new ArrayList<>();
-        final List<AppAuthData> unsubscribeList = new ArrayList<>();
-
-        AuthDataSubscriber authDataSubscriber = new AuthDataSubscriber() {
-            @Override
-            public void onSubscribe(final AppAuthData appAuthData) {
-                subscribeList.add(appAuthData);
-            }
-
-            @Override
-            public void unSubscribe(final AppAuthData appAuthData) {
-                unsubscribeList.add(appAuthData);
-            }
-        };
-        ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(null,
-                Collections.emptyList(), 
Lists.newArrayList(authDataSubscriber), Collections.emptyList(), 
Collections.emptyList());
-
-        consulCacheHandler.updateAuthMap(appAuthDataParam);
-        assertEquals(2, subscribeList.size());
-        assertEquals(2, unsubscribeList.size());
-    }
-
-    @Test
-    public void testError() {
-        ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(null,
-                Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList());
-        assertDoesNotThrow(() -> 
consulCacheHandler.updateAuthMap("errorJson"));
-        assertDoesNotThrow(() -> 
consulCacheHandler.updateMetaDataMap("errorJson"));
-        assertDoesNotThrow(() -> 
consulCacheHandler.updateRuleMap("errorJson"));
-        assertDoesNotThrow(() -> 
consulCacheHandler.updatePluginData("errorJson"));
-        assertDoesNotThrow(() -> 
consulCacheHandler.updateSelectorMap("errorJson"));
-    }
-}
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
 
b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
index 8abb1c4dc6..3a10aac6f7 100644
--- 
a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
+++ 
b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
@@ -17,70 +17,37 @@
 
 package org.apache.shenyu.sync.data.etcd;
 
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.shenyu.common.constant.Constants;
 import org.apache.shenyu.common.constant.DefaultPathConstants;
-import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.DiscoverySyncData;
-import org.apache.shenyu.common.dto.MetaData;
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
-import org.apache.shenyu.common.enums.ConfigGroupEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
 import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
 import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
 import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
 import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
 import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
-import org.apache.shenyu.sync.data.api.SyncDataService;
+import org.apache.shenyu.sync.data.core.AbstractNodeDataSyncService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
 
 /**
  * Data synchronize of etcd.
  */
-public class EtcdSyncDataService implements SyncDataService {
+public class EtcdSyncDataService extends AbstractNodeDataSyncService {
 
-    /**
-     * logger.
-     */
     private static final Logger LOG = 
LoggerFactory.getLogger(EtcdSyncDataService.class);
 
-    private static final String PRE_FIX = "/shenyu";
-
     private final EtcdClient etcdClient;
 
-    private final PluginDataSubscriber pluginDataSubscriber;
-
-    private final List<MetaDataSubscriber> metaDataSubscribers;
-
-    private final List<AuthDataSubscriber> authDataSubscribers;
-
-    private final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers;
-
-    private final List<DiscoveryUpstreamDataSubscriber> 
discoveryUpstreamDataSubscribers;
-
     /**
      * Instantiates a new Zookeeper cache manager.
      *
-     * @param etcdClient                       the etcd client
-     * @param pluginDataSubscriber             the plugin data subscriber
-     * @param metaDataSubscribers              the meta data subscribers
-     * @param authDataSubscribers              the auth data subscribers
-     * @param proxySelectorDataSubscribers     the proxy selector data 
subscribers
-     * @param discoveryUpstreamDataSubscribers the discovery upstream data 
subscribers
+     * @param etcdClient           etcdClient
+     * @param pluginDataSubscriber the plugin data subscriber
+     * @param metaDataSubscribers  the meta data subscribers
+     * @param authDataSubscribers  the auth data subscribers
      */
     public EtcdSyncDataService(final EtcdClient etcdClient,
                                final PluginDataSubscriber pluginDataSubscriber,
@@ -88,310 +55,38 @@ public class EtcdSyncDataService implements 
SyncDataService {
                                final List<AuthDataSubscriber> 
authDataSubscribers,
                                final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers,
                                final List<DiscoveryUpstreamDataSubscriber> 
discoveryUpstreamDataSubscribers) {
+        super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers, 
proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers);
         this.etcdClient = etcdClient;
-        this.pluginDataSubscriber = pluginDataSubscriber;
-        this.metaDataSubscribers = metaDataSubscribers;
-        this.authDataSubscribers = authDataSubscribers;
-        this.proxySelectorDataSubscribers = proxySelectorDataSubscribers;
-        this.discoveryUpstreamDataSubscribers = 
discoveryUpstreamDataSubscribers;
         watcherData();
-        watchAppAuth();
-        watchMetaData();
     }
 
     private void watcherData() {
-        final String pluginParent = DefaultPathConstants.PLUGIN_PARENT;
-        List<String> pluginChildren = etcdClientGetChildren(pluginParent);
-        for (String pluginName : pluginChildren) {
-            watcherAll(pluginName);
-        }
-        etcdClient.watchChildChange(pluginParent, (pluginPath, updateValue) -> 
{
-            if (!pluginPath.isEmpty()) {
-                final String pluginName = 
pluginPath.substring(pluginPath.lastIndexOf("/") + 1);
-                cachePluginData(updateValue);
-                subscribePluginDataChanges(pluginPath, pluginName);
-            }
-        }, null);
-    }
-
-    private void watcherAll(final String pluginName) {
-        watcherPlugin(pluginName);
-        watcherSelector(pluginName);
-        watcherRule(pluginName);
-    }
-
-    private void watcherPlugin(final String pluginName) {
-        String pluginPath = DefaultPathConstants.buildPluginPath(pluginName);
-        cachePluginData(etcdClient.get(pluginPath));
-        subscribePluginDataChanges(pluginPath, pluginName);
-    }
-
-    private void watcherSelector(final String pluginName) {
-        String selectorParentPath = 
DefaultPathConstants.buildSelectorParentPath(pluginName);
-        List<String> childrenList = etcdClientGetChildren(selectorParentPath);
-        if (CollectionUtils.isNotEmpty(childrenList)) {
-            childrenList.forEach(children -> {
-                String realPath = buildRealPath(selectorParentPath, children);
-                cacheSelectorData(etcdClient.get(realPath));
-                subscribeSelectorDataChanges(realPath);
-            });
-        }
-        subscribeChildChanges(ConfigGroupEnum.SELECTOR, selectorParentPath);
-    }
-
-    private void watcherRule(final String pluginName) {
-        String ruleParent = 
DefaultPathConstants.buildRuleParentPath(pluginName);
-        List<String> childrenList = etcdClientGetChildren(ruleParent);
-        if (CollectionUtils.isNotEmpty(childrenList)) {
-            childrenList.forEach(children -> {
-                String realPath = buildRealPath(ruleParent, children);
-                cacheRuleData(etcdClient.get(realPath));
-                subscribeRuleDataChanges(realPath);
-            });
-        }
-        subscribeChildChanges(ConfigGroupEnum.RULE, ruleParent);
-    }
-
-    private void watchAppAuth() {
-        final String appAuthParent = DefaultPathConstants.APP_AUTH_PARENT;
-        List<String> childrenList = etcdClientGetChildren(appAuthParent);
-        if (CollectionUtils.isNotEmpty(childrenList)) {
-            childrenList.forEach(children -> {
-                String realPath = buildRealPath(appAuthParent, children);
-                cacheAuthData(etcdClient.get(realPath));
-                subscribeAppAuthDataChanges(realPath);
-            });
-        }
-        subscribeChildChanges(ConfigGroupEnum.APP_AUTH, appAuthParent);
-    }
-
-    private void watchMetaData() {
-        final String metaDataPath = DefaultPathConstants.META_DATA;
-        List<String> childrenList = etcdClientGetChildren(metaDataPath);
-        if (CollectionUtils.isNotEmpty(childrenList)) {
-            childrenList.forEach(children -> {
-                String realPath = buildRealPath(metaDataPath, children);
-                cacheMetaData(etcdClient.get(realPath));
-                subscribeMetaDataChanges(realPath);
-            });
-        }
-        subscribeChildChanges(ConfigGroupEnum.META_DATA, metaDataPath);
-    }
-
-    private void subscribeChildChanges(final ConfigGroupEnum groupKey, final 
String groupParentPath) {
-        switch (groupKey) {
-            case SELECTOR:
-                etcdClient.watchChildChange(groupParentPath, (updatePath, 
updateValue) -> {
-                    cacheSelectorData(updateValue);
-                    subscribeSelectorDataChanges(updatePath);
-                }, null);
-                break;
-            case RULE:
-                etcdClient.watchChildChange(groupParentPath, (updatePath, 
updateValue) -> {
-                    cacheRuleData(updateValue);
-                    subscribeRuleDataChanges(updatePath);
-                }, null);
-                break;
-            case APP_AUTH:
-                etcdClient.watchChildChange(groupParentPath, (updatePath, 
updateValue) -> {
-                    cacheAuthData(updateValue);
-                    subscribeAppAuthDataChanges(updatePath);
-                }, null);
-                break;
-            case META_DATA:
-                etcdClient.watchChildChange(groupParentPath, (updatePath, 
updateValue) -> {
-                    cacheMetaData(updateValue);
-                    subscribeMetaDataChanges(updatePath);
-                }, null);
-                break;
-            case DISCOVER_UPSTREAM:
-                etcdClient.watchChildChange(groupParentPath, (updatePath, 
updateValue) -> {
-                    cacheDiscoveryUpstreamData(updateValue);
-                    subscribeDiscoveryUpstreamDataChanges(updatePath);
-                }, null);
-                break;
-            case PROXY_SELECTOR:
-                etcdClient.watchChildChange(groupParentPath, (updatePath, 
updateValue) -> {
-                    cacheProxySelectorData(updateValue);
-                    subscribeProxySelectorDataChanges(updatePath);
-                }, null);
-                break;
-            default:
-                throw new IllegalStateException("Unexpected groupKey: " + 
groupKey);
-        }
-    }
-
-    private void subscribePluginDataChanges(final String pluginPath, final 
String pluginName) {
-        etcdClient.watchDataChange(pluginPath, (updatePath, updateValue) -> {
-            final PluginData data = 
GsonUtils.getInstance().fromJson(updateValue, PluginData.class);
-            Optional.ofNullable(data)
-                    .ifPresent(d -> 
Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSubscribe(d)));
-        }, deleteNode -> deletePlugin(pluginName));
-    }
-
-    private void deletePlugin(final String pluginName) {
-        final PluginData data = new PluginData();
-        data.setName(pluginName);
-        Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> 
e.unSubscribe(data));
-    }
-
-    private void subscribeSelectorDataChanges(final String path) {
-        etcdClient.watchDataChange(path, (updateNode, updateValue) -> 
cacheSelectorData(updateValue),
-                this::unCacheSelectorData);
-    }
-
-    private void subscribeRuleDataChanges(final String path) {
-        etcdClient.watchDataChange(path, (updatePath, updateValue) -> 
cacheRuleData(updateValue),
-                this::unCacheRuleData);
-    }
-
-    private void subscribeAppAuthDataChanges(final String realPath) {
-        etcdClient.watchDataChange(realPath, (updatePath, updateValue) -> 
cacheAuthData(updateValue),
-                this::unCacheAuthData);
-    }
-
-    private void subscribeMetaDataChanges(final String realPath) {
-        etcdClient.watchDataChange(realPath, (updatePath, updateValue) -> 
cacheMetaData(updateValue),
-                this::deleteMetaData);
-    }
-
-    private void deleteMetaData(final String deletePath) {
-        final String path = 
deletePath.substring(DefaultPathConstants.META_DATA.length() + 1);
-        MetaData metaData = new MetaData();
-
-        try {
-            metaData.setPath(URLDecoder.decode(path, 
StandardCharsets.UTF_8.name()));
-            unCacheMetaData(metaData);
-            etcdClient.watchClose(path);
-        } catch (UnsupportedEncodingException e) {
-            LOG.error("delete meta data error.", e);
-        }
-    }
-
-    private void cachePluginData(final String dataString) {
-        final PluginData pluginData = 
GsonUtils.getInstance().fromJson(dataString, PluginData.class);
-        Optional.ofNullable(pluginData)
-                .flatMap(data -> 
Optional.ofNullable(pluginDataSubscriber)).ifPresent(e -> 
e.onSubscribe(pluginData));
-    }
-
-    private void cacheSelectorData(final String dataString) {
-        final SelectorData selectorData = 
GsonUtils.getInstance().fromJson(dataString, SelectorData.class);
-        Optional.ofNullable(selectorData)
-                .ifPresent(data -> 
Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> 
e.onSelectorSubscribe(data)));
-    }
-
-    private void unCacheSelectorData(final String dataPath) {
-        SelectorData selectorData = new SelectorData();
-        final String selectorId = dataPath.substring(dataPath.lastIndexOf("/") 
+ 1);
-        final String str = 
dataPath.substring(DefaultPathConstants.SELECTOR_PARENT.length());
-        final String pluginName = str.substring(1, str.length() - 
selectorId.length() - 1);
-        selectorData.setPluginName(pluginName);
-        selectorData.setId(selectorId);
-        Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> 
e.unSelectorSubscribe(selectorData));
-        etcdClient.watchClose(dataPath);
-    }
-
-    private void cacheRuleData(final String dataString) {
-        final RuleData ruleData = GsonUtils.getInstance().fromJson(dataString, 
RuleData.class);
-        Optional.ofNullable(ruleData)
-                .ifPresent(data -> 
Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> 
e.onRuleSubscribe(data)));
-    }
-
-    private void unCacheRuleData(final String dataPath) {
-        String substring = dataPath.substring(dataPath.lastIndexOf("/") + 1);
-        final String str = 
dataPath.substring(DefaultPathConstants.RULE_PARENT.length());
-        final String pluginName = str.substring(1, str.length() - 
substring.length() - 1);
-        final List<String> list = 
Lists.newArrayList(Splitter.on(DefaultPathConstants.SELECTOR_JOIN_RULE).split(substring));
-
-        RuleData ruleData = new RuleData();
-        ruleData.setPluginName(pluginName);
-        ruleData.setSelectorId(list.get(0));
-        ruleData.setId(list.get(1));
-
-        Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> 
e.unRuleSubscribe(ruleData));
-        etcdClient.watchClose(dataPath);
-    }
-
-    private void cacheAuthData(final String dataString) {
-        final AppAuthData appAuthData = 
GsonUtils.getInstance().fromJson(dataString, AppAuthData.class);
-        Optional.ofNullable(appAuthData)
-                .ifPresent(data -> authDataSubscribers.forEach(e -> 
e.onSubscribe(data)));
-    }
-
-    private void unCacheAuthData(final String dataPath) {
-        final String key = 
dataPath.substring(DefaultPathConstants.APP_AUTH_PARENT.length() + 1);
-        AppAuthData appAuthData = new AppAuthData();
-        appAuthData.setAppKey(key);
-        authDataSubscribers.forEach(e -> e.unSubscribe(appAuthData));
-        etcdClient.watchClose(dataPath);
-    }
-
-    private void cacheMetaData(final String dataString) {
-        final MetaData metaData = GsonUtils.getInstance().fromJson(dataString, 
MetaData.class);
-        Optional.ofNullable(metaData)
-                .ifPresent(data -> metaDataSubscribers.forEach(e -> 
e.onSubscribe(metaData)));
-    }
-
-    private void unCacheMetaData(final MetaData metaData) {
-        Optional.ofNullable(metaData)
-                .ifPresent(data -> metaDataSubscribers.forEach(e -> 
e.unSubscribe(metaData)));
-    }
-
-    private void cacheDiscoveryUpstreamData(final String dataString) {
-        final DiscoverySyncData discoveryUpstream = 
GsonUtils.getInstance().fromJson(dataString, DiscoverySyncData.class);
-        Optional.ofNullable(discoveryUpstream)
-                .ifPresent(data -> discoveryUpstreamDataSubscribers.forEach(e 
-> e.onSubscribe(data)));
-    }
-
-    private void unCacheDiscoveryUpstreamData(final String dataPath) {
-        DiscoverySyncData discoverySyncData = new DiscoverySyncData();
-        final String selectorId = dataPath.substring(dataPath.lastIndexOf("/") 
+ 1);
-        final String str = 
dataPath.substring(DefaultPathConstants.DISCOVERY_UPSTREAM.length());
-        final String pluginName = str.substring(1, str.length() - 
selectorId.length() - 1);
-        discoverySyncData.setPluginName(pluginName);
-        discoverySyncData.setSelectorId(selectorId);
-        discoveryUpstreamDataSubscribers.forEach(e -> 
e.unSubscribe(discoverySyncData));
-        etcdClient.watchClose(dataPath);
-    }
-
-    private void subscribeDiscoveryUpstreamDataChanges(final String realPath) {
-        etcdClient.watchDataChange(realPath, (updatePath, updateValue) -> 
cacheDiscoveryUpstreamData(updateValue),
-                this::unCacheDiscoveryUpstreamData);
-    }
-
-    private void cacheProxySelectorData(final String dataString) {
-        final ProxySelectorData proxySelectorData = 
GsonUtils.getInstance().fromJson(dataString, ProxySelectorData.class);
-        Optional.ofNullable(proxySelectorData)
-                .ifPresent(data -> proxySelectorDataSubscribers.forEach(e -> 
e.onSubscribe(data)));
-    }
-
-    private void unCacheProxySelectorData(final String dataPath) {
-        ProxySelectorData proxySelectorData = new ProxySelectorData();
-        final String selectorId = dataPath.substring(dataPath.lastIndexOf("/") 
+ 1);
-        final String str = 
dataPath.substring(DefaultPathConstants.PROXY_SELECTOR.length());
-        final String pluginName = str.substring(1, str.length() - 
selectorId.length() - 1);
-        proxySelectorData.setPluginName(pluginName);
-        proxySelectorData.setId(selectorId);
-        proxySelectorDataSubscribers.forEach(e -> 
e.unSubscribe(proxySelectorData));
-        etcdClient.watchClose(dataPath);
-    }
-
-    private void subscribeProxySelectorDataChanges(final String realPath) {
-        etcdClient.watchDataChange(realPath, (updatePath, updateValue) -> 
cacheProxySelectorData(updateValue),
-                this::unCacheProxySelectorData);
-    }
-
-    private String buildRealPath(final String parent, final String children) {
-        return String.join("/", parent, children);
-    }
-
-    private List<String> etcdClientGetChildren(final String parent) {
+        watcherData0(DefaultPathConstants.PLUGIN_PARENT);
+        watcherData0(DefaultPathConstants.SELECTOR_PARENT);
+        watcherData0(DefaultPathConstants.RULE_PARENT);
+        watcherData0(DefaultPathConstants.PROXY_SELECTOR);
+        watcherData0(DefaultPathConstants.DISCOVERY_UPSTREAM);
+        watcherData0(DefaultPathConstants.APP_AUTH_PARENT);
+        watcherData0(DefaultPathConstants.META_DATA);
+    }
+
+    private void watcherData0(final String registerPath) {
+        etcdClient.watchChildChange(
+            registerPath,
+            (updatePath, updateValue) -> super.event(updatePath, updateValue, 
registerPath, EventType.PUT),
+            deletePath -> super.event(deletePath, null, registerPath, 
EventType.DELETE));
         try {
-            return etcdClient.getChildrenKeys(parent, "/");
-        } catch (ExecutionException | InterruptedException e) {
+            // load all key
+            final List<String> childrenKeys = 
etcdClient.getChildrenKeys(registerPath, "/");
+            if (!ObjectUtils.isEmpty(childrenKeys)) {
+                childrenKeys.forEach(nodePath -> {
+                    final String nodeData = 
etcdClient.get(String.join(Constants.PATH_SEPARATOR, registerPath, nodePath));
+                    super.event(nodePath, nodeData, registerPath, 
EventType.PUT);
+                });
+            }
+        } catch (Exception e) {
             LOG.error(e.getMessage(), e);
         }
-        return Collections.emptyList();
     }
 
     @Override
@@ -400,4 +95,5 @@ public class EtcdSyncDataService implements SyncDataService {
             etcdClient.close();
         }
     }
+
 }
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
 
b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
index 6870e60809..3d6eeb1c72 100644
--- 
a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
+++ 
b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
@@ -17,41 +17,20 @@
 
 package org.apache.shenyu.sync.data.etcd;
 
-import io.etcd.jetcd.Client;
-import io.etcd.jetcd.Watch;
-import org.apache.shenyu.common.constant.DefaultPathConstants;
-import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.MetaData;
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.enums.ConfigGroupEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
 import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
 import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
 import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
-import org.junit.jupiter.api.BeforeEach;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -61,221 +40,29 @@ import static org.mockito.Mockito.when;
  */
 @ExtendWith(MockitoExtension.class)
 public class EtcdSyncDataServiceTest {
-    
-    private static final String MOCK_PLUGIN_PATH = "/shenyu/plugin/divide";
-
-    private static final String MOCK_PLUGIN_NAME = "divide";
-
-    private EtcdSyncDataService etcdSyncDataService;
-
-    private PluginData pluginData;
-
-    @Mock
-    private EtcdClient etcdClient;
-
-    @Mock
-    private Client client;
-
-    @Mock
-    private Watch.Watcher watcher;
-
-    @BeforeEach
-    public void setUp() {
-        pluginData = 
PluginData.builder().name(MOCK_PLUGIN_NAME).enabled(Boolean.FALSE).build();
-    }
-
-    @Test
-    public void testWatchPluginWhenInit() throws ExecutionException, 
InterruptedException {
-        final List<PluginData> subscribeList = new ArrayList<>(1);
-        when(etcdClient.getChildrenKeys(anyString(), 
anyString())).thenReturn(Collections.singletonList("sign"));
-        
when(etcdClient.get(anyString())).thenReturn(GsonUtils.getInstance().toJson(pluginData));
-        etcdSyncDataService = new EtcdSyncDataService(etcdClient, new 
PluginDataSubscriber() {
-            @Override
-            public void onSubscribe(final PluginData pluginData) {
-                subscribeList.add(pluginData);
-            }
-        }, Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList());
-        assertThat(subscribeList.size(), is(1));
-        assertThat(subscribeList.get(0).getName(), is("divide"));
-    }
-
-    @Test
-    public void deletePluginTest() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException {
-        final List<PluginData> subscribeList = new ArrayList<>(1);
-        etcdSyncDataService = new EtcdSyncDataService(etcdClient, new 
PluginDataSubscriber() {
-            @Override
-            public void onSubscribe(final PluginData pluginData) {
-                subscribeList.add(pluginData);
-            }
-
-            @Override
-            public void unSubscribe(final PluginData pluginData) {
-                final PluginData pluginDataDel = subscribeList.stream()
-                        .filter(pluginDataSource -> 
pluginDataSource.getName().equals(pluginData.getName()))
-                        .findFirst().orElse(null);
-                subscribeList.remove(pluginDataDel);
-            }
-        }, Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList());
-        subscribeList.clear();
-        final Method deletePlugin = 
EtcdSyncDataService.class.getDeclaredMethod("deletePlugin", String.class);
-        final Method cachePluginData = 
EtcdSyncDataService.class.getDeclaredMethod("cachePluginData", String.class);
-        deletePlugin.setAccessible(true);
-        cachePluginData.setAccessible(true);
-        final PluginData pluginData = new PluginData();
-        pluginData.setName("pluginName");
-        cachePluginData.invoke(etcdSyncDataService, 
GsonUtils.getInstance().toJson(pluginData));
-        assertEquals(subscribeList.get(0).getName(), "pluginName");
-        deletePlugin.invoke(etcdSyncDataService, "pluginName");
-        assertTrue(subscribeList.isEmpty());
-    }
-
-    @Test
-    public void deleteMetaDataTest() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException {
-        final List<MetaData> subscribeList = new ArrayList<>(1);
-        etcdSyncDataService = new EtcdSyncDataService(etcdClient, 
mock(PluginDataSubscriber.class),
-                Collections.singletonList(new MetaDataSubscriber() {
-                    @Override
-                    public void onSubscribe(final MetaData metaData) {
-                        subscribeList.add(metaData);
-                    }
-
-                    @Override
-                    public void unSubscribe(final MetaData metaData) {
-                        final MetaData metaDataDel = subscribeList.stream()
-                                .filter(metaDataSource -> 
metaDataSource.getId().equals(metaData.getId()))
-                                .findFirst().orElse(null);
-                        subscribeList.remove(metaDataDel);
-                    }
-                }), Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList());
-        subscribeList.clear();
-        final Method cacheMetaData = 
EtcdSyncDataService.class.getDeclaredMethod("cacheMetaData", String.class);
-        final Method deleteMetaData = 
EtcdSyncDataService.class.getDeclaredMethod("unCacheMetaData", MetaData.class);
-        cacheMetaData.setAccessible(true);
-        deleteMetaData.setAccessible(true);
-        MetaData metaData = new MetaData();
-        metaData.setId("metaDataId");
-        cacheMetaData.invoke(etcdSyncDataService, 
GsonUtils.getInstance().toJson(metaData));
-        assertEquals(subscribeList.get(0).getId(), metaData.getId());
-        deleteMetaData.invoke(etcdSyncDataService, metaData);
-        assertTrue(subscribeList.isEmpty());
-    }
-
-    @Test
-    public void authDataTest() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException {
-        final List<AppAuthData> subscribeList = new ArrayList<>(1);
-        etcdSyncDataService = new EtcdSyncDataService(etcdClient, 
mock(PluginDataSubscriber.class), Collections.emptyList(),
-                Collections.singletonList(new AuthDataSubscriber() {
-                    @Override
-                    public void onSubscribe(final AppAuthData metaData) {
-                        subscribeList.add(metaData);
-                    }
-
-                    @Override
-                    public void unSubscribe(final AppAuthData appAuthData) {
-                        final AppAuthData appAuthDataOld = 
subscribeList.stream()
-                                .filter(appAuthDataSource -> 
appAuthDataSource.getAppKey().equals(appAuthData.getAppKey()))
-                                .findFirst().orElse(null);
-                        subscribeList.remove(appAuthDataOld);
-                    }
-                }), Collections.emptyList(), Collections.emptyList());
-        subscribeList.clear();
-        final Method cacheAuthData = 
EtcdSyncDataService.class.getDeclaredMethod("cacheAuthData", String.class);
-        final Method unCacheAuthData = 
EtcdSyncDataService.class.getDeclaredMethod("unCacheAuthData", String.class);
-        cacheAuthData.setAccessible(true);
-        unCacheAuthData.setAccessible(true);
-        AppAuthData appAuthData = new AppAuthData();
-        appAuthData.setAppKey("appKeyValue");
-        cacheAuthData.invoke(etcdSyncDataService, 
GsonUtils.getInstance().toJson(appAuthData));
-        assertEquals(subscribeList.get(0).getAppKey(), 
appAuthData.getAppKey());
-        unCacheAuthData.invoke(etcdSyncDataService, String.join("/", 
DefaultPathConstants.APP_AUTH_PARENT, appAuthData.getAppKey()));
-        assertTrue(subscribeList.isEmpty());
-    }
 
     @Test
-    public void closeTest() {
-        etcdSyncDataService = new EtcdSyncDataService(etcdClient, 
mock(PluginDataSubscriber.class), Collections.emptyList(),
-                Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList());
-        assertDoesNotThrow(() -> etcdSyncDataService.close());
-    }
-
-    @Test
-    public void ruleTest() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException, NoSuchFieldException {
-        this.commonTest(ConfigGroupEnum.RULE, 
"/shenyu/rule/divide/selectorId-ruleId");
-    }
-
-    @Test
-    public void selectorTest() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException, NoSuchFieldException {
-        this.commonTest(ConfigGroupEnum.SELECTOR, 
"/shenyu/selector/divide/selectorId");
-    }
-
-    @Test
-    public void metaDataTest() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException, NoSuchFieldException {
-        this.commonTest(ConfigGroupEnum.META_DATA, 
"/shenyu/metaData/divide/metaDataId");
-    }
-
-    @Test
-    public void discoverUpstreamTest() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException, NoSuchFieldException {
-        this.commonTest(ConfigGroupEnum.DISCOVER_UPSTREAM, 
"/shenyu/discoveryUpstream/divide/id");
-    }
-
-    @Test
-    public void proxySelectorTest() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException, NoSuchFieldException {
-        this.commonTest(ConfigGroupEnum.PROXY_SELECTOR, 
"/shenyu/proxySelectorData/divide/id");
-    }
-
-    @Test
-    public void appAuthTest() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException, NoSuchFieldException {
-        this.commonTest(ConfigGroupEnum.APP_AUTH, 
"/shenyu/appAuth/divide/appAuthId");
-    }
-
-    /**
-    * commonTest.
-    * @param configGroupEnum configGroupEnum
-    * @param key key
-    * @throws NoSuchMethodException NoSuchMethodException
-    * @throws InvocationTargetException InvocationTargetException
-    * @throws IllegalAccessException IllegalAccessException
-    * @throws NoSuchFieldException NoSuchFieldException
-    */
-    public void commonTest(final ConfigGroupEnum configGroupEnum, final String 
key) throws NoSuchMethodException, InvocationTargetException, 
IllegalAccessException, NoSuchFieldException {
-        final EtcdClient mockEtcdClient = mock(EtcdClient.class);
-        etcdSyncDataService = new EtcdSyncDataService(etcdClient,
-                mock(PluginDataSubscriber.class),
-                Collections.emptyList(),
-                Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList());
-        final Field etcdClient = 
EtcdSyncDataService.class.getDeclaredField("etcdClient");
-        etcdClient.setAccessible(true);
-        etcdClient.set(etcdSyncDataService, mockEtcdClient);
-        final Method subscribeChildChanges = 
EtcdSyncDataService.class.getDeclaredMethod("subscribeChildChanges", 
ConfigGroupEnum.class, String.class);
-        subscribeChildChanges.setAccessible(true);
-
-        final List<BiConsumer<String, String>> biConsumers = new 
ArrayList<>(4);
-        doAnswer(invocation -> {
-            biConsumers.add(invocation.getArgument(1));
-            return true;
-        }).when(mockEtcdClient).watchChildChange(anyString(), any(), any());
-
-        final List<BiConsumer<String, String>> dataBiConsumers = new 
ArrayList<>(4);
-        final List<Consumer<String>> dataConsumers = new ArrayList<>(4);
-        doAnswer(invocation -> {
-            dataBiConsumers.add(invocation.getArgument(1));
-            dataConsumers.add(invocation.getArgument(2));
-            return true;
-        }).when(mockEtcdClient).watchDataChange(anyString(), any(), any());
-
-        subscribeChildChanges.invoke(etcdSyncDataService, configGroupEnum, 
"groupParentPath");
-        // hit default
-        assertThrows(InvocationTargetException.class, () -> 
subscribeChildChanges.invoke(etcdSyncDataService, ConfigGroupEnum.PLUGIN, 
"groupParentPath"));
-
-        biConsumers.forEach(biConsumer -> {
-            biConsumer.accept("updatePath", "{}");
-        });
-        dataConsumers.forEach(consumer -> {
-            consumer.accept(key);
-        });
-        dataBiConsumers.forEach(biConsumer -> {
-            biConsumer.accept("updatePath", "{}");
-        });
-        mockEtcdClient.close();
+    public void testZookeeperInstanceRegisterRepository() throws Exception {
+
+        EtcdClient etcdClient = mock(EtcdClient.class);
+        PluginDataSubscriber pluginDataSubscriber = 
mock(PluginDataSubscriber.class);
+        doAnswer(invocationOnMock -> {
+            BiConsumer<String, String> updateHandler = 
invocationOnMock.getArgument(1);
+            updateHandler.accept("updateData", "{}");
+            Consumer<String> deleteHandler = invocationOnMock.getArgument(2);
+            String pluginName = invocationOnMock.getArgument(0);
+            deleteHandler.accept(pluginName + "removeKey");
+            return null;
+        }).when(etcdClient).watchChildChange(any(), any(), any());
+        when(etcdClient.getChildrenKeys(any(), 
any())).thenReturn(Collections.singletonList("test"));
+        final AuthDataSubscriber authDataSubscriber = 
mock(AuthDataSubscriber.class);
+        final MetaDataSubscriber metaDataSubscriber = 
mock(MetaDataSubscriber.class);
+        final ProxySelectorDataSubscriber proxySelectorDataSubscriber = 
mock(ProxySelectorDataSubscriber.class);
+        final DiscoveryUpstreamDataSubscriber discoveryUpstreamDataSubscriber 
= mock(DiscoveryUpstreamDataSubscriber.class);
+        final EtcdSyncDataService zookeeperSyncDataService = new 
EtcdSyncDataService(etcdClient,
+                pluginDataSubscriber, 
Collections.singletonList(metaDataSubscriber), 
Collections.singletonList(authDataSubscriber),
+                Collections.singletonList(proxySelectorDataSubscriber), 
Collections.singletonList(discoveryUpstreamDataSubscriber));
+
+        zookeeperSyncDataService.close();
     }
 }
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
 
b/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
index 874a4d4c42..c87a2aebc8 100644
--- 
a/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
+++ 
b/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
@@ -17,54 +17,28 @@
 
 package org.apache.shenyu.sync.data.zookeeper;
 
-import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.shenyu.common.constant.DefaultPathConstants;
-import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.DiscoverySyncData;
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
-import org.apache.shenyu.common.dto.MetaData;
-import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.exception.ShenyuException;
-import org.apache.shenyu.common.utils.GsonUtils;
 import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
 import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
 import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
 import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
 import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
-import org.apache.shenyu.sync.data.api.SyncDataService;
+import org.apache.shenyu.sync.data.core.AbstractNodeDataSyncService;
 
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 
 /**
  * this cache data with zookeeper.
  */
-public class ZookeeperSyncDataService implements SyncDataService {
+public class ZookeeperSyncDataService extends AbstractNodeDataSyncService {
 
     private final ZookeeperClient zkClient;
 
-    private final PluginDataSubscriber pluginDataSubscriber;
-
-    private final List<MetaDataSubscriber> metaDataSubscribers;
-
-    private final List<AuthDataSubscriber> authDataSubscribers;
-
-    private final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers;
-
-    private final List<DiscoveryUpstreamDataSubscriber> 
discoveryUpstreamDataSubscribers;
-
     /**
      * Instantiates a new Zookeeper cache manager.
      *
@@ -79,134 +53,24 @@ public class ZookeeperSyncDataService implements 
SyncDataService {
                                     final List<AuthDataSubscriber> 
authDataSubscribers,
                                     final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers,
                                     final 
List<DiscoveryUpstreamDataSubscriber> discoveryUpstreamDataSubscribers) {
+        super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers, 
proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers);
         this.zkClient = zkClient;
-        this.pluginDataSubscriber = pluginDataSubscriber;
-        this.metaDataSubscribers = metaDataSubscribers;
-        this.authDataSubscribers = authDataSubscribers;
-        this.proxySelectorDataSubscribers = proxySelectorDataSubscribers;
-        this.discoveryUpstreamDataSubscribers = 
discoveryUpstreamDataSubscribers;
         watcherData();
-        watchAppAuth();
-        watchMetaData();
     }
 
     private void watcherData() {
-        zkClient.addCache(DefaultPathConstants.PLUGIN_PARENT, new 
PluginCacheListener());
-        zkClient.addCache(DefaultPathConstants.SELECTOR_PARENT, new 
SelectorCacheListener());
-        zkClient.addCache(DefaultPathConstants.RULE_PARENT, new 
RuleCacheListener());
-        zkClient.addCache(DefaultPathConstants.PROXY_SELECTOR, new 
ProxySelectorCacheListener());
-        zkClient.addCache(DefaultPathConstants.DISCOVERY_UPSTREAM, new 
DiscoveryUpstreamCacheListener());
-    }
-
-    private void watchAppAuth() {
-        zkClient.addCache(DefaultPathConstants.APP_AUTH_PARENT, new 
AuthCacheListener());
-    }
-
-    private void watchMetaData() {
-        zkClient.addCache(DefaultPathConstants.META_DATA, new 
MetadataCacheListener());
-    }
-
-    private void cachePluginData(final PluginData pluginData) {
-        Optional.ofNullable(pluginData)
-                .flatMap(data -> Optional.ofNullable(pluginDataSubscriber))
-                .ifPresent(e -> e.onSubscribe(pluginData));
-    }
-
-    private void cacheSelectorData(final SelectorData selectorData) {
-        Optional.ofNullable(selectorData)
-                .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber)
-                        .ifPresent(e -> e.onSelectorSubscribe(data)));
-    }
-
-    private void unCacheSelectorData(final String dataPath) {
-        SelectorData selectorData = new SelectorData();
-        final String selectorId = dataPath.substring(dataPath.lastIndexOf("/") 
+ 1);
-        final String str = 
dataPath.substring(DefaultPathConstants.SELECTOR_PARENT.length());
-        final int pluginNameIndex = str.length() - selectorId.length() - 1;
-        if (pluginNameIndex <= 0) {
-            return;
-        }
-        final String pluginName = str.substring(1, pluginNameIndex);
-        selectorData.setPluginName(pluginName);
-        selectorData.setId(selectorId);
-
-        Optional.ofNullable(pluginDataSubscriber)
-                .ifPresent(e -> e.unSelectorSubscribe(selectorData));
-    }
-
-    private void cacheRuleData(final RuleData ruleData) {
-        Optional.ofNullable(ruleData)
-                .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber)
-                        .ifPresent(e -> e.onRuleSubscribe(data)));
-    }
-
-    private void unCacheRuleData(final String dataPath) {
-        String ruleDataId = dataPath.substring(dataPath.lastIndexOf("/") + 1);
-        final String str = 
dataPath.substring(DefaultPathConstants.RULE_PARENT.length());
-        final int pluginNameIndex = str.length() - ruleDataId.length() - 1;
-        if (pluginNameIndex <= 0) {
-            return;
-        }
-        final String pluginName = str.substring(1, pluginNameIndex);
-        final List<String> list = 
Lists.newArrayList(Splitter.on(DefaultPathConstants.SELECTOR_JOIN_RULE).split(ruleDataId));
-
-        RuleData ruleData = new RuleData();
-        ruleData.setPluginName(pluginName);
-        ruleData.setSelectorId(list.get(0));
-        ruleData.setId(list.get(1));
-
-        Optional.ofNullable(pluginDataSubscriber)
-                .ifPresent(e -> e.unRuleSubscribe(ruleData));
-    }
-
-    private void cacheAuthData(final AppAuthData appAuthData) {
-        Optional.ofNullable(appAuthData)
-                .ifPresent(data -> authDataSubscribers.forEach(e -> 
e.onSubscribe(data)));
-    }
-
-    private void unCacheAuthData(final String dataPath) {
-        final String key = 
dataPath.substring(DefaultPathConstants.APP_AUTH_PARENT.length() + 1);
-        AppAuthData appAuthData = new AppAuthData();
-        appAuthData.setAppKey(key);
-        authDataSubscribers.forEach(e -> e.unSubscribe(appAuthData));
-    }
-
-    private void cacheMetaData(final MetaData metaData) {
-        Optional.ofNullable(metaData)
-                .ifPresent(data -> metaDataSubscribers.forEach(e -> 
e.onSubscribe(metaData)));
-    }
-
-    private void cacheProxySelectorData(final ProxySelectorData 
proxySelectorData) {
-        Optional.ofNullable(proxySelectorData)
-                .ifPresent(data -> proxySelectorDataSubscribers.forEach(e -> 
e.onSubscribe(proxySelectorData)));
-    }
-
-    private void cacheDiscoveryUpstreamData(final DiscoverySyncData 
upstreamDataList) {
-        Optional.ofNullable(discoveryUpstreamDataSubscribers)
-                .ifPresent(data -> discoveryUpstreamDataSubscribers.forEach(e 
-> e.onSubscribe(upstreamDataList)));
-    }
-
-    private void unCacheMetaData(final MetaData metaData) {
-        Optional.ofNullable(metaData)
-                .ifPresent(data -> metaDataSubscribers.forEach(e -> 
e.unSubscribe(metaData)));
-    }
-
-    private void unCacheProxySelectorData(final ProxySelectorData 
proxySelectorData) {
-        Optional.ofNullable(proxySelectorData)
-                .ifPresent(data -> proxySelectorDataSubscribers.forEach(e -> 
e.unSubscribe(proxySelectorData)));
-    }
-
-    @Override
-    public void close() {
-        if (Objects.nonNull(zkClient)) {
-            zkClient.close();
-        }
-    }
-
-    abstract static class AbstractDataSyncListener implements 
TreeCacheListener {
-        @Override
-        public final void childEvent(final CuratorFramework client, final 
TreeCacheEvent event) {
-            ChildData childData = event.getData();
+        watcherData0(DefaultPathConstants.PLUGIN_PARENT);
+        watcherData0(DefaultPathConstants.SELECTOR_PARENT);
+        watcherData0(DefaultPathConstants.RULE_PARENT);
+        watcherData0(DefaultPathConstants.PROXY_SELECTOR);
+        watcherData0(DefaultPathConstants.DISCOVERY_UPSTREAM);
+        watcherData0(DefaultPathConstants.APP_AUTH_PARENT);
+        watcherData0(DefaultPathConstants.META_DATA);
+    }
+
+    private void watcherData0(final String registerPath) {
+        zkClient.addCache(registerPath, (curatorFramework, treeCacheEvent) -> {
+            ChildData childData = treeCacheEvent.getData();
             if (null == childData) {
                 return;
             }
@@ -214,181 +78,21 @@ public class ZookeeperSyncDataService implements 
SyncDataService {
             if (Strings.isNullOrEmpty(path)) {
                 return;
             }
-            event(event.getType(), path, childData);
-        }
-
-        /**
-         * data sync event.
-         *
-         * @param type tree cache event type.
-         * @param path tree cache event path.
-         * @param data tree cache event data.
-         */
-        protected abstract void event(TreeCacheEvent.Type type, String path, 
ChildData data);
-    }
-
-    class PluginCacheListener extends AbstractDataSyncListener {
-
-        @Override
-        public void event(final TreeCacheEvent.Type type, final String path, 
final ChildData data) {
-            // if not uri register path, return.
-            if (!path.contains(DefaultPathConstants.PLUGIN_PARENT)) {
-                return;
-            }
-
-            String pluginName = path.substring(path.lastIndexOf("/") + 1);
-
-            // delete a plugin
-            if (type.equals(TreeCacheEvent.Type.NODE_REMOVED)) {
-                final PluginData pluginData = new PluginData();
-                pluginData.setName(pluginName);
-                Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> 
e.unSubscribe(pluginData));
-                return;
-            }
-
-            // create or update
-            Optional.ofNullable(data)
-                    .ifPresent(e -> 
cachePluginData(GsonUtils.getInstance().fromJson(new String(data.getData(), 
StandardCharsets.UTF_8), PluginData.class)));
-        }
-    }
-
-    class SelectorCacheListener extends AbstractDataSyncListener {
-
-        @Override
-        public void event(final TreeCacheEvent.Type type, final String path, 
final ChildData data) {
-
-            // if not uri register path, return.
-            if (!path.contains(DefaultPathConstants.SELECTOR_PARENT)) {
-                return;
-            }
-
-            if (type.equals(TreeCacheEvent.Type.NODE_REMOVED)) {
-                unCacheSelectorData(path);
-                return;
-            }
-
-            // create or update
-            Optional.ofNullable(data)
-                    .ifPresent(e -> 
cacheSelectorData(GsonUtils.getInstance().fromJson(new String(data.getData(), 
StandardCharsets.UTF_8), SelectorData.class)));
-        }
-    }
-
-    class MetadataCacheListener extends AbstractDataSyncListener {
-
-        @Override
-        public void event(final TreeCacheEvent.Type type, final String path, 
final ChildData data) {
-            // if not uri register path, return.
-            if (!path.contains(DefaultPathConstants.META_DATA)) {
-                return;
-            }
-
-            if (type.equals(TreeCacheEvent.Type.NODE_REMOVED)) {
-                final String realPath = 
path.substring(DefaultPathConstants.META_DATA.length() + 1);
-                MetaData metaData = new MetaData();
-                try {
-                    metaData.setPath(URLDecoder.decode(realPath, 
StandardCharsets.UTF_8.name()));
-                } catch (UnsupportedEncodingException e) {
-                    throw new ShenyuException(e);
-                }
-                unCacheMetaData(metaData);
-                return;
-            }
-
-            // create or update
-            Optional.ofNullable(data)
-                    .ifPresent(e -> 
cacheMetaData(GsonUtils.getInstance().fromJson(new String(data.getData(), 
StandardCharsets.UTF_8), MetaData.class)));
-        }
-    }
-
-    class AuthCacheListener extends AbstractDataSyncListener {
-
-        @Override
-        public void event(final TreeCacheEvent.Type type, final String path, 
final ChildData data) {
-            // if not uri register path, return.
-            if (!path.contains(DefaultPathConstants.APP_AUTH_PARENT)) {
-                return;
-            }
-
-            if (type.equals(TreeCacheEvent.Type.NODE_REMOVED)) {
-                unCacheAuthData(path);
-                return;
-            }
-
-            // create or update
-            Optional.ofNullable(data)
-                    .ifPresent(e -> 
cacheAuthData(GsonUtils.getInstance().fromJson(new String(data.getData(), 
StandardCharsets.UTF_8), AppAuthData.class)));
-        }
-    }
-
-    class RuleCacheListener extends AbstractDataSyncListener {
-
-        @Override
-        public void event(final TreeCacheEvent.Type type, final String path, 
final ChildData data) {
             // if not uri register path, return.
-            if (!path.contains(DefaultPathConstants.RULE_PARENT)) {
-                return;
-            }
-            if (type.equals(TreeCacheEvent.Type.NODE_REMOVED)) {
-                unCacheRuleData(path);
+            if (!path.contains(registerPath)) {
                 return;
             }
 
-            // create or update
-            Optional.ofNullable(data)
-                    .ifPresent(e -> 
cacheRuleData(GsonUtils.getInstance().fromJson(new String(data.getData(), 
StandardCharsets.UTF_8), RuleData.class)));
-        }
+            EventType eventType = 
treeCacheEvent.getType().equals(TreeCacheEvent.Type.NODE_REMOVED) ? 
EventType.DELETE : EventType.PUT;
+            final String updateData = childData.getData() != null ? new 
String(childData.getData(), StandardCharsets.UTF_8) : null;
+            this.event(path, updateData, registerPath, eventType);
+        });
     }
 
-    class ProxySelectorCacheListener extends AbstractDataSyncListener {
-
-        @Override
-        protected void event(final TreeCacheEvent.Type type, final String 
path, final ChildData data) {
-            // if not uri register path, return.
-            if (!path.contains(DefaultPathConstants.PROXY_SELECTOR)) {
-                return;
-            }
-            String[] pathInfoArray = path.split("/");
-            if (pathInfoArray.length != 5) {
-                return;
-            }
-            String pluginName = pathInfoArray[pathInfoArray.length - 2];
-            String proxySelectorName = pathInfoArray[pathInfoArray.length - 1];
-            if (type.equals(TreeCacheEvent.Type.NODE_REMOVED)) {
-                ProxySelectorData proxySelectorData = new ProxySelectorData();
-                proxySelectorData.setPluginName(pluginName);
-                proxySelectorData.setName(proxySelectorName);
-                unCacheProxySelectorData(proxySelectorData);
-                return;
-            }
-            ProxySelectorData proxySelectorData = 
GsonUtils.getInstance().fromJson(new String(data.getData(), 
StandardCharsets.UTF_8), ProxySelectorData.class);
-            proxySelectorData.setName(proxySelectorName);
-            proxySelectorData.setPluginName(pluginName);
-            // create or update
-            Optional.ofNullable(data)
-                    .ifPresent(e -> cacheProxySelectorData(proxySelectorData));
-
-        }
-    }
-
-    class DiscoveryUpstreamCacheListener extends AbstractDataSyncListener {
-
-        @Override
-        protected void event(final TreeCacheEvent.Type type, final String 
path, final ChildData data) {
-            // if not uri register path, return.
-            if (!path.contains(DefaultPathConstants.DISCOVERY_UPSTREAM)) {
-                return;
-            }
-            String[] pathInfoArray = path.split("/");
-            if (pathInfoArray.length != 5) {
-                return;
-            }
-            // only support update
-            if (type.equals(TreeCacheEvent.Type.NODE_UPDATED)) {
-                DiscoverySyncData discoverySyncData = 
GsonUtils.getInstance().fromJson(new String(data.getData(), 
StandardCharsets.UTF_8), DiscoverySyncData.class);
-                // create or update
-                Optional.ofNullable(data)
-                        .ifPresent(e -> 
cacheDiscoveryUpstreamData(discoverySyncData));
-            }
+    @Override
+    public void close() {
+        if (Objects.nonNull(zkClient)) {
+            zkClient.close();
         }
     }
 

Reply via email to