This is an automated email from the ASF dual-hosted git repository.
hefengen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 2f69000220 [type:refactor] refactor sync data, abstract the template
method of zk and etcd, reimplement consul data sync. (#5001)
2f69000220 is described below
commit 2f6900022035b97aaf2aa52b6701ae115cbf6f8f
Author: yunlongn <[email protected]>
AuthorDate: Fri Aug 18 12:04:03 2023 +0800
[type:refactor] refactor sync data, abstract the template method of zk and
etcd, reimplement consul data sync. (#5001)
* [type:bug] fix etcd sync error.
* [type:bug] fix etcd sync error.
* upgrade version
* [type:refactor] refactor zookeeper sync.
* [type:bug] fix etcd sync error.
* [type:refactor] refactor etcd sync.
* [type:refactor] refactor zookeeper sync.
* [type:refactor] refactor admin sync data.
* [type:refactor] refactor admin sync data.
* [type:refactor] refactor admin sync data.
* [type:refactor] refactor consul sync data.
* [type:refactor] refactor admin sync data.
* [type:refactor] refactor admin sync data.
* [type:refactor] refactor sync data.
* [type:refactor] refactor sync data.
* [type:refactor] refactor sync data.
---------
Co-authored-by: Misaya295 <[email protected]>
Co-authored-by: misaya295 <[email protected]>
Co-authored-by: moremind <[email protected]>
---
.../listener/consul/ConsulDataChangedListener.java | 25 +-
.../consul/ConsulDataChangedListenerTest.java | 77 ++---
.../src/main/resources/application.yml | 4 +-
.../data/core/AbstractNodeDataSyncService.java | 295 ++++++++++++++++
.../sync/data/consul/ConsulSyncDataService.java | 225 ++++++++-----
.../data/consul/handler/ConsulCacheHandler.java | 165 ---------
.../data/consul/ConsulSyncDataServiceTest.java | 78 ++---
.../consul/handler/ConsulCacheHandlerTest.java | 219 ------------
.../shenyu/sync/data/etcd/EtcdSyncDataService.java | 370 ++-------------------
.../sync/data/etcd/EtcdSyncDataServiceTest.java | 261 ++-------------
.../data/zookeeper/ZookeeperSyncDataService.java | 344 ++-----------------
11 files changed, 589 insertions(+), 1474 deletions(-)
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListener.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListener.java
index a35bdc1519..4be0bb5854 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListener.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListener.java
@@ -18,34 +18,31 @@
package org.apache.shenyu.admin.listener.consul;
import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.Response;
-import com.ecwid.consul.v1.kv.model.GetValue;
-import org.apache.shenyu.admin.listener.AbstractListDataChangedListener;
-import org.apache.shenyu.common.constant.ConsulConstants;
+import org.apache.shenyu.admin.listener.AbstractNodeDataChangedListener;
import org.apache.shenyu.common.utils.GsonUtils;
-import java.util.Objects;
-
/**
* Use Consul to push data changes.
*/
-public class ConsulDataChangedListener extends AbstractListDataChangedListener
{
+public class ConsulDataChangedListener extends AbstractNodeDataChangedListener
{
private final ConsulClient consulClient;
public ConsulDataChangedListener(final ConsulClient consulClient) {
- super(new ChangeData(ConsulConstants.PLUGIN_DATA,
ConsulConstants.SELECTOR_DATA,
- ConsulConstants.RULE_DATA, ConsulConstants.AUTH_DATA,
ConsulConstants.META_DATA, ConsulConstants.PROXY_SELECTOR_DATA_ID,
ConsulConstants.DISCOVERY_UPSTREAM));
this.consulClient = consulClient;
}
@Override
- public void publishConfig(final String dataKey, final Object data) {
- consulClient.setKVValue(dataKey, GsonUtils.getInstance().toJson(data));
+ public void createOrUpdate(final String pluginPath, final Object data) {
+ consulClient.setKVValue(pluginPath,
GsonUtils.getInstance().toJson(data));
+ }
+
+ @Override
+ public void deleteNode(final String pluginPath) {
+ consulClient.deleteKVValue(pluginPath);
}
@Override
- public String getConfig(final String dataKey) {
- Response<GetValue> kvValue = consulClient.getKVValue(dataKey);
- return Objects.nonNull(kvValue.getValue()) ?
kvValue.getValue().getDecodedValue() :
ConsulConstants.EMPTY_CONFIG_DEFAULT_VALUE;
+ public void deletePathRecursive(final String selectorParentPath) {
+ consulClient.deleteKVValues(selectorParentPath);
}
}
diff --git
a/shenyu-admin/src/test/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListenerTest.java
b/shenyu-admin/src/test/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListenerTest.java
index 73fbb2d18d..11708c82ec 100644
---
a/shenyu-admin/src/test/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListenerTest.java
+++
b/shenyu-admin/src/test/java/org/apache/shenyu/admin/listener/consul/ConsulDataChangedListenerTest.java
@@ -18,10 +18,7 @@
package org.apache.shenyu.admin.listener.consul;
import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.Response;
-import com.ecwid.consul.v1.kv.model.GetValue;
import com.google.common.collect.ImmutableList;
-import org.apache.shenyu.common.constant.ConsulConstants;
import org.apache.shenyu.common.dto.AppAuthData;
import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.PluginData;
@@ -35,11 +32,8 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
/**
* The testCase for {@link ConsulDataChangedListener}.
@@ -73,106 +67,83 @@ public class ConsulDataChangedListenerTest {
@Test
public void testOnAppAuthChanged() {
- String config =
"{\"divide\":{\"appKey\":\"appKey\",\"appSecret\":\"appSecret\",\"open\":true}}";
final AppAuthData appAuthData =
AppAuthData.builder().appKey(MOCK_APP_KEY).appSecret(MOCK_APP_SECRET).build();
- Response<GetValue> response = mock(Response.class);
- GetValue getValueModel = mock(GetValue.class);
- when(consulClient.getKVValue(anyString())).thenReturn(response);
- when(response.getValue()).thenReturn(getValueModel);
- when(getValueModel.getDecodedValue()).thenReturn(config);
-
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData),
DataEventTypeEnum.DELETE);
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData),
DataEventTypeEnum.REFRESH);
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData),
DataEventTypeEnum.MYSELF);
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData),
DataEventTypeEnum.CREATE);
- verify(consulClient, times(4)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(3)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(1)).deleteKVValue(any(String.class));
- getValueModel.setValue(ConsulConstants.EMPTY_CONFIG_DEFAULT_VALUE);
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData),
DataEventTypeEnum.DELETE);
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData),
DataEventTypeEnum.REFRESH);
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData),
DataEventTypeEnum.MYSELF);
consulDataChangedListener.onAppAuthChanged(ImmutableList.of(appAuthData),
DataEventTypeEnum.CREATE);
- verify(consulClient, times(8)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(6)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(2)).deleteKVValue(any(String.class));
}
@Test
public void testOnPluginChanged() {
- String config =
"{\"divide\":{\"id\":\"id\",\"name\":\"name\",\"enabled\":true}}";
final PluginData pluginData =
PluginData.builder().id(MOCK_ID).name(MOCK_NAME).config(MOCK_CONFIG).build();
- Response<GetValue> response = mock(Response.class);
- GetValue getValueModel = mock(GetValue.class);
- when(consulClient.getKVValue(anyString())).thenReturn(response);
- when(response.getValue()).thenReturn(getValueModel);
- when(getValueModel.getDecodedValue()).thenReturn(config);
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData),
DataEventTypeEnum.DELETE);
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData),
DataEventTypeEnum.REFRESH);
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData),
DataEventTypeEnum.MYSELF);
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData),
DataEventTypeEnum.CREATE);
- verify(consulClient, times(4)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(3)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(3)).deleteKVValues(any(String.class));
- getValueModel.setValue(ConsulConstants.EMPTY_CONFIG_DEFAULT_VALUE);
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData),
DataEventTypeEnum.DELETE);
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData),
DataEventTypeEnum.REFRESH);
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData),
DataEventTypeEnum.MYSELF);
consulDataChangedListener.onPluginChanged(ImmutableList.of(pluginData),
DataEventTypeEnum.CREATE);
- verify(consulClient, times(8)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(6)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(6)).deleteKVValues(any(String.class));
}
@Test
public void testOnSelectorChanged() {
- String config =
"{\"divide\":[{\"id\":\"id\",\"name\":\"name\",\"enabled\":true}]}";
final SelectorData selectorData =
SelectorData.builder().id(MOCK_ID).name(MOCK_NAME).pluginName(MOCK_PLUGIN_NAME).build();
- Response<GetValue> response = mock(Response.class);
- GetValue getValueModel = mock(GetValue.class);
- when(consulClient.getKVValue(anyString())).thenReturn(response);
- when(response.getValue()).thenReturn(getValueModel);
- when(getValueModel.getDecodedValue()).thenReturn(config);
-
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData),
DataEventTypeEnum.DELETE);
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData),
DataEventTypeEnum.REFRESH);
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData),
DataEventTypeEnum.MYSELF);
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData),
DataEventTypeEnum.CREATE);
- verify(consulClient, times(4)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(3)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(1)).deleteKVValue(any(String.class));
- getValueModel.setValue(ConsulConstants.EMPTY_CONFIG_DEFAULT_VALUE);
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData),
DataEventTypeEnum.DELETE);
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData),
DataEventTypeEnum.REFRESH);
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData),
DataEventTypeEnum.MYSELF);
consulDataChangedListener.onSelectorChanged(ImmutableList.of(selectorData),
DataEventTypeEnum.CREATE);
- verify(consulClient, times(8)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(6)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(2)).deleteKVValue(any(String.class));
+ verify(consulClient, times(2)).deleteKVValues(any(String.class));
}
@Test
public void testOnMetaDataChanged() {
- String config =
"{\"divide\":{\"id\":\"id\",\"appName\":\"appName\",\"enabled\":true}}";
final MetaData metaData =
MetaData.builder().id(MOCK_ID).path(MOCK_PATH).appName(MOCK_APP_NAME).build();
- Response<GetValue> response = mock(Response.class);
- GetValue getValueModel = mock(GetValue.class);
- when(consulClient.getKVValue(anyString())).thenReturn(response);
- when(response.getValue()).thenReturn(getValueModel);
- when(getValueModel.getDecodedValue()).thenReturn(config);
-
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData),
DataEventTypeEnum.DELETE);
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData),
DataEventTypeEnum.REFRESH);
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData),
DataEventTypeEnum.MYSELF);
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData),
DataEventTypeEnum.CREATE);
- verify(consulClient, times(4)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(3)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(1)).deleteKVValue(any(String.class));
- getValueModel.setValue(ConsulConstants.EMPTY_CONFIG_DEFAULT_VALUE);
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData),
DataEventTypeEnum.DELETE);
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData),
DataEventTypeEnum.REFRESH);
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData),
DataEventTypeEnum.MYSELF);
consulDataChangedListener.onMetaDataChanged(ImmutableList.of(metaData),
DataEventTypeEnum.CREATE);
- verify(consulClient, times(8)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(6)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(2)).deleteKVValue(any(String.class));
}
@Test
public void testOnRuleChanged() {
- String config =
"{\"divide\":[{\"id\":\"id\",\"appName\":\"appName\",\"enabled\":true}]}";
final RuleData ruleData = RuleData.builder()
.id(MOCK_ID)
.name(MOCK_NAME)
@@ -180,23 +151,19 @@ public class ConsulDataChangedListenerTest {
.selectorId(MOCK_SELECTOR_ID)
.build();
- Response<GetValue> response = mock(Response.class);
- GetValue getValueModel = mock(GetValue.class);
- when(consulClient.getKVValue(anyString())).thenReturn(response);
- when(response.getValue()).thenReturn(getValueModel);
- when(getValueModel.getDecodedValue()).thenReturn(config);
-
consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData),
DataEventTypeEnum.DELETE);
consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData),
DataEventTypeEnum.REFRESH);
consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData),
DataEventTypeEnum.MYSELF);
consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData),
DataEventTypeEnum.CREATE);
- verify(consulClient, times(4)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(3)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(1)).deleteKVValue(any(String.class));
- getValueModel.setValue(ConsulConstants.EMPTY_CONFIG_DEFAULT_VALUE);
consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData),
DataEventTypeEnum.DELETE);
consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData),
DataEventTypeEnum.REFRESH);
consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData),
DataEventTypeEnum.MYSELF);
consulDataChangedListener.onRuleChanged(ImmutableList.of(ruleData),
DataEventTypeEnum.CREATE);
- verify(consulClient, times(8)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(6)).setKVValue(any(String.class),
any(String.class));
+ verify(consulClient, times(2)).deleteKVValue(any(String.class));
+ verify(consulClient, times(2)).deleteKVValues(any(String.class));
}
}
diff --git a/shenyu-bootstrap/src/main/resources/application.yml
b/shenyu-bootstrap/src/main/resources/application.yml
index 47d8571416..35e279e540 100644
--- a/shenyu-bootstrap/src/main/resources/application.yml
+++ b/shenyu-bootstrap/src/main/resources/application.yml
@@ -262,8 +262,8 @@ shenyu:
# url: http://localhost:2379
# consul:
# url: http://localhost:8500
-# waitTime: 1000
-# watchDelay: 1000
+# waitTime: 10000
+# watchDelay: 10000
exclude:
enabled: false
paths:
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/core/AbstractNodeDataSyncService.java
b/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/core/AbstractNodeDataSyncService.java
new file mode 100644
index 0000000000..9231876f2f
--- /dev/null
+++
b/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/core/AbstractNodeDataSyncService.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.sync.data.core;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import org.apache.shenyu.common.constant.DefaultPathConstants;
+import org.apache.shenyu.common.dto.AppAuthData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
+import org.apache.shenyu.common.dto.MetaData;
+import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.RuleData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.exception.ShenyuException;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
+import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
+import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
+import org.apache.shenyu.sync.data.api.SyncDataService;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * AbstractNodeDataSyncService.
+ * Abstract method to monitor child node changes.
+ */
+public abstract class AbstractNodeDataSyncService implements SyncDataService {
+
+ private final PluginDataSubscriber pluginDataSubscriber;
+
+ private final List<MetaDataSubscriber> metaDataSubscribers;
+
+ private final List<AuthDataSubscriber> authDataSubscribers;
+
+ private final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers;
+
+ private final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers;
+
+ public AbstractNodeDataSyncService(final PluginDataSubscriber
pluginDataSubscriber,
+ final List<MetaDataSubscriber>
metaDataSubscribers,
+ final List<AuthDataSubscriber>
authDataSubscribers,
+ final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers,
+ final
List<DiscoveryUpstreamDataSubscriber> discoveryUpstreamDataSubscribers) {
+ this.pluginDataSubscriber = pluginDataSubscriber;
+ this.metaDataSubscribers = metaDataSubscribers;
+ this.authDataSubscribers = authDataSubscribers;
+ this.proxySelectorDataSubscribers = proxySelectorDataSubscribers;
+ this.discoveryUpstreamDataSubscribers =
discoveryUpstreamDataSubscribers;
+ }
+
+
+ /**
+ * event.
+ *
+ * @param updatePath updatePath
+ * @param updateData updateData
+ * @param registerPath registerPath
+ * @param eventType eventType
+ */
+ public void event(final String updatePath, final String updateData, final
String registerPath, final EventType eventType) {
+ switch (registerPath) {
+ case DefaultPathConstants.PLUGIN_PARENT:
+ pluginHandlerEvent(updatePath, updateData, eventType);
+ break;
+ case DefaultPathConstants.SELECTOR_PARENT:
+ selectorHandlerEvent(updatePath, updateData, eventType);
+ break;
+ case DefaultPathConstants.META_DATA:
+ metaDataHandlerEvent(updatePath, updateData, eventType);
+ break;
+ case DefaultPathConstants.APP_AUTH_PARENT:
+ appAuthHandlerEvent(updatePath, updateData, eventType);
+ break;
+ case DefaultPathConstants.RULE_PARENT:
+ ruleHandlerEvent(updatePath, updateData, eventType);
+ break;
+ case DefaultPathConstants.DISCOVERY_UPSTREAM:
+ discoveryUpstreamHandlerEvent(updatePath, updateData,
eventType);
+ break;
+ case DefaultPathConstants.PROXY_SELECTOR:
+ proxyHandlerEvent(updatePath, updateData, eventType);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void proxyHandlerEvent(final String updatePath, final String
updateData, final EventType eventType) {
+ String[] pathInfoArray = updatePath.split("/");
+ if (pathInfoArray.length != 5) {
+ return;
+ }
+ String pluginName = pathInfoArray[pathInfoArray.length - 2];
+ String proxySelectorName = pathInfoArray[pathInfoArray.length - 1];
+ if (EventType.DELETE.equals(eventType)) {
+ ProxySelectorData proxySelectorData = new ProxySelectorData();
+ proxySelectorData.setPluginName(pluginName);
+ proxySelectorData.setName(proxySelectorName);
+ unCacheProxySelectorData(proxySelectorData);
+ return;
+ }
+ ProxySelectorData proxySelectorData =
GsonUtils.getInstance().fromJson(updateData, ProxySelectorData.class);
+ proxySelectorData.setName(proxySelectorName);
+ proxySelectorData.setPluginName(pluginName);
+ Optional.ofNullable(updateData)
+ .ifPresent(e -> cacheProxySelectorData(proxySelectorData));
+ }
+
+ private void discoveryUpstreamHandlerEvent(final String updatePath, final
String updateData, final EventType eventType) {
+ String[] pathInfoArray2 = updatePath.split("/");
+ if (pathInfoArray2.length != 5) {
+ return;
+ }
+ if (!EventType.DELETE.equals(eventType)) {
+ Optional.ofNullable(updateData)
+ .ifPresent(e ->
cacheDiscoveryUpstreamData(GsonUtils.getInstance().fromJson(updateData,
DiscoverySyncData.class)));
+ }
+ }
+
+ private void ruleHandlerEvent(final String updatePath, final String
updateData, final EventType eventType) {
+ if (EventType.DELETE.equals(eventType)) {
+ unCacheRuleData(updatePath);
+ return;
+ }
+ Optional.ofNullable(updateData)
+ .ifPresent(e ->
cacheRuleData(GsonUtils.getInstance().fromJson(updateData, RuleData.class)));
+ }
+
+ private void appAuthHandlerEvent(final String updatePath, final String
updateData, final EventType eventType) {
+ if (EventType.DELETE.equals(eventType)) {
+ unCacheAuthData(updatePath);
+ return;
+ }
+ Optional.ofNullable(updateData)
+ .ifPresent(e ->
cacheAuthData(GsonUtils.getInstance().fromJson(updateData, AppAuthData.class)));
+ }
+
+ private void metaDataHandlerEvent(final String updatePath, final String
updateData, final EventType eventType) {
+ if (EventType.DELETE.equals(eventType)) {
+ final String realPath =
updatePath.substring(DefaultPathConstants.META_DATA.length() + 1);
+ MetaData metaData = new MetaData();
+ try {
+ metaData.setPath(URLDecoder.decode(realPath,
StandardCharsets.UTF_8.name()));
+ } catch (UnsupportedEncodingException e) {
+ throw new ShenyuException(e);
+ }
+ unCacheMetaData(metaData);
+ return;
+ }
+ Optional.ofNullable(updateData)
+ .ifPresent(e ->
cacheMetaData(GsonUtils.getInstance().fromJson(updateData, MetaData.class)));
+ }
+
+ private void selectorHandlerEvent(final String updatePath, final String
updateData, final EventType eventType) {
+ if (EventType.DELETE.equals(eventType)) {
+ unCacheSelectorData(updatePath);
+ return;
+ }
+ Optional.ofNullable(updateData)
+ .ifPresent(e ->
cacheSelectorData(GsonUtils.getInstance().fromJson(updateData,
SelectorData.class)));
+ }
+
+ private void pluginHandlerEvent(final String updatePath, final String
updateData, final EventType eventType) {
+ if (EventType.DELETE.equals(eventType)) {
+ String pluginName =
updatePath.substring(updatePath.lastIndexOf("/") + 1);
+ unCachePluginName(pluginName);
+ return;
+ }
+ Optional.ofNullable(updateData)
+ .ifPresent(e ->
cachePluginData(GsonUtils.getInstance().fromJson(updateData,
PluginData.class)));
+ }
+
+ public enum EventType {
+ PUT,
+ DELETE,
+ }
+
+ protected void cachePluginData(final PluginData pluginData) {
+ Optional.ofNullable(pluginData)
+ .flatMap(data -> Optional.ofNullable(pluginDataSubscriber))
+ .ifPresent(e -> e.onSubscribe(pluginData));
+ }
+
+ protected void cacheSelectorData(final SelectorData selectorData) {
+ Optional.ofNullable(selectorData)
+ .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber)
+ .ifPresent(e -> e.onSelectorSubscribe(data)));
+ }
+
+ protected void unCacheSelectorData(final String dataPath) {
+ SelectorData selectorData = new SelectorData();
+ final String selectorId = dataPath.substring(dataPath.lastIndexOf("/")
+ 1);
+ final String str =
dataPath.substring(DefaultPathConstants.SELECTOR_PARENT.length());
+ final int pluginNameIndex = str.length() - selectorId.length() - 1;
+ if (pluginNameIndex <= 0) {
+ return;
+ }
+ final String pluginName = str.substring(1, pluginNameIndex);
+ selectorData.setPluginName(pluginName);
+ selectorData.setId(selectorId);
+
+ Optional.ofNullable(pluginDataSubscriber)
+ .ifPresent(e -> e.unSelectorSubscribe(selectorData));
+ }
+
+ protected void unCachePluginName(final String pluginName) {
+ final PluginData pluginData = new PluginData();
+ pluginData.setName(pluginName);
+ Optional.ofNullable(pluginDataSubscriber).ifPresent(e ->
e.unSubscribe(pluginData));
+ }
+
+ protected void cacheRuleData(final RuleData ruleData) {
+ Optional.ofNullable(ruleData)
+ .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber)
+ .ifPresent(e -> e.onRuleSubscribe(data)));
+ }
+
+ protected void unCacheRuleData(final String dataPath) {
+ String ruleDataId = dataPath.substring(dataPath.lastIndexOf("/") + 1);
+ final String str =
dataPath.substring(DefaultPathConstants.RULE_PARENT.length());
+ final int pluginNameIndex = str.length() - ruleDataId.length() - 1;
+ if (pluginNameIndex <= 0) {
+ return;
+ }
+ final String pluginName = str.substring(1, pluginNameIndex);
+ final List<String> list =
Lists.newArrayList(Splitter.on(DefaultPathConstants.SELECTOR_JOIN_RULE).split(ruleDataId));
+
+ RuleData ruleData = new RuleData();
+ ruleData.setPluginName(pluginName);
+ ruleData.setSelectorId(list.get(0));
+ ruleData.setId(list.get(1));
+
+ Optional.ofNullable(pluginDataSubscriber)
+ .ifPresent(e -> e.unRuleSubscribe(ruleData));
+ }
+
+ protected void cacheAuthData(final AppAuthData appAuthData) {
+ Optional.ofNullable(appAuthData)
+ .ifPresent(data -> authDataSubscribers.forEach(e ->
e.onSubscribe(data)));
+ }
+
+ protected void unCacheAuthData(final String dataPath) {
+ final String key =
dataPath.substring(DefaultPathConstants.APP_AUTH_PARENT.length() + 1);
+ AppAuthData appAuthData = new AppAuthData();
+ appAuthData.setAppKey(key);
+ authDataSubscribers.forEach(e -> e.unSubscribe(appAuthData));
+ }
+
+ protected void cacheMetaData(final MetaData metaData) {
+ Optional.ofNullable(metaData)
+ .ifPresent(data -> metaDataSubscribers.forEach(e ->
e.onSubscribe(metaData)));
+ }
+
+ protected void cacheProxySelectorData(final ProxySelectorData
proxySelectorData) {
+ Optional.ofNullable(proxySelectorData)
+ .ifPresent(data -> proxySelectorDataSubscribers.forEach(e ->
e.onSubscribe(proxySelectorData)));
+ }
+
+ protected void cacheDiscoveryUpstreamData(final DiscoverySyncData
upstreamDataList) {
+ Optional.ofNullable(discoveryUpstreamDataSubscribers)
+ .ifPresent(data -> discoveryUpstreamDataSubscribers.forEach(e
-> e.onSubscribe(upstreamDataList)));
+ }
+
+ protected void unCacheMetaData(final MetaData metaData) {
+ Optional.ofNullable(metaData)
+ .ifPresent(data -> metaDataSubscribers.forEach(e ->
e.unSubscribe(metaData)));
+ }
+
+ protected void unCacheProxySelectorData(final ProxySelectorData
proxySelectorData) {
+ Optional.ofNullable(proxySelectorData)
+ .ifPresent(data -> proxySelectorDataSubscribers.forEach(e ->
e.unSubscribe(proxySelectorData)));
+ }
+}
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
index 57b1e547ce..4fbd99fe15 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java
@@ -21,16 +21,19 @@ import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.kv.model.GetValue;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.constant.ConsulConstants;
+import org.apache.shenyu.common.constant.DefaultPathConstants;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
-import org.apache.shenyu.sync.data.api.SyncDataService;
import org.apache.shenyu.sync.data.consul.config.ConsulConfig;
-import org.apache.shenyu.sync.data.consul.handler.ConsulCacheHandler;
+import org.apache.shenyu.sync.data.core.AbstractNodeDataSyncService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,34 +41,31 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
/**
* Consul sync data service.
*/
-public class ConsulSyncDataService extends ConsulCacheHandler implements
SyncDataService {
+public class ConsulSyncDataService extends AbstractNodeDataSyncService {
/**
* logger.
*/
private static final Logger LOG =
LoggerFactory.getLogger(ConsulSyncDataService.class);
- private final Map<String, OnChange> groupMap = new HashMap<>();
-
private final Map<String, Long> consulIndexes = new HashMap<>();
- private final ScheduledThreadPoolExecutor executor;
+ private final Map<String, List<ConsulData>> cacheConsulDataKeyMap = new
HashMap<>();
- private ScheduledFuture<?> watchFuture;
+ private final ScheduledThreadPoolExecutor executor;
private final ConsulConfig consulConfig;
private final ConsulClient consulClient;
- private final AtomicBoolean running = new AtomicBoolean(false);
-
/**
* Instantiates a new Consul sync data service.
*
@@ -85,88 +85,159 @@ public class ConsulSyncDataService extends
ConsulCacheHandler implements SyncDat
super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers,
proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers);
this.consulClient = consulClient;
this.consulConfig = consulConfig;
- this.executor = new ScheduledThreadPoolExecutor(1,
+ // corePool is the total number of watcher nodes
+ this.executor = new ScheduledThreadPoolExecutor(7,
ShenyuThreadFactory.create("consul-config-watch", true));
- consulIndexes.put(ConsulConstants.SYNC_PRE_FIX, 0L);
- initUpdateMap();
- start();
+ watcherData();
}
- /**
- * init config key and update method mapping.
- */
- private void initUpdateMap() {
- groupMap.put(ConsulConstants.PLUGIN_DATA, this::updatePluginData);
- groupMap.put(ConsulConstants.SELECTOR_DATA, this::updateSelectorMap);
- groupMap.put(ConsulConstants.RULE_DATA, this::updateRuleMap);
- groupMap.put(ConsulConstants.META_DATA, this::updateMetaDataMap);
- groupMap.put(ConsulConstants.AUTH_DATA, this::updateAuthMap);
- groupMap.put(ConsulConstants.PROXY_SELECTOR_DATA_ID,
this::updateSelectorDataMap);
- groupMap.put(ConsulConstants.DISCOVERY_UPSTREAM,
this::updateDiscoveryUpstreamMap);
+ private void watcherData() {
+ watcherData0(DefaultPathConstants.PLUGIN_PARENT);
+ watcherData0(DefaultPathConstants.SELECTOR_PARENT);
+ watcherData0(DefaultPathConstants.RULE_PARENT);
+ watcherData0(DefaultPathConstants.PROXY_SELECTOR);
+ watcherData0(DefaultPathConstants.DISCOVERY_UPSTREAM);
+ watcherData0(DefaultPathConstants.APP_AUTH_PARENT);
+ watcherData0(DefaultPathConstants.META_DATA);
}
- private void watchConfigKeyValues() {
- if (this.running.get()) {
- for (String context : this.consulIndexes.keySet()) {
- try {
- Long currentIndex = this.consulIndexes.get(context);
- if (Objects.isNull(currentIndex)) {
- currentIndex =
ConsulConstants.INIT_CONFIG_VERSION_INDEX;
- }
- Response<List<GetValue>> response =
this.consulClient.getKVValues(context, null,
- new QueryParams(consulConfig.getWaitTime(),
currentIndex));
- if (Objects.isNull(response.getValue()) ||
response.getValue().isEmpty()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("No value for context " + context);
- }
- continue;
- }
- Long newIndex = response.getConsulIndex();
- if (Objects.isNull(newIndex) || Objects.equals(newIndex,
currentIndex)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Same index for context " + context);
- }
- continue;
+ private void watcherData0(final String registerPath) {
+ consulIndexes.put(registerPath, 0L);
+ BiConsumer<String, String> updateHandler = (changeData, decodedValue)
-> this.event(changeData, decodedValue, registerPath, EventType.PUT);
+ Consumer<String> deleteHandler = removeKey -> this.event(removeKey,
null, registerPath, EventType.DELETE);
+ this.executor.schedule(() -> watchConfigKeyValues(registerPath,
updateHandler, deleteHandler), -1, TimeUnit.MILLISECONDS);
+ }
+
+ private void watchConfigKeyValues(final String watchPathRoot,
+ final BiConsumer<String, String>
updateHandler,
+ final Consumer<String> deleteHandler) {
+ try {
+ Long currentIndex = this.consulIndexes.get(watchPathRoot);
+ if (Objects.isNull(currentIndex)) {
+ currentIndex = ConsulConstants.INIT_CONFIG_VERSION_INDEX;
+ }
+ Response<List<GetValue>> response =
this.consulClient.getKVValues(watchPathRoot, null,
+ new
QueryParams(TimeUnit.MILLISECONDS.toSeconds(consulConfig.getWaitTime()),
currentIndex));
+ if (Objects.isNull(response.getValue()) ||
response.getValue().isEmpty()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("No value for watchPathRoot " + watchPathRoot);
+ }
+ this.executor.schedule(() ->
watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
+ consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
+ return;
+ }
+ Long newIndex = response.getConsulIndex();
+ if (Objects.isNull(newIndex)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Same index for watchPathRoot " + watchPathRoot);
+ }
+ this.executor.schedule(() ->
watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
+ consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
+ return;
+ }
+ if (Objects.equals(newIndex, currentIndex)) {
+ this.executor.schedule(() ->
watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
+ -1, TimeUnit.MILLISECONDS);
+ return;
+ }
+ if (!this.consulIndexes.containsValue(newIndex)
+ &&
!currentIndex.equals(ConsulConstants.INIT_CONFIG_VERSION_INDEX)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("watchPathRoot " + watchPathRoot + " has new
index " + newIndex);
+ }
+ final Long lastIndex = currentIndex;
+ final List<ConsulData> lastDatas =
cacheConsulDataKeyMap.get(watchPathRoot);
+ response.getValue().forEach(data -> {
+ if (data.getModifyIndex() == lastIndex) {
+ //data has not changed
+ return;
}
- if (!this.consulIndexes.containsValue(newIndex)
- &&
!currentIndex.equals(ConsulConstants.INIT_CONFIG_VERSION_INDEX)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Context " + context + " has new index "
+ newIndex);
+ if (!Objects.isNull(lastDatas)) {
+ final ConsulData consulData = lastDatas.stream()
+ .filter(lastData ->
data.getKey().equals(lastData.getConsulKey())).findFirst().orElse(null);
+ if (!Objects.isNull(consulData) &&
!StringUtils.isBlank(consulData.getConsulDataMd5())
+ &&
consulData.getConsulDataMd5().equals(DigestUtils.md5Hex(data.getValue()))) {
+ return;
}
- final Long lastIndex = currentIndex;
- response.getValue().forEach(data -> {
- if (data.getModifyIndex() == lastIndex) {
- //data has not changed
- return;
- }
-
groupMap.get(data.getKey()).change(data.getDecodedValue());
- });
-
- } else if (LOG.isTraceEnabled()) {
- LOG.info("Event for index already published for
context " + context);
}
- this.consulIndexes.put(context, newIndex);
- } catch (Exception e) {
- LOG.warn("Error querying consul Key/Values for context '"
+ context + "'. Message: " + e.getMessage());
+ updateHandler.accept(data.getKey(),
data.getDecodedValue());
+ });
+ final List<String> currentKeys =
response.getValue().stream().map(GetValue::getKey).collect(Collectors.toList());
+ if (!ObjectUtils.isEmpty(lastDatas)) {
+ // handler delete event
+ lastDatas.stream()
+ .map(ConsulData::getConsulKey)
+ .filter(lastKey -> !currentKeys.contains(lastKey))
+ .forEach(deleteHandler);
}
+
+ // save last Keys
+ cacheConsulDataKeyMap.put(watchPathRoot,
response.getValue().stream().map(data -> {
+ final ConsulData consulData = new ConsulData();
+ consulData.setConsulKey(data.getKey());
+
consulData.setConsulDataMd5(DigestUtils.md5Hex(data.getValue()));
+ return consulData;
+ }).collect(Collectors.toList()));
+ } else if (LOG.isTraceEnabled()) {
+ LOG.info("Event for index already published for watchPathRoot
" + watchPathRoot);
}
+ this.consulIndexes.put(watchPathRoot, newIndex);
+ this.executor.schedule(() -> watchConfigKeyValues(watchPathRoot,
updateHandler, deleteHandler),
+ -1, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ LOG.warn("Error querying consul Key/Values for watchPathRoot '" +
watchPathRoot + "'. Message: ", e);
+ this.executor.schedule(() -> watchConfigKeyValues(watchPathRoot,
updateHandler, deleteHandler),
+ consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
}
}
- /**
- * Start.
- */
- public void start() {
- if (this.running.compareAndSet(false, true)) {
- this.watchFuture =
this.executor.scheduleWithFixedDelay(this::watchConfigKeyValues,
- 5, consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
+ @Override
+ public void close() {
+ if (!ObjectUtils.isEmpty(executor)) {
+ executor.shutdown();
}
}
- @Override
- public void close() {
- if (this.running.compareAndSet(true, false) &&
Objects.nonNull(this.watchFuture)) {
- this.watchFuture.cancel(true);
+ private static class ConsulData {
+
+ private String consulKey;
+
+ private String consulDataMd5;
+
+ /**
+ * consulKey.
+ *
+ * @return ConsulKey
+ */
+ public String getConsulKey() {
+ return consulKey;
+ }
+
+ /**
+ * set consulKey.
+ *
+ * @param consulKey consulKey
+ */
+ public void setConsulKey(final String consulKey) {
+ this.consulKey = consulKey;
+ }
+
+ /**
+ * consulDataMd5.
+ *
+ * @return ConsulDataMd5
+ */
+ public String getConsulDataMd5() {
+ return consulDataMd5;
+ }
+
+ /**
+ * set consulDataMd5.
+ *
+ * @param consulDataMd5 consulDataMd5
+ */
+ public void setConsulDataMd5(final String consulDataMd5) {
+ this.consulDataMd5 = consulDataMd5;
}
}
}
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandler.java
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandler.java
deleted file mode 100644
index 296da3ff05..0000000000
---
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandler.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shenyu.sync.data.consul.handler;
-
-import com.google.gson.JsonParseException;
-import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.DiscoverySyncData;
-import org.apache.shenyu.common.dto.MetaData;
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
-import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
-import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
-import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
-import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
-import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * Consul cache handler.
- */
-public class ConsulCacheHandler {
- /**
- * logger.
- */
- private static final Logger LOG =
LoggerFactory.getLogger(ConsulCacheHandler.class);
-
- private final PluginDataSubscriber pluginDataSubscriber;
-
- private final List<MetaDataSubscriber> metaDataSubscribers;
-
- private final List<AuthDataSubscriber> authDataSubscribers;
-
- private final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers;
-
- private final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers;
-
- public ConsulCacheHandler(final PluginDataSubscriber pluginDataSubscriber,
- final List<MetaDataSubscriber>
metaDataSubscribers,
- final List<AuthDataSubscriber>
authDataSubscribers,
- final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers,
- final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers
- ) {
- this.pluginDataSubscriber = pluginDataSubscriber;
- this.metaDataSubscribers = metaDataSubscribers;
- this.authDataSubscribers = authDataSubscribers;
- this.proxySelectorDataSubscribers = proxySelectorDataSubscribers;
- this.discoveryUpstreamDataSubscribers =
discoveryUpstreamDataSubscribers;
- }
-
- protected void updatePluginData(final String configInfo) {
- try {
- List<PluginData> pluginDataList = new
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo,
PluginData.class).values());
- pluginDataList.forEach(pluginData ->
Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
- subscriber.unSubscribe(pluginData);
- subscriber.onSubscribe(pluginData);
- }));
- } catch (JsonParseException e) {
- LOG.error("sync plugin data have error:", e);
- }
- }
-
- protected void updateSelectorMap(final String configInfo) {
- try {
- List<SelectorData> selectorDataList =
GsonUtils.getInstance().toObjectMapList(configInfo,
SelectorData.class).values().stream().flatMap(Collection::stream).collect(Collectors.toList());
- selectorDataList.forEach(selectorData ->
Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
- subscriber.unSelectorSubscribe(selectorData);
- subscriber.onSelectorSubscribe(selectorData);
- }));
- } catch (JsonParseException e) {
- LOG.error("sync selector data have error:", e);
- }
- }
-
- protected void updateRuleMap(final String configInfo) {
- try {
- List<RuleData> ruleDataList =
GsonUtils.getInstance().toObjectMapList(configInfo, RuleData.class).values()
- .stream().flatMap(Collection::stream)
- .collect(Collectors.toList());
- ruleDataList.forEach(ruleData ->
Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
- subscriber.unRuleSubscribe(ruleData);
- subscriber.onRuleSubscribe(ruleData);
- }));
- } catch (JsonParseException e) {
- LOG.error("sync rule data have error:", e);
- }
- }
-
- protected void updateMetaDataMap(final String configInfo) {
- try {
- List<MetaData> metaDataList = new
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo,
MetaData.class).values());
- metaDataList.forEach(metaData ->
metaDataSubscribers.forEach(subscriber -> {
- subscriber.unSubscribe(metaData);
- subscriber.onSubscribe(metaData);
- }));
- } catch (JsonParseException e) {
- LOG.error("sync meta data have error:", e);
- }
- }
-
- protected void updateAuthMap(final String configInfo) {
- try {
- List<AppAuthData> appAuthDataList = new
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo,
AppAuthData.class).values());
- appAuthDataList.forEach(appAuthData ->
authDataSubscribers.forEach(subscriber -> {
- subscriber.unSubscribe(appAuthData);
- subscriber.onSubscribe(appAuthData);
- }));
- } catch (JsonParseException e) {
- LOG.error("sync auth data have error:", e);
- }
- }
-
- protected void updateSelectorDataMap(final String configInfo) {
- try {
- List<ProxySelectorData> proxySelectorDataList = new
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo,
ProxySelectorData.class).values());
- proxySelectorDataList.forEach(proxySelectorData ->
proxySelectorDataSubscribers.forEach(subscriber -> {
- subscriber.onSubscribe(proxySelectorData);
- subscriber.unSubscribe(proxySelectorData);
- }));
- } catch (JsonParseException e) {
- LOG.error("sync proxy selector data have error:", e);
- }
- }
-
- protected void updateDiscoveryUpstreamMap(final String configInfo) {
- try {
- List<DiscoverySyncData> discoverySyncDataList = new
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo,
DiscoverySyncData.class).values());
- discoverySyncDataList.forEach(discoverySyncData ->
discoveryUpstreamDataSubscribers.forEach(subscriber -> {
- subscriber.onSubscribe(discoverySyncData);
- subscriber.unSubscribe(discoverySyncData);
- }));
- } catch (JsonParseException e) {
- LOG.error("sync discovery data have error:", e);
- }
- }
-
- protected interface OnChange {
-
- void change(String changeData);
- }
-}
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java
index 89c76c7160..ae847a4764 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataServiceTest.java
@@ -18,16 +18,13 @@
package org.apache.shenyu.sync.data.consul;
import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.kv.model.GetValue;
-import org.apache.shenyu.common.constant.ConsulConstants;
import org.apache.shenyu.sync.data.consul.config.ConsulConfig;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
@@ -36,16 +33,14 @@ import org.mockito.quality.Strictness;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@@ -67,6 +62,7 @@ public final class ConsulSyncDataServiceTest {
@Mock
private ConsulConfig consulConfig;
+ @InjectMocks
private ConsulSyncDataService consulSyncDataService;
@Mock
@@ -75,72 +71,58 @@ public final class ConsulSyncDataServiceTest {
@Mock
private Response response;
- @BeforeEach
- public void setup() {
- when(consulConfig.getWaitTime()).thenReturn(WAIT_TIME);
- when(consulConfig.getWatchDelay()).thenReturn(WATCH_DELAY);
- final List<GetValue> list = new ArrayList<>(1);
- when(getValue.getModifyIndex()).thenReturn(INDEX);
- when(getValue.getKey()).thenReturn(ConsulConstants.PLUGIN_DATA);
- when(getValue.getDecodedValue()).thenReturn("{}");
- list.add(getValue);
- when(consulClient.getKVValues(any(), any(), any()))
- .thenReturn(response);
- when(response.getValue()).thenReturn(list);
- when(response.getConsulIndex()).thenReturn(INDEX);
- consulSyncDataService = new ConsulSyncDataService(consulClient,
consulConfig, null,
- Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
- }
-
- @Test
- public void testStart() throws Exception {
- TimeUnit.SECONDS.sleep(2);
- verify(consulClient,
atLeast(1)).getKVValues(eq(ConsulConstants.SYNC_PRE_FIX),
- eq(null), any(QueryParams.class));
- verify(getValue, atLeast(1)).getModifyIndex();
- verify(getValue, atLeast(1)).getDecodedValue();
- verify(response, atLeast(3)).getValue();
- }
-
- @AfterEach
- public void testStop() {
- consulSyncDataService.close();
- }
-
@Test
public void testWatchConfigKeyValues() throws NoSuchMethodException,
IllegalAccessException, NoSuchFieldException {
- final Method watchConfigKeyValues =
ConsulSyncDataService.class.getDeclaredMethod("watchConfigKeyValues");
+ final Method watchConfigKeyValues =
ConsulSyncDataService.class.getDeclaredMethod("watchConfigKeyValues",
+ String.class, BiConsumer.class, Consumer.class);
watchConfigKeyValues.setAccessible(true);
final Field consul =
ConsulSyncDataService.class.getDeclaredField("consulClient");
consul.setAccessible(true);
final ConsulClient consulClient = mock(ConsulClient.class);
consul.set(consulSyncDataService, consulClient);
+
+ final Field declaredField =
ConsulSyncDataService.class.getDeclaredField("consulConfig");
+ declaredField.setAccessible(true);
+ final ConsulConfig consulConfig = mock(ConsulConfig.class);
+ declaredField.set(consulSyncDataService, consulConfig);
+
+ final Field executorField =
ConsulSyncDataService.class.getDeclaredField("executor");
+ executorField.setAccessible(true);
+ executorField.set(consulSyncDataService,
mock(ScheduledThreadPoolExecutor.class));
+
final Response<List<GetValue>> response = mock(Response.class);
when(consulClient.getKVValues(any(), any(),
any())).thenReturn(response);
- Assertions.assertDoesNotThrow(() ->
watchConfigKeyValues.invoke(consulSyncDataService));
+ BiConsumer<String, String> updateHandler = (changeData, decodedValue)
-> {
+
+ };
+ Consumer<String> deleteHandler = removeKey -> {
+
+ };
+ String watchPathRoot = "/shenyu";
+ Assertions.assertDoesNotThrow(() ->
watchConfigKeyValues.invoke(consulSyncDataService, watchPathRoot,
updateHandler, deleteHandler));
List<GetValue> getValues = new ArrayList<>(1);
getValues.add(mock(GetValue.class));
when(response.getValue()).thenReturn(getValues);
- Assertions.assertDoesNotThrow(() ->
watchConfigKeyValues.invoke(consulSyncDataService));
+ Assertions.assertDoesNotThrow(() ->
watchConfigKeyValues.invoke(consulSyncDataService, watchPathRoot,
updateHandler, deleteHandler));
when(response.getConsulIndex()).thenReturn(2L);
- Assertions.assertDoesNotThrow(() ->
watchConfigKeyValues.invoke(consulSyncDataService));
+ Assertions.assertDoesNotThrow(() ->
watchConfigKeyValues.invoke(consulSyncDataService, watchPathRoot,
updateHandler, deleteHandler));
when(response.getConsulIndex()).thenReturn(null);
- Assertions.assertDoesNotThrow(() ->
watchConfigKeyValues.invoke(consulSyncDataService));
+ Assertions.assertDoesNotThrow(() ->
watchConfigKeyValues.invoke(consulSyncDataService, watchPathRoot,
updateHandler, deleteHandler));
when(response.getConsulIndex()).thenReturn(0L);
- Assertions.assertDoesNotThrow(() ->
watchConfigKeyValues.invoke(consulSyncDataService));
+ Assertions.assertDoesNotThrow(() ->
watchConfigKeyValues.invoke(consulSyncDataService, watchPathRoot,
updateHandler, deleteHandler));
final Field consulIndexes =
ConsulSyncDataService.class.getDeclaredField("consulIndexes");
consulIndexes.setAccessible(true);
final Map<String, Long> consulIndexesSource = (Map<String, Long>)
consulIndexes.get(consulSyncDataService);
consulIndexesSource.put("/null", null);
when(response.getConsulIndex()).thenReturn(2L);
- Assertions.assertDoesNotThrow(() ->
watchConfigKeyValues.invoke(consulSyncDataService));
+ Assertions.assertDoesNotThrow(() ->
watchConfigKeyValues.invoke(consulSyncDataService, watchPathRoot,
updateHandler, deleteHandler));
}
}
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandlerTest.java
b/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandlerTest.java
deleted file mode 100644
index c1b5e4030e..0000000000
---
a/shenyu-sync-data-center/shenyu-sync-data-consul/src/test/java/org/apache/shenyu/sync/data/consul/handler/ConsulCacheHandlerTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shenyu.sync.data.consul.handler;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.MetaData;
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
-import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
-import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
-import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-/**
- * test case for {@link ConsulCacheHandler}.
- */
-public final class ConsulCacheHandlerTest {
-
- @Test
- public void testUpdatePluginMap() {
- String pluginName1 = "PLUGIN_NAME_1";
- String pluginName2 = "PLUGIN_NAME_2";
- PluginData pluginData1 =
-
PluginData.builder().name(pluginName1).id("plugin_1").config("config_1").build();
- PluginData pluginData2 =
-
PluginData.builder().name(pluginName2).id("plugin_2").config("config_2").build();
- String pluginData = GsonUtils.getInstance()
- .toJson(ImmutableMap.of(pluginName2, pluginData2, pluginName1,
pluginData1));
-
- final List<PluginData> onSubscribeList = new ArrayList<>();
- final List<PluginData> unsubscribeList = new ArrayList<>();
- ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(new
PluginDataSubscriber() {
- @Override
- public void onSubscribe(final PluginData pluginData) {
- onSubscribeList.add(pluginData);
- }
-
- @Override
- public void unSubscribe(final PluginData pluginData) {
- unsubscribeList.add(pluginData);
- }
- }, Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
- consulCacheHandler.updatePluginData(pluginData);
- assertEquals(2, onSubscribeList.size());
- assertEquals(2, unsubscribeList.size());
- }
-
- @Test
- public void testUpdateSelectorMap() {
- String selectorDataPluginName1 = "SELECTOR_DATA_1";
- String selectorDataPluginName2 = "SELECTOR_DATA_2";
- SelectorData selectorData1 =
- SelectorData.builder()
- .pluginName(selectorDataPluginName1)
- .id("select_1")
- .name("SELECT_DATA_NAME_1")
- .build();
- SelectorData selectorData2 =
- SelectorData.builder()
- .pluginName(selectorDataPluginName2)
- .id("select_2")
- .name("SELECT_DATA_NAME_2")
- .build();
-
- String selectorDataParam = GsonUtils.getInstance()
- .toJson(ImmutableMap.of(selectorDataPluginName2,
ImmutableList.of(selectorData2),
- selectorDataPluginName1,
ImmutableList.of(selectorData1)));
- final List<SelectorData> subscribeList = new ArrayList<>();
- final List<SelectorData> unsubscribeList = new ArrayList<>();
- ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(new
PluginDataSubscriber() {
- @Override
- public void onSelectorSubscribe(final SelectorData selectorData) {
- subscribeList.add(selectorData);
- }
-
- @Override
- public void unSelectorSubscribe(final SelectorData selectorData) {
- unsubscribeList.add(selectorData);
- }
- }, Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
- consulCacheHandler.updateSelectorMap(selectorDataParam);
- assertEquals(2, subscribeList.size());
- assertEquals(2, unsubscribeList.size());
- }
-
- @Test
- public void testUpdateRuleMap() {
- String ruleDataId1 = "RULE_DATA_1";
- String ruleDataId2 = "RULE_DATA_2";
- String selectorId1 = "ID_1";
- String selectorId2 = "ID_2";
- RuleData ruleData1 =
RuleData.builder().selectorId(selectorId1).id(ruleDataId1).build();
- RuleData ruleData2 =
RuleData.builder().selectorId(selectorId2).id(ruleDataId2).build();
- String ruleDataParam = GsonUtils.getInstance()
- .toJson(
- ImmutableMap.of(
- selectorId2,
- ImmutableList.of(ruleData2),
- selectorId1,
- ImmutableList.of(ruleData1)));
- final List<RuleData> subscribeList = new ArrayList<>();
- final List<RuleData> unsubscribeList = new ArrayList<>();
- ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(new
PluginDataSubscriber() {
- @Override
- public void onRuleSubscribe(final RuleData ruleData) {
- subscribeList.add(ruleData);
- }
-
- @Override
- public void unRuleSubscribe(final RuleData ruleData) {
- unsubscribeList.add(ruleData);
- }
- }, Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
- consulCacheHandler.updateRuleMap(ruleDataParam);
- assertEquals(2, subscribeList.size());
- assertEquals(2, unsubscribeList.size());
- }
-
- @Test
- public void testUpdateMetaDataMap() {
- String metadataPath1 = "METADATA_PATH_1";
- String metadataPath2 = "METADATA_PATH_2";
- MetaData metaData1 =
MetaData.builder().path(metadataPath1).id("meta_1").build();
- MetaData metaData2 =
MetaData.builder().path(metadataPath2).id("meta_2").build();
-
- String metaDataParam = GsonUtils.getInstance()
- .toJson(ImmutableMap.of(metadataPath1, metaData1,
metadataPath2, metaData2));
- final List<MetaData> subscribeList = new ArrayList<>();
- final List<MetaData> unsubscribeList = new ArrayList<>();
- MetaDataSubscriber metaDataSubscriber = new MetaDataSubscriber() {
- @Override
- public void onSubscribe(final MetaData metaData) {
- subscribeList.add(metaData);
- }
-
- @Override
- public void unSubscribe(final MetaData metaData) {
- unsubscribeList.add(metaData);
- }
- };
- ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(null,
Lists.newArrayList(metaDataSubscriber),
- Collections.emptyList(), Collections.emptyList(),
Collections.emptyList());
- consulCacheHandler.updateMetaDataMap(metaDataParam);
- assertEquals(2, subscribeList.size());
- assertEquals(2, unsubscribeList.size());
- }
-
- @Test
- public void testUpdateAuthMap() {
- String mockAppKey = "MOCK_APP_KEY";
- String mockAppKey2 = "MOCK_APP_KEY2";
- String mockAppSecret = "MOCK_APP_SECRET";
- AppAuthData appAuthData =
-
AppAuthData.builder().appKey(mockAppKey).appSecret(mockAppSecret).enabled(true).build();
- AppAuthData appAuthData2 =
-
AppAuthData.builder().appKey(mockAppKey2).appSecret(mockAppSecret).enabled(true).build();
-
- String appAuthDataParam = GsonUtils.getInstance()
- .toJson(ImmutableMap.of(mockAppKey2, appAuthData2, mockAppKey,
appAuthData));
- final List<AppAuthData> subscribeList = new ArrayList<>();
- final List<AppAuthData> unsubscribeList = new ArrayList<>();
-
- AuthDataSubscriber authDataSubscriber = new AuthDataSubscriber() {
- @Override
- public void onSubscribe(final AppAuthData appAuthData) {
- subscribeList.add(appAuthData);
- }
-
- @Override
- public void unSubscribe(final AppAuthData appAuthData) {
- unsubscribeList.add(appAuthData);
- }
- };
- ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(null,
- Collections.emptyList(),
Lists.newArrayList(authDataSubscriber), Collections.emptyList(),
Collections.emptyList());
-
- consulCacheHandler.updateAuthMap(appAuthDataParam);
- assertEquals(2, subscribeList.size());
- assertEquals(2, unsubscribeList.size());
- }
-
- @Test
- public void testError() {
- ConsulCacheHandler consulCacheHandler = new ConsulCacheHandler(null,
- Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
- assertDoesNotThrow(() ->
consulCacheHandler.updateAuthMap("errorJson"));
- assertDoesNotThrow(() ->
consulCacheHandler.updateMetaDataMap("errorJson"));
- assertDoesNotThrow(() ->
consulCacheHandler.updateRuleMap("errorJson"));
- assertDoesNotThrow(() ->
consulCacheHandler.updatePluginData("errorJson"));
- assertDoesNotThrow(() ->
consulCacheHandler.updateSelectorMap("errorJson"));
- }
-}
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
index 8abb1c4dc6..3a10aac6f7 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/main/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataService.java
@@ -17,70 +17,37 @@
package org.apache.shenyu.sync.data.etcd;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.shenyu.common.constant.Constants;
import org.apache.shenyu.common.constant.DefaultPathConstants;
-import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.DiscoverySyncData;
-import org.apache.shenyu.common.dto.MetaData;
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
-import org.apache.shenyu.common.enums.ConfigGroupEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
-import org.apache.shenyu.sync.data.api.SyncDataService;
+import org.apache.shenyu.sync.data.core.AbstractNodeDataSyncService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
/**
* Data synchronize of etcd.
*/
-public class EtcdSyncDataService implements SyncDataService {
+public class EtcdSyncDataService extends AbstractNodeDataSyncService {
- /**
- * logger.
- */
private static final Logger LOG =
LoggerFactory.getLogger(EtcdSyncDataService.class);
- private static final String PRE_FIX = "/shenyu";
-
private final EtcdClient etcdClient;
- private final PluginDataSubscriber pluginDataSubscriber;
-
- private final List<MetaDataSubscriber> metaDataSubscribers;
-
- private final List<AuthDataSubscriber> authDataSubscribers;
-
- private final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers;
-
- private final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers;
-
/**
* Instantiates a new Zookeeper cache manager.
*
- * @param etcdClient the etcd client
- * @param pluginDataSubscriber the plugin data subscriber
- * @param metaDataSubscribers the meta data subscribers
- * @param authDataSubscribers the auth data subscribers
- * @param proxySelectorDataSubscribers the proxy selector data
subscribers
- * @param discoveryUpstreamDataSubscribers the discovery upstream data
subscribers
+ * @param etcdClient etcdClient
+ * @param pluginDataSubscriber the plugin data subscriber
+ * @param metaDataSubscribers the meta data subscribers
+ * @param authDataSubscribers the auth data subscribers
*/
public EtcdSyncDataService(final EtcdClient etcdClient,
final PluginDataSubscriber pluginDataSubscriber,
@@ -88,310 +55,38 @@ public class EtcdSyncDataService implements
SyncDataService {
final List<AuthDataSubscriber>
authDataSubscribers,
final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers,
final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers) {
+ super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers,
proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers);
this.etcdClient = etcdClient;
- this.pluginDataSubscriber = pluginDataSubscriber;
- this.metaDataSubscribers = metaDataSubscribers;
- this.authDataSubscribers = authDataSubscribers;
- this.proxySelectorDataSubscribers = proxySelectorDataSubscribers;
- this.discoveryUpstreamDataSubscribers =
discoveryUpstreamDataSubscribers;
watcherData();
- watchAppAuth();
- watchMetaData();
}
private void watcherData() {
- final String pluginParent = DefaultPathConstants.PLUGIN_PARENT;
- List<String> pluginChildren = etcdClientGetChildren(pluginParent);
- for (String pluginName : pluginChildren) {
- watcherAll(pluginName);
- }
- etcdClient.watchChildChange(pluginParent, (pluginPath, updateValue) ->
{
- if (!pluginPath.isEmpty()) {
- final String pluginName =
pluginPath.substring(pluginPath.lastIndexOf("/") + 1);
- cachePluginData(updateValue);
- subscribePluginDataChanges(pluginPath, pluginName);
- }
- }, null);
- }
-
- private void watcherAll(final String pluginName) {
- watcherPlugin(pluginName);
- watcherSelector(pluginName);
- watcherRule(pluginName);
- }
-
- private void watcherPlugin(final String pluginName) {
- String pluginPath = DefaultPathConstants.buildPluginPath(pluginName);
- cachePluginData(etcdClient.get(pluginPath));
- subscribePluginDataChanges(pluginPath, pluginName);
- }
-
- private void watcherSelector(final String pluginName) {
- String selectorParentPath =
DefaultPathConstants.buildSelectorParentPath(pluginName);
- List<String> childrenList = etcdClientGetChildren(selectorParentPath);
- if (CollectionUtils.isNotEmpty(childrenList)) {
- childrenList.forEach(children -> {
- String realPath = buildRealPath(selectorParentPath, children);
- cacheSelectorData(etcdClient.get(realPath));
- subscribeSelectorDataChanges(realPath);
- });
- }
- subscribeChildChanges(ConfigGroupEnum.SELECTOR, selectorParentPath);
- }
-
- private void watcherRule(final String pluginName) {
- String ruleParent =
DefaultPathConstants.buildRuleParentPath(pluginName);
- List<String> childrenList = etcdClientGetChildren(ruleParent);
- if (CollectionUtils.isNotEmpty(childrenList)) {
- childrenList.forEach(children -> {
- String realPath = buildRealPath(ruleParent, children);
- cacheRuleData(etcdClient.get(realPath));
- subscribeRuleDataChanges(realPath);
- });
- }
- subscribeChildChanges(ConfigGroupEnum.RULE, ruleParent);
- }
-
- private void watchAppAuth() {
- final String appAuthParent = DefaultPathConstants.APP_AUTH_PARENT;
- List<String> childrenList = etcdClientGetChildren(appAuthParent);
- if (CollectionUtils.isNotEmpty(childrenList)) {
- childrenList.forEach(children -> {
- String realPath = buildRealPath(appAuthParent, children);
- cacheAuthData(etcdClient.get(realPath));
- subscribeAppAuthDataChanges(realPath);
- });
- }
- subscribeChildChanges(ConfigGroupEnum.APP_AUTH, appAuthParent);
- }
-
- private void watchMetaData() {
- final String metaDataPath = DefaultPathConstants.META_DATA;
- List<String> childrenList = etcdClientGetChildren(metaDataPath);
- if (CollectionUtils.isNotEmpty(childrenList)) {
- childrenList.forEach(children -> {
- String realPath = buildRealPath(metaDataPath, children);
- cacheMetaData(etcdClient.get(realPath));
- subscribeMetaDataChanges(realPath);
- });
- }
- subscribeChildChanges(ConfigGroupEnum.META_DATA, metaDataPath);
- }
-
- private void subscribeChildChanges(final ConfigGroupEnum groupKey, final
String groupParentPath) {
- switch (groupKey) {
- case SELECTOR:
- etcdClient.watchChildChange(groupParentPath, (updatePath,
updateValue) -> {
- cacheSelectorData(updateValue);
- subscribeSelectorDataChanges(updatePath);
- }, null);
- break;
- case RULE:
- etcdClient.watchChildChange(groupParentPath, (updatePath,
updateValue) -> {
- cacheRuleData(updateValue);
- subscribeRuleDataChanges(updatePath);
- }, null);
- break;
- case APP_AUTH:
- etcdClient.watchChildChange(groupParentPath, (updatePath,
updateValue) -> {
- cacheAuthData(updateValue);
- subscribeAppAuthDataChanges(updatePath);
- }, null);
- break;
- case META_DATA:
- etcdClient.watchChildChange(groupParentPath, (updatePath,
updateValue) -> {
- cacheMetaData(updateValue);
- subscribeMetaDataChanges(updatePath);
- }, null);
- break;
- case DISCOVER_UPSTREAM:
- etcdClient.watchChildChange(groupParentPath, (updatePath,
updateValue) -> {
- cacheDiscoveryUpstreamData(updateValue);
- subscribeDiscoveryUpstreamDataChanges(updatePath);
- }, null);
- break;
- case PROXY_SELECTOR:
- etcdClient.watchChildChange(groupParentPath, (updatePath,
updateValue) -> {
- cacheProxySelectorData(updateValue);
- subscribeProxySelectorDataChanges(updatePath);
- }, null);
- break;
- default:
- throw new IllegalStateException("Unexpected groupKey: " +
groupKey);
- }
- }
-
- private void subscribePluginDataChanges(final String pluginPath, final
String pluginName) {
- etcdClient.watchDataChange(pluginPath, (updatePath, updateValue) -> {
- final PluginData data =
GsonUtils.getInstance().fromJson(updateValue, PluginData.class);
- Optional.ofNullable(data)
- .ifPresent(d ->
Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSubscribe(d)));
- }, deleteNode -> deletePlugin(pluginName));
- }
-
- private void deletePlugin(final String pluginName) {
- final PluginData data = new PluginData();
- data.setName(pluginName);
- Optional.ofNullable(pluginDataSubscriber).ifPresent(e ->
e.unSubscribe(data));
- }
-
- private void subscribeSelectorDataChanges(final String path) {
- etcdClient.watchDataChange(path, (updateNode, updateValue) ->
cacheSelectorData(updateValue),
- this::unCacheSelectorData);
- }
-
- private void subscribeRuleDataChanges(final String path) {
- etcdClient.watchDataChange(path, (updatePath, updateValue) ->
cacheRuleData(updateValue),
- this::unCacheRuleData);
- }
-
- private void subscribeAppAuthDataChanges(final String realPath) {
- etcdClient.watchDataChange(realPath, (updatePath, updateValue) ->
cacheAuthData(updateValue),
- this::unCacheAuthData);
- }
-
- private void subscribeMetaDataChanges(final String realPath) {
- etcdClient.watchDataChange(realPath, (updatePath, updateValue) ->
cacheMetaData(updateValue),
- this::deleteMetaData);
- }
-
- private void deleteMetaData(final String deletePath) {
- final String path =
deletePath.substring(DefaultPathConstants.META_DATA.length() + 1);
- MetaData metaData = new MetaData();
-
- try {
- metaData.setPath(URLDecoder.decode(path,
StandardCharsets.UTF_8.name()));
- unCacheMetaData(metaData);
- etcdClient.watchClose(path);
- } catch (UnsupportedEncodingException e) {
- LOG.error("delete meta data error.", e);
- }
- }
-
- private void cachePluginData(final String dataString) {
- final PluginData pluginData =
GsonUtils.getInstance().fromJson(dataString, PluginData.class);
- Optional.ofNullable(pluginData)
- .flatMap(data ->
Optional.ofNullable(pluginDataSubscriber)).ifPresent(e ->
e.onSubscribe(pluginData));
- }
-
- private void cacheSelectorData(final String dataString) {
- final SelectorData selectorData =
GsonUtils.getInstance().fromJson(dataString, SelectorData.class);
- Optional.ofNullable(selectorData)
- .ifPresent(data ->
Optional.ofNullable(pluginDataSubscriber).ifPresent(e ->
e.onSelectorSubscribe(data)));
- }
-
- private void unCacheSelectorData(final String dataPath) {
- SelectorData selectorData = new SelectorData();
- final String selectorId = dataPath.substring(dataPath.lastIndexOf("/")
+ 1);
- final String str =
dataPath.substring(DefaultPathConstants.SELECTOR_PARENT.length());
- final String pluginName = str.substring(1, str.length() -
selectorId.length() - 1);
- selectorData.setPluginName(pluginName);
- selectorData.setId(selectorId);
- Optional.ofNullable(pluginDataSubscriber).ifPresent(e ->
e.unSelectorSubscribe(selectorData));
- etcdClient.watchClose(dataPath);
- }
-
- private void cacheRuleData(final String dataString) {
- final RuleData ruleData = GsonUtils.getInstance().fromJson(dataString,
RuleData.class);
- Optional.ofNullable(ruleData)
- .ifPresent(data ->
Optional.ofNullable(pluginDataSubscriber).ifPresent(e ->
e.onRuleSubscribe(data)));
- }
-
- private void unCacheRuleData(final String dataPath) {
- String substring = dataPath.substring(dataPath.lastIndexOf("/") + 1);
- final String str =
dataPath.substring(DefaultPathConstants.RULE_PARENT.length());
- final String pluginName = str.substring(1, str.length() -
substring.length() - 1);
- final List<String> list =
Lists.newArrayList(Splitter.on(DefaultPathConstants.SELECTOR_JOIN_RULE).split(substring));
-
- RuleData ruleData = new RuleData();
- ruleData.setPluginName(pluginName);
- ruleData.setSelectorId(list.get(0));
- ruleData.setId(list.get(1));
-
- Optional.ofNullable(pluginDataSubscriber).ifPresent(e ->
e.unRuleSubscribe(ruleData));
- etcdClient.watchClose(dataPath);
- }
-
- private void cacheAuthData(final String dataString) {
- final AppAuthData appAuthData =
GsonUtils.getInstance().fromJson(dataString, AppAuthData.class);
- Optional.ofNullable(appAuthData)
- .ifPresent(data -> authDataSubscribers.forEach(e ->
e.onSubscribe(data)));
- }
-
- private void unCacheAuthData(final String dataPath) {
- final String key =
dataPath.substring(DefaultPathConstants.APP_AUTH_PARENT.length() + 1);
- AppAuthData appAuthData = new AppAuthData();
- appAuthData.setAppKey(key);
- authDataSubscribers.forEach(e -> e.unSubscribe(appAuthData));
- etcdClient.watchClose(dataPath);
- }
-
- private void cacheMetaData(final String dataString) {
- final MetaData metaData = GsonUtils.getInstance().fromJson(dataString,
MetaData.class);
- Optional.ofNullable(metaData)
- .ifPresent(data -> metaDataSubscribers.forEach(e ->
e.onSubscribe(metaData)));
- }
-
- private void unCacheMetaData(final MetaData metaData) {
- Optional.ofNullable(metaData)
- .ifPresent(data -> metaDataSubscribers.forEach(e ->
e.unSubscribe(metaData)));
- }
-
- private void cacheDiscoveryUpstreamData(final String dataString) {
- final DiscoverySyncData discoveryUpstream =
GsonUtils.getInstance().fromJson(dataString, DiscoverySyncData.class);
- Optional.ofNullable(discoveryUpstream)
- .ifPresent(data -> discoveryUpstreamDataSubscribers.forEach(e
-> e.onSubscribe(data)));
- }
-
- private void unCacheDiscoveryUpstreamData(final String dataPath) {
- DiscoverySyncData discoverySyncData = new DiscoverySyncData();
- final String selectorId = dataPath.substring(dataPath.lastIndexOf("/")
+ 1);
- final String str =
dataPath.substring(DefaultPathConstants.DISCOVERY_UPSTREAM.length());
- final String pluginName = str.substring(1, str.length() -
selectorId.length() - 1);
- discoverySyncData.setPluginName(pluginName);
- discoverySyncData.setSelectorId(selectorId);
- discoveryUpstreamDataSubscribers.forEach(e ->
e.unSubscribe(discoverySyncData));
- etcdClient.watchClose(dataPath);
- }
-
- private void subscribeDiscoveryUpstreamDataChanges(final String realPath) {
- etcdClient.watchDataChange(realPath, (updatePath, updateValue) ->
cacheDiscoveryUpstreamData(updateValue),
- this::unCacheDiscoveryUpstreamData);
- }
-
- private void cacheProxySelectorData(final String dataString) {
- final ProxySelectorData proxySelectorData =
GsonUtils.getInstance().fromJson(dataString, ProxySelectorData.class);
- Optional.ofNullable(proxySelectorData)
- .ifPresent(data -> proxySelectorDataSubscribers.forEach(e ->
e.onSubscribe(data)));
- }
-
- private void unCacheProxySelectorData(final String dataPath) {
- ProxySelectorData proxySelectorData = new ProxySelectorData();
- final String selectorId = dataPath.substring(dataPath.lastIndexOf("/")
+ 1);
- final String str =
dataPath.substring(DefaultPathConstants.PROXY_SELECTOR.length());
- final String pluginName = str.substring(1, str.length() -
selectorId.length() - 1);
- proxySelectorData.setPluginName(pluginName);
- proxySelectorData.setId(selectorId);
- proxySelectorDataSubscribers.forEach(e ->
e.unSubscribe(proxySelectorData));
- etcdClient.watchClose(dataPath);
- }
-
- private void subscribeProxySelectorDataChanges(final String realPath) {
- etcdClient.watchDataChange(realPath, (updatePath, updateValue) ->
cacheProxySelectorData(updateValue),
- this::unCacheProxySelectorData);
- }
-
- private String buildRealPath(final String parent, final String children) {
- return String.join("/", parent, children);
- }
-
- private List<String> etcdClientGetChildren(final String parent) {
+ watcherData0(DefaultPathConstants.PLUGIN_PARENT);
+ watcherData0(DefaultPathConstants.SELECTOR_PARENT);
+ watcherData0(DefaultPathConstants.RULE_PARENT);
+ watcherData0(DefaultPathConstants.PROXY_SELECTOR);
+ watcherData0(DefaultPathConstants.DISCOVERY_UPSTREAM);
+ watcherData0(DefaultPathConstants.APP_AUTH_PARENT);
+ watcherData0(DefaultPathConstants.META_DATA);
+ }
+
+ private void watcherData0(final String registerPath) {
+ etcdClient.watchChildChange(
+ registerPath,
+ (updatePath, updateValue) -> super.event(updatePath, updateValue,
registerPath, EventType.PUT),
+ deletePath -> super.event(deletePath, null, registerPath,
EventType.DELETE));
try {
- return etcdClient.getChildrenKeys(parent, "/");
- } catch (ExecutionException | InterruptedException e) {
+ // load all key
+ final List<String> childrenKeys =
etcdClient.getChildrenKeys(registerPath, "/");
+ if (!ObjectUtils.isEmpty(childrenKeys)) {
+ childrenKeys.forEach(nodePath -> {
+ final String nodeData =
etcdClient.get(String.join(Constants.PATH_SEPARATOR, registerPath, nodePath));
+ super.event(nodePath, nodeData, registerPath,
EventType.PUT);
+ });
+ }
+ } catch (Exception e) {
LOG.error(e.getMessage(), e);
}
- return Collections.emptyList();
}
@Override
@@ -400,4 +95,5 @@ public class EtcdSyncDataService implements SyncDataService {
etcdClient.close();
}
}
+
}
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
index 6870e60809..3d6eeb1c72 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-etcd/src/test/java/org/apache/shenyu/sync/data/etcd/EtcdSyncDataServiceTest.java
@@ -17,41 +17,20 @@
package org.apache.shenyu.sync.data.etcd;
-import io.etcd.jetcd.Client;
-import io.etcd.jetcd.Watch;
-import org.apache.shenyu.common.constant.DefaultPathConstants;
-import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.MetaData;
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.enums.ConfigGroupEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
+import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
-import org.junit.jupiter.api.BeforeEach;
+import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -61,221 +40,29 @@ import static org.mockito.Mockito.when;
*/
@ExtendWith(MockitoExtension.class)
public class EtcdSyncDataServiceTest {
-
- private static final String MOCK_PLUGIN_PATH = "/shenyu/plugin/divide";
-
- private static final String MOCK_PLUGIN_NAME = "divide";
-
- private EtcdSyncDataService etcdSyncDataService;
-
- private PluginData pluginData;
-
- @Mock
- private EtcdClient etcdClient;
-
- @Mock
- private Client client;
-
- @Mock
- private Watch.Watcher watcher;
-
- @BeforeEach
- public void setUp() {
- pluginData =
PluginData.builder().name(MOCK_PLUGIN_NAME).enabled(Boolean.FALSE).build();
- }
-
- @Test
- public void testWatchPluginWhenInit() throws ExecutionException,
InterruptedException {
- final List<PluginData> subscribeList = new ArrayList<>(1);
- when(etcdClient.getChildrenKeys(anyString(),
anyString())).thenReturn(Collections.singletonList("sign"));
-
when(etcdClient.get(anyString())).thenReturn(GsonUtils.getInstance().toJson(pluginData));
- etcdSyncDataService = new EtcdSyncDataService(etcdClient, new
PluginDataSubscriber() {
- @Override
- public void onSubscribe(final PluginData pluginData) {
- subscribeList.add(pluginData);
- }
- }, Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
- assertThat(subscribeList.size(), is(1));
- assertThat(subscribeList.get(0).getName(), is("divide"));
- }
-
- @Test
- public void deletePluginTest() throws NoSuchMethodException,
InvocationTargetException, IllegalAccessException {
- final List<PluginData> subscribeList = new ArrayList<>(1);
- etcdSyncDataService = new EtcdSyncDataService(etcdClient, new
PluginDataSubscriber() {
- @Override
- public void onSubscribe(final PluginData pluginData) {
- subscribeList.add(pluginData);
- }
-
- @Override
- public void unSubscribe(final PluginData pluginData) {
- final PluginData pluginDataDel = subscribeList.stream()
- .filter(pluginDataSource ->
pluginDataSource.getName().equals(pluginData.getName()))
- .findFirst().orElse(null);
- subscribeList.remove(pluginDataDel);
- }
- }, Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
- subscribeList.clear();
- final Method deletePlugin =
EtcdSyncDataService.class.getDeclaredMethod("deletePlugin", String.class);
- final Method cachePluginData =
EtcdSyncDataService.class.getDeclaredMethod("cachePluginData", String.class);
- deletePlugin.setAccessible(true);
- cachePluginData.setAccessible(true);
- final PluginData pluginData = new PluginData();
- pluginData.setName("pluginName");
- cachePluginData.invoke(etcdSyncDataService,
GsonUtils.getInstance().toJson(pluginData));
- assertEquals(subscribeList.get(0).getName(), "pluginName");
- deletePlugin.invoke(etcdSyncDataService, "pluginName");
- assertTrue(subscribeList.isEmpty());
- }
-
- @Test
- public void deleteMetaDataTest() throws NoSuchMethodException,
InvocationTargetException, IllegalAccessException {
- final List<MetaData> subscribeList = new ArrayList<>(1);
- etcdSyncDataService = new EtcdSyncDataService(etcdClient,
mock(PluginDataSubscriber.class),
- Collections.singletonList(new MetaDataSubscriber() {
- @Override
- public void onSubscribe(final MetaData metaData) {
- subscribeList.add(metaData);
- }
-
- @Override
- public void unSubscribe(final MetaData metaData) {
- final MetaData metaDataDel = subscribeList.stream()
- .filter(metaDataSource ->
metaDataSource.getId().equals(metaData.getId()))
- .findFirst().orElse(null);
- subscribeList.remove(metaDataDel);
- }
- }), Collections.emptyList(), Collections.emptyList(),
Collections.emptyList());
- subscribeList.clear();
- final Method cacheMetaData =
EtcdSyncDataService.class.getDeclaredMethod("cacheMetaData", String.class);
- final Method deleteMetaData =
EtcdSyncDataService.class.getDeclaredMethod("unCacheMetaData", MetaData.class);
- cacheMetaData.setAccessible(true);
- deleteMetaData.setAccessible(true);
- MetaData metaData = new MetaData();
- metaData.setId("metaDataId");
- cacheMetaData.invoke(etcdSyncDataService,
GsonUtils.getInstance().toJson(metaData));
- assertEquals(subscribeList.get(0).getId(), metaData.getId());
- deleteMetaData.invoke(etcdSyncDataService, metaData);
- assertTrue(subscribeList.isEmpty());
- }
-
- @Test
- public void authDataTest() throws NoSuchMethodException,
InvocationTargetException, IllegalAccessException {
- final List<AppAuthData> subscribeList = new ArrayList<>(1);
- etcdSyncDataService = new EtcdSyncDataService(etcdClient,
mock(PluginDataSubscriber.class), Collections.emptyList(),
- Collections.singletonList(new AuthDataSubscriber() {
- @Override
- public void onSubscribe(final AppAuthData metaData) {
- subscribeList.add(metaData);
- }
-
- @Override
- public void unSubscribe(final AppAuthData appAuthData) {
- final AppAuthData appAuthDataOld =
subscribeList.stream()
- .filter(appAuthDataSource ->
appAuthDataSource.getAppKey().equals(appAuthData.getAppKey()))
- .findFirst().orElse(null);
- subscribeList.remove(appAuthDataOld);
- }
- }), Collections.emptyList(), Collections.emptyList());
- subscribeList.clear();
- final Method cacheAuthData =
EtcdSyncDataService.class.getDeclaredMethod("cacheAuthData", String.class);
- final Method unCacheAuthData =
EtcdSyncDataService.class.getDeclaredMethod("unCacheAuthData", String.class);
- cacheAuthData.setAccessible(true);
- unCacheAuthData.setAccessible(true);
- AppAuthData appAuthData = new AppAuthData();
- appAuthData.setAppKey("appKeyValue");
- cacheAuthData.invoke(etcdSyncDataService,
GsonUtils.getInstance().toJson(appAuthData));
- assertEquals(subscribeList.get(0).getAppKey(),
appAuthData.getAppKey());
- unCacheAuthData.invoke(etcdSyncDataService, String.join("/",
DefaultPathConstants.APP_AUTH_PARENT, appAuthData.getAppKey()));
- assertTrue(subscribeList.isEmpty());
- }
@Test
- public void closeTest() {
- etcdSyncDataService = new EtcdSyncDataService(etcdClient,
mock(PluginDataSubscriber.class), Collections.emptyList(),
- Collections.emptyList(), Collections.emptyList(),
Collections.emptyList());
- assertDoesNotThrow(() -> etcdSyncDataService.close());
- }
-
- @Test
- public void ruleTest() throws NoSuchMethodException,
InvocationTargetException, IllegalAccessException, NoSuchFieldException {
- this.commonTest(ConfigGroupEnum.RULE,
"/shenyu/rule/divide/selectorId-ruleId");
- }
-
- @Test
- public void selectorTest() throws NoSuchMethodException,
InvocationTargetException, IllegalAccessException, NoSuchFieldException {
- this.commonTest(ConfigGroupEnum.SELECTOR,
"/shenyu/selector/divide/selectorId");
- }
-
- @Test
- public void metaDataTest() throws NoSuchMethodException,
InvocationTargetException, IllegalAccessException, NoSuchFieldException {
- this.commonTest(ConfigGroupEnum.META_DATA,
"/shenyu/metaData/divide/metaDataId");
- }
-
- @Test
- public void discoverUpstreamTest() throws NoSuchMethodException,
InvocationTargetException, IllegalAccessException, NoSuchFieldException {
- this.commonTest(ConfigGroupEnum.DISCOVER_UPSTREAM,
"/shenyu/discoveryUpstream/divide/id");
- }
-
- @Test
- public void proxySelectorTest() throws NoSuchMethodException,
InvocationTargetException, IllegalAccessException, NoSuchFieldException {
- this.commonTest(ConfigGroupEnum.PROXY_SELECTOR,
"/shenyu/proxySelectorData/divide/id");
- }
-
- @Test
- public void appAuthTest() throws NoSuchMethodException,
InvocationTargetException, IllegalAccessException, NoSuchFieldException {
- this.commonTest(ConfigGroupEnum.APP_AUTH,
"/shenyu/appAuth/divide/appAuthId");
- }
-
- /**
- * commonTest.
- * @param configGroupEnum configGroupEnum
- * @param key key
- * @throws NoSuchMethodException NoSuchMethodException
- * @throws InvocationTargetException InvocationTargetException
- * @throws IllegalAccessException IllegalAccessException
- * @throws NoSuchFieldException NoSuchFieldException
- */
- public void commonTest(final ConfigGroupEnum configGroupEnum, final String
key) throws NoSuchMethodException, InvocationTargetException,
IllegalAccessException, NoSuchFieldException {
- final EtcdClient mockEtcdClient = mock(EtcdClient.class);
- etcdSyncDataService = new EtcdSyncDataService(etcdClient,
- mock(PluginDataSubscriber.class),
- Collections.emptyList(),
- Collections.emptyList(), Collections.emptyList(),
Collections.emptyList());
- final Field etcdClient =
EtcdSyncDataService.class.getDeclaredField("etcdClient");
- etcdClient.setAccessible(true);
- etcdClient.set(etcdSyncDataService, mockEtcdClient);
- final Method subscribeChildChanges =
EtcdSyncDataService.class.getDeclaredMethod("subscribeChildChanges",
ConfigGroupEnum.class, String.class);
- subscribeChildChanges.setAccessible(true);
-
- final List<BiConsumer<String, String>> biConsumers = new
ArrayList<>(4);
- doAnswer(invocation -> {
- biConsumers.add(invocation.getArgument(1));
- return true;
- }).when(mockEtcdClient).watchChildChange(anyString(), any(), any());
-
- final List<BiConsumer<String, String>> dataBiConsumers = new
ArrayList<>(4);
- final List<Consumer<String>> dataConsumers = new ArrayList<>(4);
- doAnswer(invocation -> {
- dataBiConsumers.add(invocation.getArgument(1));
- dataConsumers.add(invocation.getArgument(2));
- return true;
- }).when(mockEtcdClient).watchDataChange(anyString(), any(), any());
-
- subscribeChildChanges.invoke(etcdSyncDataService, configGroupEnum,
"groupParentPath");
- // hit default
- assertThrows(InvocationTargetException.class, () ->
subscribeChildChanges.invoke(etcdSyncDataService, ConfigGroupEnum.PLUGIN,
"groupParentPath"));
-
- biConsumers.forEach(biConsumer -> {
- biConsumer.accept("updatePath", "{}");
- });
- dataConsumers.forEach(consumer -> {
- consumer.accept(key);
- });
- dataBiConsumers.forEach(biConsumer -> {
- biConsumer.accept("updatePath", "{}");
- });
- mockEtcdClient.close();
+ public void testZookeeperInstanceRegisterRepository() throws Exception {
+
+ EtcdClient etcdClient = mock(EtcdClient.class);
+ PluginDataSubscriber pluginDataSubscriber =
mock(PluginDataSubscriber.class);
+ doAnswer(invocationOnMock -> {
+ BiConsumer<String, String> updateHandler =
invocationOnMock.getArgument(1);
+ updateHandler.accept("updateData", "{}");
+ Consumer<String> deleteHandler = invocationOnMock.getArgument(2);
+ String pluginName = invocationOnMock.getArgument(0);
+ deleteHandler.accept(pluginName + "removeKey");
+ return null;
+ }).when(etcdClient).watchChildChange(any(), any(), any());
+ when(etcdClient.getChildrenKeys(any(),
any())).thenReturn(Collections.singletonList("test"));
+ final AuthDataSubscriber authDataSubscriber =
mock(AuthDataSubscriber.class);
+ final MetaDataSubscriber metaDataSubscriber =
mock(MetaDataSubscriber.class);
+ final ProxySelectorDataSubscriber proxySelectorDataSubscriber =
mock(ProxySelectorDataSubscriber.class);
+ final DiscoveryUpstreamDataSubscriber discoveryUpstreamDataSubscriber
= mock(DiscoveryUpstreamDataSubscriber.class);
+ final EtcdSyncDataService zookeeperSyncDataService = new
EtcdSyncDataService(etcdClient,
+ pluginDataSubscriber,
Collections.singletonList(metaDataSubscriber),
Collections.singletonList(authDataSubscriber),
+ Collections.singletonList(proxySelectorDataSubscriber),
Collections.singletonList(discoveryUpstreamDataSubscriber));
+
+ zookeeperSyncDataService.close();
}
}
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
b/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
index 874a4d4c42..c87a2aebc8 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
@@ -17,54 +17,28 @@
package org.apache.shenyu.sync.data.zookeeper;
-import com.google.common.base.Splitter;
import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.shenyu.common.constant.DefaultPathConstants;
-import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.DiscoverySyncData;
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
-import org.apache.shenyu.common.dto.MetaData;
-import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.exception.ShenyuException;
-import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
-import org.apache.shenyu.sync.data.api.SyncDataService;
+import org.apache.shenyu.sync.data.core.AbstractNodeDataSyncService;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
/**
* this cache data with zookeeper.
*/
-public class ZookeeperSyncDataService implements SyncDataService {
+public class ZookeeperSyncDataService extends AbstractNodeDataSyncService {
private final ZookeeperClient zkClient;
- private final PluginDataSubscriber pluginDataSubscriber;
-
- private final List<MetaDataSubscriber> metaDataSubscribers;
-
- private final List<AuthDataSubscriber> authDataSubscribers;
-
- private final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers;
-
- private final List<DiscoveryUpstreamDataSubscriber>
discoveryUpstreamDataSubscribers;
-
/**
* Instantiates a new Zookeeper cache manager.
*
@@ -79,134 +53,24 @@ public class ZookeeperSyncDataService implements
SyncDataService {
final List<AuthDataSubscriber>
authDataSubscribers,
final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers,
final
List<DiscoveryUpstreamDataSubscriber> discoveryUpstreamDataSubscribers) {
+ super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers,
proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers);
this.zkClient = zkClient;
- this.pluginDataSubscriber = pluginDataSubscriber;
- this.metaDataSubscribers = metaDataSubscribers;
- this.authDataSubscribers = authDataSubscribers;
- this.proxySelectorDataSubscribers = proxySelectorDataSubscribers;
- this.discoveryUpstreamDataSubscribers =
discoveryUpstreamDataSubscribers;
watcherData();
- watchAppAuth();
- watchMetaData();
}
private void watcherData() {
- zkClient.addCache(DefaultPathConstants.PLUGIN_PARENT, new
PluginCacheListener());
- zkClient.addCache(DefaultPathConstants.SELECTOR_PARENT, new
SelectorCacheListener());
- zkClient.addCache(DefaultPathConstants.RULE_PARENT, new
RuleCacheListener());
- zkClient.addCache(DefaultPathConstants.PROXY_SELECTOR, new
ProxySelectorCacheListener());
- zkClient.addCache(DefaultPathConstants.DISCOVERY_UPSTREAM, new
DiscoveryUpstreamCacheListener());
- }
-
- private void watchAppAuth() {
- zkClient.addCache(DefaultPathConstants.APP_AUTH_PARENT, new
AuthCacheListener());
- }
-
- private void watchMetaData() {
- zkClient.addCache(DefaultPathConstants.META_DATA, new
MetadataCacheListener());
- }
-
- private void cachePluginData(final PluginData pluginData) {
- Optional.ofNullable(pluginData)
- .flatMap(data -> Optional.ofNullable(pluginDataSubscriber))
- .ifPresent(e -> e.onSubscribe(pluginData));
- }
-
- private void cacheSelectorData(final SelectorData selectorData) {
- Optional.ofNullable(selectorData)
- .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber)
- .ifPresent(e -> e.onSelectorSubscribe(data)));
- }
-
- private void unCacheSelectorData(final String dataPath) {
- SelectorData selectorData = new SelectorData();
- final String selectorId = dataPath.substring(dataPath.lastIndexOf("/")
+ 1);
- final String str =
dataPath.substring(DefaultPathConstants.SELECTOR_PARENT.length());
- final int pluginNameIndex = str.length() - selectorId.length() - 1;
- if (pluginNameIndex <= 0) {
- return;
- }
- final String pluginName = str.substring(1, pluginNameIndex);
- selectorData.setPluginName(pluginName);
- selectorData.setId(selectorId);
-
- Optional.ofNullable(pluginDataSubscriber)
- .ifPresent(e -> e.unSelectorSubscribe(selectorData));
- }
-
- private void cacheRuleData(final RuleData ruleData) {
- Optional.ofNullable(ruleData)
- .ifPresent(data -> Optional.ofNullable(pluginDataSubscriber)
- .ifPresent(e -> e.onRuleSubscribe(data)));
- }
-
- private void unCacheRuleData(final String dataPath) {
- String ruleDataId = dataPath.substring(dataPath.lastIndexOf("/") + 1);
- final String str =
dataPath.substring(DefaultPathConstants.RULE_PARENT.length());
- final int pluginNameIndex = str.length() - ruleDataId.length() - 1;
- if (pluginNameIndex <= 0) {
- return;
- }
- final String pluginName = str.substring(1, pluginNameIndex);
- final List<String> list =
Lists.newArrayList(Splitter.on(DefaultPathConstants.SELECTOR_JOIN_RULE).split(ruleDataId));
-
- RuleData ruleData = new RuleData();
- ruleData.setPluginName(pluginName);
- ruleData.setSelectorId(list.get(0));
- ruleData.setId(list.get(1));
-
- Optional.ofNullable(pluginDataSubscriber)
- .ifPresent(e -> e.unRuleSubscribe(ruleData));
- }
-
- private void cacheAuthData(final AppAuthData appAuthData) {
- Optional.ofNullable(appAuthData)
- .ifPresent(data -> authDataSubscribers.forEach(e ->
e.onSubscribe(data)));
- }
-
- private void unCacheAuthData(final String dataPath) {
- final String key =
dataPath.substring(DefaultPathConstants.APP_AUTH_PARENT.length() + 1);
- AppAuthData appAuthData = new AppAuthData();
- appAuthData.setAppKey(key);
- authDataSubscribers.forEach(e -> e.unSubscribe(appAuthData));
- }
-
- private void cacheMetaData(final MetaData metaData) {
- Optional.ofNullable(metaData)
- .ifPresent(data -> metaDataSubscribers.forEach(e ->
e.onSubscribe(metaData)));
- }
-
- private void cacheProxySelectorData(final ProxySelectorData
proxySelectorData) {
- Optional.ofNullable(proxySelectorData)
- .ifPresent(data -> proxySelectorDataSubscribers.forEach(e ->
e.onSubscribe(proxySelectorData)));
- }
-
- private void cacheDiscoveryUpstreamData(final DiscoverySyncData
upstreamDataList) {
- Optional.ofNullable(discoveryUpstreamDataSubscribers)
- .ifPresent(data -> discoveryUpstreamDataSubscribers.forEach(e
-> e.onSubscribe(upstreamDataList)));
- }
-
- private void unCacheMetaData(final MetaData metaData) {
- Optional.ofNullable(metaData)
- .ifPresent(data -> metaDataSubscribers.forEach(e ->
e.unSubscribe(metaData)));
- }
-
- private void unCacheProxySelectorData(final ProxySelectorData
proxySelectorData) {
- Optional.ofNullable(proxySelectorData)
- .ifPresent(data -> proxySelectorDataSubscribers.forEach(e ->
e.unSubscribe(proxySelectorData)));
- }
-
- @Override
- public void close() {
- if (Objects.nonNull(zkClient)) {
- zkClient.close();
- }
- }
-
- abstract static class AbstractDataSyncListener implements
TreeCacheListener {
- @Override
- public final void childEvent(final CuratorFramework client, final
TreeCacheEvent event) {
- ChildData childData = event.getData();
+ watcherData0(DefaultPathConstants.PLUGIN_PARENT);
+ watcherData0(DefaultPathConstants.SELECTOR_PARENT);
+ watcherData0(DefaultPathConstants.RULE_PARENT);
+ watcherData0(DefaultPathConstants.PROXY_SELECTOR);
+ watcherData0(DefaultPathConstants.DISCOVERY_UPSTREAM);
+ watcherData0(DefaultPathConstants.APP_AUTH_PARENT);
+ watcherData0(DefaultPathConstants.META_DATA);
+ }
+
+ private void watcherData0(final String registerPath) {
+ zkClient.addCache(registerPath, (curatorFramework, treeCacheEvent) -> {
+ ChildData childData = treeCacheEvent.getData();
if (null == childData) {
return;
}
@@ -214,181 +78,21 @@ public class ZookeeperSyncDataService implements
SyncDataService {
if (Strings.isNullOrEmpty(path)) {
return;
}
- event(event.getType(), path, childData);
- }
-
- /**
- * data sync event.
- *
- * @param type tree cache event type.
- * @param path tree cache event path.
- * @param data tree cache event data.
- */
- protected abstract void event(TreeCacheEvent.Type type, String path,
ChildData data);
- }
-
- class PluginCacheListener extends AbstractDataSyncListener {
-
- @Override
- public void event(final TreeCacheEvent.Type type, final String path,
final ChildData data) {
- // if not uri register path, return.
- if (!path.contains(DefaultPathConstants.PLUGIN_PARENT)) {
- return;
- }
-
- String pluginName = path.substring(path.lastIndexOf("/") + 1);
-
- // delete a plugin
- if (type.equals(TreeCacheEvent.Type.NODE_REMOVED)) {
- final PluginData pluginData = new PluginData();
- pluginData.setName(pluginName);
- Optional.ofNullable(pluginDataSubscriber).ifPresent(e ->
e.unSubscribe(pluginData));
- return;
- }
-
- // create or update
- Optional.ofNullable(data)
- .ifPresent(e ->
cachePluginData(GsonUtils.getInstance().fromJson(new String(data.getData(),
StandardCharsets.UTF_8), PluginData.class)));
- }
- }
-
- class SelectorCacheListener extends AbstractDataSyncListener {
-
- @Override
- public void event(final TreeCacheEvent.Type type, final String path,
final ChildData data) {
-
- // if not uri register path, return.
- if (!path.contains(DefaultPathConstants.SELECTOR_PARENT)) {
- return;
- }
-
- if (type.equals(TreeCacheEvent.Type.NODE_REMOVED)) {
- unCacheSelectorData(path);
- return;
- }
-
- // create or update
- Optional.ofNullable(data)
- .ifPresent(e ->
cacheSelectorData(GsonUtils.getInstance().fromJson(new String(data.getData(),
StandardCharsets.UTF_8), SelectorData.class)));
- }
- }
-
- class MetadataCacheListener extends AbstractDataSyncListener {
-
- @Override
- public void event(final TreeCacheEvent.Type type, final String path,
final ChildData data) {
- // if not uri register path, return.
- if (!path.contains(DefaultPathConstants.META_DATA)) {
- return;
- }
-
- if (type.equals(TreeCacheEvent.Type.NODE_REMOVED)) {
- final String realPath =
path.substring(DefaultPathConstants.META_DATA.length() + 1);
- MetaData metaData = new MetaData();
- try {
- metaData.setPath(URLDecoder.decode(realPath,
StandardCharsets.UTF_8.name()));
- } catch (UnsupportedEncodingException e) {
- throw new ShenyuException(e);
- }
- unCacheMetaData(metaData);
- return;
- }
-
- // create or update
- Optional.ofNullable(data)
- .ifPresent(e ->
cacheMetaData(GsonUtils.getInstance().fromJson(new String(data.getData(),
StandardCharsets.UTF_8), MetaData.class)));
- }
- }
-
- class AuthCacheListener extends AbstractDataSyncListener {
-
- @Override
- public void event(final TreeCacheEvent.Type type, final String path,
final ChildData data) {
- // if not uri register path, return.
- if (!path.contains(DefaultPathConstants.APP_AUTH_PARENT)) {
- return;
- }
-
- if (type.equals(TreeCacheEvent.Type.NODE_REMOVED)) {
- unCacheAuthData(path);
- return;
- }
-
- // create or update
- Optional.ofNullable(data)
- .ifPresent(e ->
cacheAuthData(GsonUtils.getInstance().fromJson(new String(data.getData(),
StandardCharsets.UTF_8), AppAuthData.class)));
- }
- }
-
- class RuleCacheListener extends AbstractDataSyncListener {
-
- @Override
- public void event(final TreeCacheEvent.Type type, final String path,
final ChildData data) {
// if not uri register path, return.
- if (!path.contains(DefaultPathConstants.RULE_PARENT)) {
- return;
- }
- if (type.equals(TreeCacheEvent.Type.NODE_REMOVED)) {
- unCacheRuleData(path);
+ if (!path.contains(registerPath)) {
return;
}
- // create or update
- Optional.ofNullable(data)
- .ifPresent(e ->
cacheRuleData(GsonUtils.getInstance().fromJson(new String(data.getData(),
StandardCharsets.UTF_8), RuleData.class)));
- }
+ EventType eventType =
treeCacheEvent.getType().equals(TreeCacheEvent.Type.NODE_REMOVED) ?
EventType.DELETE : EventType.PUT;
+ final String updateData = childData.getData() != null ? new
String(childData.getData(), StandardCharsets.UTF_8) : null;
+ this.event(path, updateData, registerPath, eventType);
+ });
}
- class ProxySelectorCacheListener extends AbstractDataSyncListener {
-
- @Override
- protected void event(final TreeCacheEvent.Type type, final String
path, final ChildData data) {
- // if not uri register path, return.
- if (!path.contains(DefaultPathConstants.PROXY_SELECTOR)) {
- return;
- }
- String[] pathInfoArray = path.split("/");
- if (pathInfoArray.length != 5) {
- return;
- }
- String pluginName = pathInfoArray[pathInfoArray.length - 2];
- String proxySelectorName = pathInfoArray[pathInfoArray.length - 1];
- if (type.equals(TreeCacheEvent.Type.NODE_REMOVED)) {
- ProxySelectorData proxySelectorData = new ProxySelectorData();
- proxySelectorData.setPluginName(pluginName);
- proxySelectorData.setName(proxySelectorName);
- unCacheProxySelectorData(proxySelectorData);
- return;
- }
- ProxySelectorData proxySelectorData =
GsonUtils.getInstance().fromJson(new String(data.getData(),
StandardCharsets.UTF_8), ProxySelectorData.class);
- proxySelectorData.setName(proxySelectorName);
- proxySelectorData.setPluginName(pluginName);
- // create or update
- Optional.ofNullable(data)
- .ifPresent(e -> cacheProxySelectorData(proxySelectorData));
-
- }
- }
-
- class DiscoveryUpstreamCacheListener extends AbstractDataSyncListener {
-
- @Override
- protected void event(final TreeCacheEvent.Type type, final String
path, final ChildData data) {
- // if not uri register path, return.
- if (!path.contains(DefaultPathConstants.DISCOVERY_UPSTREAM)) {
- return;
- }
- String[] pathInfoArray = path.split("/");
- if (pathInfoArray.length != 5) {
- return;
- }
- // only support update
- if (type.equals(TreeCacheEvent.Type.NODE_UPDATED)) {
- DiscoverySyncData discoverySyncData =
GsonUtils.getInstance().fromJson(new String(data.getData(),
StandardCharsets.UTF_8), DiscoverySyncData.class);
- // create or update
- Optional.ofNullable(data)
- .ifPresent(e ->
cacheDiscoveryUpstreamData(discoverySyncData));
- }
+ @Override
+ public void close() {
+ if (Objects.nonNull(zkClient)) {
+ zkClient.close();
}
}