This is an automated email from the ASF dual-hosted git repository.
hefengen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new a152a98975 [type: feature] add nacos, etcd, eureka discovery service
(#5193)
a152a98975 is described below
commit a152a98975549f28662a1cd5242c6ebc7ff575cb
Author: lulu <[email protected]>
AuthorDate: Sun Oct 29 21:37:05 2023 +0800
[type: feature] add nacos, etcd, eureka discovery service (#5193)
* save current change
* save current change
* save current change
* [type:feat] add nacos, eureka, etcd discovery service
* [type:fix] fix checkstyle bug
* [type:fix] fix set calculation bug
* [type:fix] fix pom.xml dependency bug
* [type:fix] fix pom.xml dependency bug
* [type:fix] fix pom.xml dependency bug
* [type:fix] test pom.xml dependency bug
* [type:fix] test pom.xml dependency bug
* [type:fix] test eureka pom.xml dependency bug
* [type:fix] fix some bug
* [type:fix] fix eurekaCLient register bug
* [type:fix] change log content
---------
Co-authored-by: xiaoyu <[email protected]>
Co-authored-by: moremind <[email protected]>
Co-authored-by: yunlongn <[email protected]>
---
pom.xml | 6 -
shenyu-admin/pom.xml | 23 +-
.../admin/discovery/DefaultDiscoveryProcessor.java | 6 +-
.../shenyu/admin/discovery/DiscoveryMode.java | 6 +-
.../admin/discovery/DiscoveryProcessorHolder.java | 6 +
shenyu-discovery/pom.xml | 3 +
.../discovery/api/ShenyuDiscoveryService.java | 12 +-
shenyu-discovery/shenyu-discovery-etcd/pom.xml | 87 +++++++
.../discovery/etcd/EtcdDiscoveryService.java | 220 ++++++++++++++++++
...che.shenyu.discovery.api.ShenyuDiscoveryService | 18 ++
.../pom.xml | 23 +-
.../discovery/eureka/CustomedEurekaConfig.java | 106 +++++++++
.../discovery/eureka/EurekaDiscoveryService.java | 258 +++++++++++++++++++++
...che.shenyu.discovery.api.ShenyuDiscoveryService | 17 ++
.../pom.xml | 20 +-
.../discovery/nacos/NacosDiscoveryService.java | 239 +++++++++++++++++++
...che.shenyu.discovery.api.ShenyuDiscoveryService | 17 ++
.../shenyu-discovery-zookeeper/pom.xml | 4 +-
.../zookeeper/ZookeeperDiscoveryService.java | 8 +-
19 files changed, 1030 insertions(+), 49 deletions(-)
diff --git a/pom.xml b/pom.xml
index 315d017c93..1ab1375aa4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -197,12 +197,6 @@
<scope>import</scope>
</dependency>
- <dependency>
- <groupId>com.alibaba.nacos</groupId>
- <artifactId>nacos-client</artifactId>
- <version>${nacos-client.version}</version>
- </dependency>
-
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm-tree</artifactId>
diff --git a/shenyu-admin/pom.xml b/shenyu-admin/pom.xml
index a5465ab0a4..0a08b975cc 100644
--- a/shenyu-admin/pom.xml
+++ b/shenyu-admin/pom.xml
@@ -174,11 +174,6 @@
<artifactId>pagehelper</artifactId>
</dependency>
- <dependency>
- <groupId>com.alibaba.nacos</groupId>
- <artifactId>nacos-client</artifactId>
- </dependency>
-
<dependency>
<groupId>com.tencent.polaris</groupId>
<artifactId>polaris-all</artifactId>
@@ -313,6 +308,24 @@
<artifactId>shenyu-discovery-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-discovery-etcd</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-discovery-eureka</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-discovery-nacos</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<profiles>
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DefaultDiscoveryProcessor.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DefaultDiscoveryProcessor.java
index 99a4513d9f..6d5670293c 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DefaultDiscoveryProcessor.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DefaultDiscoveryProcessor.java
@@ -122,14 +122,14 @@ public class DefaultDiscoveryProcessor implements
DiscoveryProcessor, Applicatio
if (Objects.isNull(shenyuDiscoveryService)) {
throw new ShenyuAdminException(String.format("before start
ProxySelector you need init DiscoveryId=%s",
discoveryHandlerDTO.getDiscoveryId()));
}
- if (!shenyuDiscoveryService.exits(key)) {
+ if (!shenyuDiscoveryService.exists(key)) {
throw new ShenyuAdminException(String.format("shenyu discovery
start watcher need you has this key %s in Discovery", key));
}
Set<String> cacheKey =
dataChangedEventListenerCache.get(discoveryHandlerDTO.getDiscoveryId());
if (Objects.nonNull(cacheKey) && cacheKey.contains(key)) {
throw new ShenyuAdminException(String.format("shenyu discovery has
watcher key = %s", key));
}
- shenyuDiscoveryService.watcher(key,
getDiscoveryDataChangedEventListener(discoveryHandlerDTO, proxySelectorDTO));
+ shenyuDiscoveryService.watch(key,
getDiscoveryDataChangedEventListener(discoveryHandlerDTO, proxySelectorDTO));
cacheKey.add(key);
DataChangedEvent dataChangedEvent = new
DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.CREATE,
Collections.singletonList(DiscoveryTransfer.INSTANCE.mapToData(proxySelectorDTO)));
@@ -159,7 +159,7 @@ public class DefaultDiscoveryProcessor implements
DiscoveryProcessor, Applicatio
String key =
buildProxySelectorKey(discoveryHandlerDTO.getListenerNode());
Set<String> cacheKey =
dataChangedEventListenerCache.get(discoveryHandlerDTO.getDiscoveryId());
cacheKey.remove(key);
- shenyuDiscoveryService.unWatcher(key);
+ shenyuDiscoveryService.unwatch(key);
DataChangedEvent dataChangedEvent = new
DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.DELETE,
Collections.singletonList(DiscoveryTransfer.INSTANCE.mapToData(proxySelectorDTO)));
eventPublisher.publishEvent(dataChangedEvent);
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryMode.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryMode.java
index c610d9ac7e..f3c913bcd1 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryMode.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryMode.java
@@ -36,5 +36,9 @@ public enum DiscoveryMode {
/**
* Eureka discovery mode.
*/
- EUREKA
+ EUREKA,
+ /**
+ * ETCD discovery mode.
+ */
+ ETCD
}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessorHolder.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessorHolder.java
index ea272c1f86..51e378fcd8 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessorHolder.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessorHolder.java
@@ -41,6 +41,12 @@ public class DiscoveryProcessorHolder {
return localDiscoveryProcessor;
} else if (DiscoveryMode.ZOOKEEPER.name().equalsIgnoreCase(mode)) {
return defaultDiscoveryProcessor;
+ } else if (DiscoveryMode.ETCD.name().equalsIgnoreCase(mode)) {
+ return defaultDiscoveryProcessor;
+ } else if (DiscoveryMode.NACOS.name().equalsIgnoreCase(mode)) {
+ return defaultDiscoveryProcessor;
+ } else if (DiscoveryMode.EUREKA.name().equalsIgnoreCase(mode)) {
+ return defaultDiscoveryProcessor;
} else {
throw new NotImplementedException("shenyu discovery mode current
didn't support " + mode);
}
diff --git a/shenyu-discovery/pom.xml b/shenyu-discovery/pom.xml
index 30c9230f0d..2fd21c078c 100644
--- a/shenyu-discovery/pom.xml
+++ b/shenyu-discovery/pom.xml
@@ -29,6 +29,9 @@
<modules>
<module>shenyu-discovery-api</module>
<module>shenyu-discovery-zookeeper</module>
+ <module>shenyu-discovery-etcd</module>
+ <module>shenyu-discovery-nacos</module>
+ <module>shenyu-discovery-eureka</module>
</modules>
</project>
\ No newline at end of file
diff --git
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
index f60f054ab7..9ff40c1774 100644
---
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
+++
b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
@@ -37,19 +37,19 @@ public interface ShenyuDiscoveryService {
void init(DiscoveryConfig config);
/**
- * Watcher path , fire data changed event.
+ * Watch path , fire data changed event.
*
* @param key the key
* @param listener the listener
*/
- void watcher(String key, DataChangedEventListener listener);
+ void watch(String key, DataChangedEventListener listener);
/**
- * unWatcher path.
+ * unwatch path.
*
* @param key key
*/
- void unWatcher(String key);
+ void unwatch(String key);
/**
* Register data.
@@ -68,12 +68,12 @@ public interface ShenyuDiscoveryService {
List<String> getRegisterData(String key);
/**
- * exits.
+ * exists.
*
* @param key key
* @return Boolean
*/
- Boolean exits(String key);
+ Boolean exists(String key);
/**
* shutdown.
diff --git a/shenyu-discovery/shenyu-discovery-etcd/pom.xml
b/shenyu-discovery/shenyu-discovery-etcd/pom.xml
new file mode 100644
index 0000000000..188f35607e
--- /dev/null
+++ b/shenyu-discovery/shenyu-discovery-etcd/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-discovery</artifactId>
+ <version>2.6.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>shenyu-discovery-etcd</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-discovery-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-common</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>grpc-protobuf</artifactId>
+ <groupId>io.grpc</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>grpc-stub</artifactId>
+ <groupId>io.grpc</groupId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>grpc-netty</artifactId>
+ <groupId>io.grpc</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <artifactId>grpc-protobuf</artifactId>
+ <groupId>io.grpc</groupId>
+ <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
+ <artifactId>grpc-stub</artifactId>
+ <groupId>io.grpc</groupId>
+ <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/shenyu-discovery/shenyu-discovery-etcd/src/main/java/org/apache/shenyu/discovery/etcd/EtcdDiscoveryService.java
b/shenyu-discovery/shenyu-discovery-etcd/src/main/java/org/apache/shenyu/discovery/etcd/EtcdDiscoveryService.java
new file mode 100644
index 0000000000..2099ee5b1b
--- /dev/null
+++
b/shenyu-discovery/shenyu-discovery-etcd/src/main/java/org/apache/shenyu/discovery/etcd/EtcdDiscoveryService.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.discovery.etcd;
+
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.Lease;
+import io.etcd.jetcd.KV;
+import io.etcd.jetcd.Watch;
+import io.etcd.jetcd.kv.GetResponse;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.etcd.jetcd.options.GetOption;
+import io.etcd.jetcd.options.PutOption;
+import io.etcd.jetcd.options.WatchOption;
+import io.etcd.jetcd.watch.WatchEvent;
+import io.grpc.stub.StreamObserver;
+import org.apache.shenyu.common.exception.ShenyuException;
+import org.apache.shenyu.discovery.api.ShenyuDiscoveryService;
+import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
+import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
+import org.apache.shenyu.discovery.api.listener.DiscoveryDataChangedEvent;
+import org.apache.shenyu.spi.Join;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * {@link ShenyuDiscoveryService} implementation backed by etcd through the jetcd client.
+ *
+ * <p>{@link #init(DiscoveryConfig)} builds the client from the configured server list and grants a
+ * single lease; every key written by {@link #register(String, String)} is attached to that lease.
+ * {@link #watch(String, DataChangedEventListener)} and {@link #getRegisterData(String)} treat the
+ * supplied key as a prefix and skip the entry whose key equals the prefix itself.
+ */
+@Join
+public class EtcdDiscoveryService implements ShenyuDiscoveryService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(EtcdDiscoveryService.class);
+
+    private Client etcdClient;
+
+    // One active watcher per watched key; also used to make watch() a no-op for known keys.
+    private final ConcurrentMap<String, Watch.Watcher> watchCache = new ConcurrentHashMap<>();
+
+    // 0 means "no lease granted yet" (see init).
+    private long leaseId;
+
+    // Value handed to Lease#grant; presumably seconds, as in the etcd lease API -- confirm.
+    private long ttl;
+
+    // Upper bound, in milliseconds, applied to the put issued by register().
+    private long timeout;
+
+    /**
+     * Reads {@code etcdTimeout} (default {@code 3000}) and {@code etcdTTL} (default {@code 5}) from
+     * the config properties, creates the etcd client if it does not exist yet and grants the lease
+     * if none has been granted.
+     *
+     * @param config discovery config; its server list is a comma separated list of endpoints
+     */
+    @Override
+    public void init(final DiscoveryConfig config) {
+        Properties props = config.getProps();
+        this.timeout = Long.parseLong(props.getProperty("etcdTimeout", "3000"));
+        this.ttl = Long.parseLong(props.getProperty("etcdTTL", "5"));
+        if (Objects.isNull(etcdClient)) {
+            this.etcdClient = Client.builder().endpoints(config.getServerList().split(",")).build();
+            LOGGER.info("Etcd Discovery Service initialize successfully");
+        }
+        if (leaseId == 0) {
+            initLease();
+        }
+    }
+
+    /**
+     * Grants a lease with the configured ttl and opens a keep-alive stream for it.
+     *
+     * @throws ShenyuException if the grant is interrupted or fails
+     */
+    private void initLease() {
+        try {
+            Lease lease = etcdClient.getLeaseClient();
+            this.leaseId = lease.grant(ttl).get().getID();
+            lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
+                @Override
+                public void onNext(final LeaseKeepAliveResponse leaseKeepAliveResponse) {
+                }
+
+                @Override
+                public void onError(final Throwable throwable) {
+                    LOGGER.error("etcd lease keep alive error", throwable);
+                }
+
+                @Override
+                public void onCompleted() {
+                }
+            });
+        } catch (InterruptedException | ExecutionException e) {
+            restoreInterruptIfNeeded(e);
+            LOGGER.error("initLease error.", e);
+            throw new ShenyuException(e);
+        }
+    }
+
+    /**
+     * Starts a prefix watch on {@code key} and forwards every change below it to {@code listener}.
+     * Does nothing when the key is already being watched.
+     *
+     * @param key      the key (used as prefix)
+     * @param listener receives one event per changed child key
+     * @throws ShenyuException if the watch cannot be created
+     */
+    @Override
+    public void watch(final String key, final DataChangedEventListener listener) {
+        if (this.watchCache.containsKey(key)) {
+            return;
+        }
+        try {
+            final ByteSequence watchedKey = bytesOf(key);
+            Watch watch = etcdClient.getWatchClient();
+            WatchOption option = WatchOption.newBuilder().isPrefix(true).build();
+
+            Watch.Watcher watcher = watch.watch(watchedKey, option, Watch.listener(response -> {
+                for (WatchEvent event : response.getEvents()) {
+                    // Ignore the parent node itself, but keep going: a single response may carry
+                    // further events for child keys that must still reach the listener.
+                    if (event.getKeyValue().getKey().equals(watchedKey)) {
+                        continue;
+                    }
+                    listener.onChange(buildDataChangedEvent(event));
+                }
+            }));
+            watchCache.put(key, watcher);
+            LOGGER.info("Added etcd watcher for key: {}", key);
+        } catch (Exception e) {
+            LOGGER.error("etcd client watch key: {} error", key, e);
+            throw new ShenyuException(e);
+        }
+    }
+
+    /**
+     * Maps an etcd watch event to a discovery event.
+     *
+     * <p>A PUT whose create revision equals its mod revision is reported as ADDED, any other PUT as
+     * UPDATED, DELETE as DELETED and everything else as IGNORED.
+     *
+     * <p>NOTE(review): etcd DELETE events normally carry an empty value unless the watch requests
+     * the previous key-value -- confirm listeners do not rely on the removed value.
+     *
+     * @param event the raw etcd event
+     * @return the discovery event carrying the key path and value as UTF-8 strings
+     */
+    private DiscoveryDataChangedEvent buildDataChangedEvent(final WatchEvent event) {
+        String path = event.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
+        String value = event.getKeyValue().getValue().toString(StandardCharsets.UTF_8);
+        switch (event.getEventType()) {
+            case PUT:
+                return event.getKeyValue().getCreateRevision() == event.getKeyValue().getModRevision()
+                        ? new DiscoveryDataChangedEvent(path, value, DiscoveryDataChangedEvent.Event.ADDED)
+                        : new DiscoveryDataChangedEvent(path, value, DiscoveryDataChangedEvent.Event.UPDATED);
+            case DELETE:
+                return new DiscoveryDataChangedEvent(path, value, DiscoveryDataChangedEvent.Event.DELETED);
+            default:
+                return new DiscoveryDataChangedEvent(path, value, DiscoveryDataChangedEvent.Event.IGNORED);
+        }
+    }
+
+    /**
+     * Closes and forgets the watcher registered for {@code key}, if there is one.
+     *
+     * @param key key
+     */
+    @Override
+    public void unwatch(final String key) {
+        // Single remove() instead of containsKey()+remove(): no NPE if another thread wins the race.
+        Watch.Watcher watcher = watchCache.remove(key);
+        if (Objects.nonNull(watcher)) {
+            watcher.close();
+            LOGGER.info("Unwatched etcd key: {}", key);
+        }
+    }
+
+    /**
+     * Writes {@code value} under {@code key}, attached to the lease granted in {@code init}, waiting
+     * at most the configured timeout (milliseconds).
+     *
+     * @param key   the key
+     * @param value the value
+     * @throws ShenyuException if the put is interrupted, fails or times out
+     */
+    @Override
+    public void register(final String key, final String value) {
+        try {
+            KV kvClient = etcdClient.getKVClient();
+            PutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build();
+            kvClient.put(bytesOf(key), bytesOf(value), putOption).get(timeout, TimeUnit.MILLISECONDS);
+            LOGGER.info("etcd client key: {} with value: {}", key, value);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            restoreInterruptIfNeeded(e);
+            LOGGER.error("etcd client register (key:{},value:{}) error.", key, value, e);
+            throw new ShenyuException(e);
+        }
+    }
+
+    /**
+     * Returns the values of all entries whose key starts with {@code key}, excluding the entry
+     * whose key is exactly {@code key}.
+     *
+     * @param key the key (used as prefix)
+     * @return the values decoded as UTF-8 strings
+     * @throws ShenyuException if the read fails
+     */
+    @Override
+    public List<String> getRegisterData(final String key) {
+        try {
+            final ByteSequence parentKey = bytesOf(key);
+            KV kvClient = etcdClient.getKVClient();
+            GetOption option = GetOption.newBuilder().isPrefix(true).build();
+            GetResponse response = kvClient.get(parentKey, option).get();
+            return response.getKvs().stream()
+                    .filter(kv -> !kv.getKey().equals(parentKey))
+                    .map(kv -> kv.getValue().toString(StandardCharsets.UTF_8))
+                    .collect(Collectors.toList());
+        } catch (Exception e) {
+            restoreInterruptIfNeeded(e);
+            LOGGER.error("etcd client get registered data with key: {} error", key, e);
+            throw new ShenyuException(e);
+        }
+    }
+
+    /**
+     * Tells whether at least one entry exists whose key starts with {@code key}
+     * (count-only prefix query, so the entry equal to {@code key} counts too).
+     *
+     * @param key key
+     * @return {@code true} if the count is greater than zero
+     * @throws ShenyuException if the query is interrupted or fails
+     */
+    @Override
+    public Boolean exists(final String key) {
+        try {
+            KV kvClient = etcdClient.getKVClient();
+            GetOption option = GetOption.newBuilder().isPrefix(true).withCountOnly(true).build();
+            GetResponse response = kvClient.get(bytesOf(key), option).get();
+            return response.getCount() > 0;
+        } catch (InterruptedException | ExecutionException e) {
+            restoreInterruptIfNeeded(e);
+            throw new ShenyuException(e);
+        }
+    }
+
+    /**
+     * Closes every registered watcher, forgets them and closes the etcd client.
+     *
+     * @throws ShenyuException if closing fails
+     */
+    @Override
+    public void shutdown() {
+        try {
+            for (Map.Entry<String, Watch.Watcher> entry : watchCache.entrySet()) {
+                Watch.Watcher watcher = entry.getValue();
+                watcher.close();
+            }
+            // Drop the closed watchers so a later watch() on the same key is not silently skipped.
+            watchCache.clear();
+            if (Objects.nonNull(etcdClient)) {
+                etcdClient.close();
+            }
+        } catch (Exception e) {
+            LOGGER.error("etcd client shutdown error", e);
+            throw new ShenyuException(e);
+        }
+    }
+
+    /**
+     * Re-asserts the thread's interrupt flag when {@code e} is an {@link InterruptedException},
+     * so callers further up the stack can still observe the interruption after it is wrapped.
+     *
+     * @param e the caught exception
+     */
+    private static void restoreInterruptIfNeeded(final Exception e) {
+        if (e instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private ByteSequence bytesOf(final String val) {
+        return ByteSequence.from(val, UTF_8);
+    }
+
+}
diff --git
a/shenyu-discovery/shenyu-discovery-etcd/src/main/resources/META-INF/shenyu/org.apache.shenyu.discovery.api.ShenyuDiscoveryService
b/shenyu-discovery/shenyu-discovery-etcd/src/main/resources/META-INF/shenyu/org.apache.shenyu.discovery.api.ShenyuDiscoveryService
new file mode 100644
index 0000000000..90aad4e7e8
--- /dev/null
+++
b/shenyu-discovery/shenyu-discovery-etcd/src/main/resources/META-INF/shenyu/org.apache.shenyu.discovery.api.ShenyuDiscoveryService
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+etcd=org.apache.shenyu.discovery.etcd.EtcdDiscoveryService
+
diff --git a/shenyu-discovery/shenyu-discovery-zookeeper/pom.xml
b/shenyu-discovery/shenyu-discovery-eureka/pom.xml
similarity index 74%
copy from shenyu-discovery/shenyu-discovery-zookeeper/pom.xml
copy to shenyu-discovery/shenyu-discovery-eureka/pom.xml
index 80207d09ca..08ae89b9da 100644
--- a/shenyu-discovery/shenyu-discovery-zookeeper/pom.xml
+++ b/shenyu-discovery/shenyu-discovery-eureka/pom.xml
@@ -16,35 +16,34 @@
~ limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-discovery</artifactId>
<version>2.6.1-SNAPSHOT</version>
</parent>
- <modelVersion>4.0.0</modelVersion>
- <artifactId>shenyu-discovery-zookeeper</artifactId>
-
+
+ <artifactId>shenyu-discovery-eureka</artifactId>
+
<dependencies>
<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-discovery-api</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-common</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.netflix.eureka</groupId>
+ <artifactId>eureka-client</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/shenyu-discovery/shenyu-discovery-eureka/src/main/java/org/apache/shenyu/discovery/eureka/CustomedEurekaConfig.java
b/shenyu-discovery/shenyu-discovery-eureka/src/main/java/org/apache/shenyu/discovery/eureka/CustomedEurekaConfig.java
new file mode 100644
index 0000000000..e4e731e45a
--- /dev/null
+++
b/shenyu-discovery/shenyu-discovery-eureka/src/main/java/org/apache/shenyu/discovery/eureka/CustomedEurekaConfig.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.discovery.eureka;
+
+import com.netflix.appinfo.EurekaInstanceConfig;
+import com.netflix.appinfo.MyDataCenterInstanceConfig;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Eureka instance config whose instance id, IP address, non-secure port and application name can
+ * be overridden programmatically; any value left unset falls back to
+ * {@link MyDataCenterInstanceConfig}.
+ */
+public class CustomedEurekaConfig extends MyDataCenterInstanceConfig implements EurekaInstanceConfig {
+
+    /** Sentinel meaning "no port override has been set". */
+    private static final int PORT_NOT_SET = -1;
+
+    private String applicationName;
+
+    private String instanceId;
+
+    private String ipAddress;
+
+    private int port = PORT_NOT_SET;
+
+    /**
+     * Gets the instance ID.
+     *
+     * @return the overridden instance ID, or the inherited one when the override is blank
+     */
+    @Override
+    public String getInstanceId() {
+        return StringUtils.isBlank(instanceId) ? super.getInstanceId() : instanceId;
+    }
+
+    /**
+     * Gets the IP address.
+     *
+     * @return the overridden IP address, or the inherited one when the override is blank
+     */
+    @Override
+    public String getIpAddress() {
+        return StringUtils.isBlank(ipAddress) ? super.getIpAddress() : ipAddress;
+    }
+
+    /**
+     * Gets the non-secure port.
+     *
+     * @return the overridden port, or the inherited one when no override has been set
+     */
+    @Override
+    public int getNonSecurePort() {
+        return port == PORT_NOT_SET ? super.getNonSecurePort() : port;
+    }
+
+    /**
+     * Gets the application name.
+     *
+     * @return the overridden application name, or the inherited one when the override is blank
+     */
+    @Override
+    public String getAppname() {
+        return StringUtils.isBlank(applicationName) ? super.getAppname() : applicationName;
+    }
+
+    /**
+     * Gets the host name, which for this config is always the IP address.
+     *
+     * @param refresh not used by this implementation
+     * @return the same value as {@link #getIpAddress()}
+     */
+    @Override
+    public String getHostName(final boolean refresh) {
+        return getIpAddress();
+    }
+
+    /**
+     * Overrides the instance ID.
+     *
+     * @param instanceId the instance ID to report
+     */
+    public void setInstanceId(final String instanceId) {
+        this.instanceId = instanceId;
+    }
+
+    /**
+     * Overrides the IP address.
+     *
+     * @param ipAddress the IP address to report
+     */
+    public void setIpAddress(final String ipAddress) {
+        this.ipAddress = ipAddress;
+    }
+
+    /**
+     * Overrides the non-secure port.
+     *
+     * @param port the port to report
+     */
+    public void setPort(final int port) {
+        this.port = port;
+    }
+
+    /**
+     * Overrides the application name.
+     *
+     * @param applicationName the application name to report
+     */
+    public void setApplicationName(final String applicationName) {
+        this.applicationName = applicationName;
+    }
+}
diff --git
a/shenyu-discovery/shenyu-discovery-eureka/src/main/java/org/apache/shenyu/discovery/eureka/EurekaDiscoveryService.java
b/shenyu-discovery/shenyu-discovery-eureka/src/main/java/org/apache/shenyu/discovery/eureka/EurekaDiscoveryService.java
new file mode 100644
index 0000000000..8168aba548
--- /dev/null
+++
b/shenyu-discovery/shenyu-discovery-eureka/src/main/java/org/apache/shenyu/discovery/eureka/EurekaDiscoveryService.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.discovery.eureka;
+
+import com.google.gson.JsonObject;
+import org.apache.shenyu.common.utils.GsonUtils;
+import com.netflix.appinfo.ApplicationInfoManager;
+import com.netflix.appinfo.MyDataCenterInstanceConfig;
+import com.netflix.appinfo.InstanceInfo;
+import com.netflix.appinfo.EurekaInstanceConfig;
+import com.netflix.appinfo.providers.EurekaConfigBasedInstanceInfoProvider;
+import com.netflix.config.ConfigurationManager;
+import com.netflix.discovery.EurekaClient;
+import com.netflix.discovery.EurekaClientConfig;
+import com.netflix.discovery.DiscoveryClient;
+import com.netflix.discovery.DefaultEurekaClientConfig;
+import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
+import org.apache.shenyu.common.exception.ShenyuException;
+import org.apache.shenyu.discovery.api.listener.DiscoveryDataChangedEvent;
+import org.apache.shenyu.spi.Join;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.shenyu.discovery.api.ShenyuDiscoveryService;
+import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
+import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Eureka implementation of {@link ShenyuDiscoveryService}.
+ *
+ * <p>Watching is implemented by polling: every watched key gets a task on a scheduled
+ * executor that re-reads the instance list once per second and diffs it against the
+ * list seen on the previous run.
+ */
+@Join
+public class EurekaDiscoveryService implements ShenyuDiscoveryService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(EurekaDiscoveryService.class);
+
+    private ApplicationInfoManager applicationInfoManager;
+
+    private EurekaClient eurekaClient;
+
+    private final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, ShenyuThreadFactory.create("scheduled-eureka-watcher", true));
+
+    /** Polling task per watched key. */
+    private final ConcurrentMap<String, ScheduledFuture<?>> listenerThreadsMap = new ConcurrentHashMap<>();
+
+    /** Instance list seen on the most recent poll, per watched key. */
+    private final ConcurrentMap<String, List<InstanceInfo>> instanceListMap = new ConcurrentHashMap<>();
+
+    /**
+     * Creates the Eureka client from the given config. Does nothing if a client already exists.
+     *
+     * @param config discovery config; its server list and props feed the Eureka properties
+     * @throws ShenyuException if the client cannot be created
+     */
+    @Override
+    public void init(final DiscoveryConfig config) {
+        if (Objects.nonNull(eurekaClient)) {
+            return;
+        }
+        try {
+            ConfigurationManager.loadProperties(getEurekaProperties(config));
+            applicationInfoManager = initializeApplicationInfoManager(new MyDataCenterInstanceConfig());
+            eurekaClient = initializeEurekaClient(applicationInfoManager, new DefaultEurekaClientConfig());
+            LOGGER.info("Initializing EurekaDiscoveryService success");
+        } catch (Exception e) {
+            LOGGER.error("Error initializing EurekaDiscoveryService", e);
+            clean();
+            throw new ShenyuException(e);
+        }
+    }
+
+    /**
+     * Starts polling the instances of the application named {@code key} once per second and
+     * reports differences to {@code listener}. A key that is already watched is ignored.
+     *
+     * @param key      application name to watch
+     * @param listener receiver of ADDED / DELETED / UPDATED events
+     */
+    @Override
+    public void watch(final String key, final DataChangedEventListener listener) {
+        if (listenerThreadsMap.containsKey(key)) {
+            return;
+        }
+        instanceListMap.put(key, eurekaClient.getInstancesByVipAddressAndAppName(null, key, true));
+        ScheduledFuture<?> scheduledFuture = executorService.scheduleAtFixedRate(() -> {
+            try {
+                List<InstanceInfo> previousInstances = instanceListMap.get(key);
+                if (Objects.isNull(previousInstances)) {
+                    // The snapshot was cleared by shutdown(); nothing to diff against.
+                    return;
+                }
+                List<InstanceInfo> currentInstances = eurekaClient.getInstancesByVipAddressAndAppName(null, key, true);
+                compareInstances(previousInstances, currentInstances, listener);
+                instanceListMap.put(key, currentInstances);
+                // Runs every second per key, so keep it below INFO to avoid flooding the log.
+                LOGGER.debug("EurekaDiscoveryService watch key: {} success", key);
+            } catch (Exception e) {
+                // Do not rethrow: an exception escaping a scheduleAtFixedRate task suppresses
+                // every later execution, which would silently end the watch after one failure.
+                LOGGER.error("EurekaDiscoveryService watch key: {} error", key, e);
+            }
+        }, 0, 1, TimeUnit.SECONDS);
+        listenerThreadsMap.put(key, scheduledFuture);
+    }
+
+    /**
+     * Cancels the polling task for {@code key}, if one exists.
+     *
+     * @param key application name to stop watching
+     * @throws ShenyuException if cancelling fails
+     */
+    @Override
+    public void unwatch(final String key) {
+        try {
+            ScheduledFuture<?> scheduledFuture = listenerThreadsMap.get(key);
+            if (Objects.isNull(scheduledFuture)) {
+                return;
+            }
+            scheduledFuture.cancel(true);
+            listenerThreadsMap.remove(key);
+            LOGGER.info("EurekaDiscoveryService unwatch key {} successfully", key);
+        } catch (Exception e) {
+            LOGGER.error("Error removing eureka watch task for key '{}': {}", key, e.getMessage(), e);
+            throw new ShenyuException(e);
+        }
+    }
+
+    /**
+     * Registers an instance under the application name {@code key}.
+     *
+     * <p>The instance details are parsed from {@code value} as a JSON {@link InstanceInfo}.
+     * A new {@link ApplicationInfoManager} and {@link DiscoveryClient} are created for it and
+     * replace the ones held by this service.
+     *
+     * @param key   application name to register under
+     * @param value JSON representation of the instance
+     */
+    @Override
+    public void register(final String key, final String value) {
+        InstanceInfo instanceInfoFromJson = GsonUtils.getInstance().fromJson(value, InstanceInfo.class);
+        CustomedEurekaConfig customedEurekaConfig = new CustomedEurekaConfig();
+        // NOTE(review): the VIP address is used as the IP address here - confirm this is intended.
+        customedEurekaConfig.setIpAddress(instanceInfoFromJson.getVIPAddress());
+        customedEurekaConfig.setPort(instanceInfoFromJson.getPort());
+        customedEurekaConfig.setApplicationName(key);
+        customedEurekaConfig.setInstanceId(instanceInfoFromJson.getInstanceId());
+        InstanceInfo instanceInfo = new EurekaConfigBasedInstanceInfoProvider(customedEurekaConfig).get();
+        // NOTE(review): the previously held client is replaced without being shut down - confirm
+        // whether it should be closed first, and whether repeated register() calls are expected.
+        applicationInfoManager = new ApplicationInfoManager(customedEurekaConfig, instanceInfo);
+        eurekaClient = new DiscoveryClient(applicationInfoManager, new DefaultEurekaClientConfig());
+        applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.UP);
+    }
+
+    /**
+     * Returns the JSON payload of every instance of the application named {@code key}.
+     *
+     * @param key application name
+     * @return one JSON string per instance; empty if there are none
+     * @throws ShenyuException if the lookup fails
+     */
+    @Override
+    public List<String> getRegisterData(final String key) {
+        try {
+            return eurekaClient.getInstancesByVipAddressAndAppName(null, key, true).stream()
+                    .map(this::buildInstanceInfoJson)
+                    .collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new ShenyuException(e);
+        }
+    }
+
+    /**
+     * Tells whether at least one instance is registered for the virtual host name {@code key}.
+     *
+     * @param key virtual host name to look up (non-secure)
+     * @return {@code true} if at least one instance matches, {@code false} otherwise
+     * @throws ShenyuException if the lookup itself fails
+     */
+    @Override
+    public Boolean exists(final String key) {
+        try {
+            // getNextServerFromEureka throws when nothing matches, so it can never yield
+            // "false"; list the matches instead and test for emptiness.
+            List<InstanceInfo> instances = eurekaClient.getInstancesByVipAddress(key, false);
+            return Objects.nonNull(instances) && !instances.isEmpty();
+        } catch (Exception e) {
+            throw new ShenyuException(e);
+        }
+    }
+
+    /**
+     * Cancels all polling tasks, shuts down the executor and the Eureka client, and drops
+     * the client references.
+     *
+     * <p>NOTE(review): the executor is a final field and is not recreated, so watch() calls
+     * made after shutdown() will be rejected - confirm the service is never reused.
+     *
+     * @throws ShenyuException if shutting down fails
+     */
+    @Override
+    public void shutdown() {
+        try {
+            for (ScheduledFuture<?> scheduledFuture : listenerThreadsMap.values()) {
+                scheduledFuture.cancel(true);
+            }
+            executorService.shutdown();
+            listenerThreadsMap.clear();
+            instanceListMap.clear();
+            if (Objects.nonNull(eurekaClient)) {
+                eurekaClient.shutdown();
+            }
+            LOGGER.info("Shutting down EurekaDiscoveryService");
+        } catch (Exception e) {
+            LOGGER.error("Shutting down EurekaDiscoveryService", e);
+            throw new ShenyuException(e);
+        } finally {
+            // Drop the references even when shutdown failed part-way through.
+            clean();
+        }
+    }
+
+    /**
+     * Builds the properties handed to the Eureka configuration manager.
+     *
+     * @param config discovery config supplying the server list and the refresh interval
+     * @return the Eureka properties
+     */
+    private Properties getEurekaProperties(final DiscoveryConfig config) {
+        Properties eurekaProperties = new Properties();
+        eurekaProperties.setProperty("eureka.client.service-url.defaultZone", config.getServerList());
+        eurekaProperties.setProperty("eureka.serviceUrl.default", config.getServerList());
+        eurekaProperties.setProperty("eureka.client.refresh.interval", config.getProps().getProperty("eureka.client.refresh.interval", "15"));
+
+        return eurekaProperties;
+    }
+
+    /**
+     * Returns the held {@link ApplicationInfoManager}, creating it from {@code instanceConfig}
+     * on first use.
+     */
+    private ApplicationInfoManager initializeApplicationInfoManager(final EurekaInstanceConfig instanceConfig) {
+        if (Objects.isNull(applicationInfoManager)) {
+            InstanceInfo instanceInfo = new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get();
+            applicationInfoManager = new ApplicationInfoManager(instanceConfig, instanceInfo);
+        }
+
+        return applicationInfoManager;
+    }
+
+    /**
+     * Returns the held {@link EurekaClient}, creating it on first use.
+     */
+    private EurekaClient initializeEurekaClient(final ApplicationInfoManager applicationInfoManager, final EurekaClientConfig clientConfig) {
+        if (Objects.isNull(eurekaClient)) {
+            eurekaClient = new DiscoveryClient(applicationInfoManager, clientConfig);
+        }
+
+        return eurekaClient;
+    }
+
+    /**
+     * Drops the client and manager references so that init() can build new ones.
+     */
+    private void clean() {
+        eurekaClient = null;
+        applicationInfoManager = null;
+    }
+
+    /**
+     * Renders an instance as the JSON payload carried by discovery events.
+     *
+     * <p>The payload has {@code url} (ip:port), {@code weight} (from the instance metadata)
+     * and {@code status} (0 for UP, 1 for DOWN; omitted for any other status).
+     */
+    private String buildInstanceInfoJson(final InstanceInfo instanceInfo) {
+        JsonObject instanceJson = new JsonObject();
+        instanceJson.addProperty("url", instanceInfo.getIPAddr() + ":" + instanceInfo.getPort());
+        instanceJson.addProperty("weight", instanceInfo.getMetadata().get("weight"));
+        if (instanceInfo.getStatus() == InstanceInfo.InstanceStatus.UP) {
+            instanceJson.addProperty("status", 0);
+        } else if (instanceInfo.getStatus() == InstanceInfo.InstanceStatus.DOWN) {
+            instanceJson.addProperty("status", 1);
+        }
+        return GsonUtils.getInstance().toJson(instanceJson);
+    }
+
+    /**
+     * Diffs two polls of the same application and notifies the listener: ADDED for instances
+     * only in the current list, DELETED (marked DOWN) for instances only in the previous
+     * list, and UPDATED for instances in both whose event payload changed.
+     */
+    private void compareInstances(final List<InstanceInfo> previousInstances, final List<InstanceInfo> currentInstances, final DataChangedEventListener listener) {
+        Set<InstanceInfo> addedInstances = currentInstances.stream()
+                .filter(item -> !previousInstances.contains(item))
+                .collect(Collectors.toSet());
+        for (InstanceInfo instance : addedInstances) {
+            publish(instance, DiscoveryDataChangedEvent.Event.ADDED, listener);
+        }
+
+        Set<InstanceInfo> deletedInstances = previousInstances.stream()
+                .filter(item -> !currentInstances.contains(item))
+                .collect(Collectors.toSet());
+        for (InstanceInfo instance : deletedInstances) {
+            instance.setStatus(InstanceInfo.InstanceStatus.DOWN);
+            publish(instance, DiscoveryDataChangedEvent.Event.DELETED, listener);
+        }
+
+        // InstanceInfo.equals() compares only the instance id, so "same id but not equal"
+        // can never be true; compare the rendered payloads to detect real changes.
+        Set<InstanceInfo> updatedInstances = currentInstances.stream()
+                .filter(currentInstance -> previousInstances.stream()
+                        .anyMatch(previousInstance -> Objects.equals(currentInstance.getId(), previousInstance.getId())
+                                && !buildInstanceInfoJson(currentInstance).equals(buildInstanceInfoJson(previousInstance))))
+                .collect(Collectors.toSet());
+        for (InstanceInfo instance : updatedInstances) {
+            publish(instance, DiscoveryDataChangedEvent.Event.UPDATED, listener);
+        }
+    }
+
+    /**
+     * Sends one change event for {@code instance}, keyed by its application name.
+     */
+    private void publish(final InstanceInfo instance, final DiscoveryDataChangedEvent.Event event, final DataChangedEventListener listener) {
+        listener.onChange(new DiscoveryDataChangedEvent(instance.getAppName(), buildInstanceInfoJson(instance), event));
+    }
+}
diff --git
a/shenyu-discovery/shenyu-discovery-eureka/src/main/resources/META-INF.shenyu/org.apache.shenyu.discovery.api.ShenyuDiscoveryService
b/shenyu-discovery/shenyu-discovery-eureka/src/main/resources/META-INF.shenyu/org.apache.shenyu.discovery.api.ShenyuDiscoveryService
new file mode 100644
index 0000000000..3e16a6ef05
--- /dev/null
+++
b/shenyu-discovery/shenyu-discovery-eureka/src/main/resources/META-INF.shenyu/org.apache.shenyu.discovery.api.ShenyuDiscoveryService
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+eureka=org.apache.shenyu.discovery.eureka.EurekaDiscoveryService
diff --git a/shenyu-discovery/shenyu-discovery-zookeeper/pom.xml
b/shenyu-discovery/shenyu-discovery-nacos/pom.xml
similarity index 74%
copy from shenyu-discovery/shenyu-discovery-zookeeper/pom.xml
copy to shenyu-discovery/shenyu-discovery-nacos/pom.xml
index 80207d09ca..f0e667286a 100644
--- a/shenyu-discovery/shenyu-discovery-zookeeper/pom.xml
+++ b/shenyu-discovery/shenyu-discovery-nacos/pom.xml
@@ -16,35 +16,33 @@
~ limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-discovery</artifactId>
<version>2.6.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>shenyu-discovery-zookeeper</artifactId>
-
+ <artifactId>shenyu-discovery-nacos</artifactId>
+
<dependencies>
<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-discovery-api</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-common</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.alibaba.nacos</groupId>
+ <artifactId>nacos-client</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/shenyu-discovery/shenyu-discovery-nacos/src/main/java/org/apache/shenyu/discovery/nacos/NacosDiscoveryService.java
b/shenyu-discovery/shenyu-discovery-nacos/src/main/java/org/apache/shenyu/discovery/nacos/NacosDiscoveryService.java
new file mode 100644
index 0000000000..3708b1809c
--- /dev/null
+++
b/shenyu-discovery/shenyu-discovery-nacos/src/main/java/org/apache/shenyu/discovery/nacos/NacosDiscoveryService.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.discovery.nacos;
+
+import com.google.gson.JsonObject;
+import org.apache.shenyu.common.utils.GsonUtils;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.exception.NacosException;
+import com.alibaba.nacos.api.naming.NamingFactory;
+import com.alibaba.nacos.api.naming.listener.EventListener;
+import com.alibaba.nacos.api.naming.NamingService;
+import com.alibaba.nacos.api.naming.listener.NamingEvent;
+import com.alibaba.nacos.api.naming.pojo.Instance;
+import org.apache.shenyu.common.exception.ShenyuException;
+import org.apache.shenyu.discovery.api.ShenyuDiscoveryService;
+import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
+import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
+import org.apache.shenyu.discovery.api.listener.DiscoveryDataChangedEvent;
+import org.apache.shenyu.spi.Join;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+/**
+ * The type Nacos for shenyu discovery service.
+ */
+@Join
+public class NacosDiscoveryService implements ShenyuDiscoveryService {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(NacosDiscoveryService.class);
+
+ private static final String NAMESPACE = "nacosNameSpace";
+
+ private final ConcurrentMap<String, EventListener> listenerMap = new
ConcurrentHashMap<>();
+
+ private NamingService namingService;
+
+ private String groupName;
+
+ private final ConcurrentMap<String, List<Instance>> instanceListMap = new
ConcurrentHashMap<>();
+
+ @Override
+ public void init(final DiscoveryConfig config) {
+ Properties properties = config.getProps();
+ Properties nacosProperties = new Properties();
+ this.groupName = properties.getProperty("groupName", "SHENYU_GROUP");
+ String serverAddr = config.getServerList();
+ nacosProperties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
+ nacosProperties.put(PropertyKeyConst.NAMESPACE,
properties.getProperty(NAMESPACE, ""));
+ nacosProperties.put(PropertyKeyConst.USERNAME,
properties.getProperty(PropertyKeyConst.USERNAME, ""));
+ nacosProperties.put(PropertyKeyConst.PASSWORD,
properties.getProperty(PropertyKeyConst.PASSWORD, ""));
+ nacosProperties.put(PropertyKeyConst.ACCESS_KEY,
properties.getProperty(PropertyKeyConst.ACCESS_KEY, ""));
+ nacosProperties.put(PropertyKeyConst.SECRET_KEY,
properties.getProperty(PropertyKeyConst.SECRET_KEY, ""));
+ try {
+ this.namingService =
NamingFactory.createNamingService(nacosProperties);
+ LOGGER.info("Nacos naming service initialized success");
+ } catch (NacosException e) {
+ LOGGER.error("Error initializing Nacos naming service", e);
+ throw new ShenyuException(e);
+ }
+ }
+
+ @Override
+ public void watch(final String key, final DataChangedEventListener
listener) {
+ try {
+ if (!listenerMap.containsKey(key)) {
+ List<Instance> initialInstances =
namingService.selectInstances(key, groupName, true);
+ instanceListMap.put(key, initialInstances);
+ EventListener nacosListener = event -> {
+ if (event instanceof NamingEvent) {
+ try {
+ List<Instance> previousInstances =
instanceListMap.get(key);
+ List<Instance> currentInstances =
namingService.selectInstances(key, groupName, true);
+ compareInstances(previousInstances,
currentInstances, listener);
+ instanceListMap.put(key, currentInstances);
+ } catch (NacosException e) {
+ throw new ShenyuException(e);
+ }
+ }
+ };
+ namingService.subscribe(key, groupName, nacosListener);
+ listenerMap.put(key, nacosListener);
+ LOGGER.info("Subscribed to Nacos updates for key: {}", key);
+ }
+ } catch (NacosException e) {
+ LOGGER.error("nacosDiscoveryService error watching key: {}", key,
e);
+ throw new ShenyuException(e);
+ }
+ }
+
+ @Override
+ public void unwatch(final String key) {
+ try {
+ EventListener nacosListener = listenerMap.get(key);
+ if (Objects.nonNull(nacosListener)) {
+ namingService.unsubscribe(key, groupName, nacosListener);
+ listenerMap.remove(key);
+ LOGGER.info("Nacos Unwatch key: {}", key);
+ }
+ } catch (NacosException e) {
+ LOGGER.error("Error removing Nacos service listener: {}",
e.getMessage(), e);
+ throw new ShenyuException(e);
+ }
+ }
+
+ @Override
+ public void register(final String key, final String value) {
+ try {
+ Instance instance = GsonUtils.getInstance().fromJson(value,
Instance.class);
+ namingService.registerInstance(key, groupName, instance);
+ LOGGER.info("Registering service with key: {} and value: {}", key,
value);
+ } catch (NacosException e) {
+ LOGGER.error("Error registering Nacos service instance: {}",
e.getMessage(), e);
+ throw new ShenyuException(e);
+ }
+ }
+
+ @Override
+ public List<String> getRegisterData(final String key) {
+ try {
+ List<Instance> instances = namingService.selectInstances(key,
groupName, true);
+ List<String> registerData = new ArrayList<>();
+ for (Instance instance : instances) {
+ String data = buildInstanceInfoJson(instance);
+ registerData.add(data);
+ }
+ return registerData;
+ } catch (NacosException e) {
+ LOGGER.error("Error getting Nacos service instances: {}",
e.getMessage(), e);
+ throw new ShenyuException(e);
+ }
+ }
+
+ @Override
+ public Boolean exists(final String key) {
+ try {
+ List<Instance> instances = namingService.selectInstances(key,
groupName, true);
+ return !instances.isEmpty();
+ } catch (NacosException e) {
+ LOGGER.error("Error checking Nacos service existence: {}",
e.getMessage(), e);
+ throw new ShenyuException(e);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ try {
+ if (Objects.nonNull(namingService)) {
+ for (Map.Entry<String, EventListener> entry :
listenerMap.entrySet()) {
+ String key = entry.getKey();
+ EventListener listener = entry.getValue();
+ namingService.unsubscribe(key, groupName, listener);
+ }
+ listenerMap.clear();
+ namingService.shutDown();
+ LOGGER.info("Shutting down NacosDiscoveryService");
+ }
+ } catch (NacosException e) {
+ LOGGER.error("Error shutting down NacosDiscoveryService", e);
+ throw new ShenyuException(e);
+ }
+ }
+
+ private void compareInstances(final List<Instance> previousInstances,
final List<Instance> currentInstances, final DataChangedEventListener listener)
{
+ Set<Instance> addedInstances = currentInstances.stream()
+ .filter(item -> !previousInstances.contains(item))
+ .collect(Collectors.toSet());
+ if (!addedInstances.isEmpty()) {
+ for (Instance instance: addedInstances) {
+ DiscoveryDataChangedEvent dataChangedEvent = new
DiscoveryDataChangedEvent(instance.getServiceName(),
+ buildInstanceInfoJson(instance),
DiscoveryDataChangedEvent.Event.ADDED);
+ listener.onChange(dataChangedEvent);
+ }
+ }
+
+ Set<Instance> deletedInstances = previousInstances.stream()
+ .filter(item -> !currentInstances.contains(item))
+ .collect(Collectors.toSet());
+ if (!deletedInstances.isEmpty()) {
+ for (Instance instance: deletedInstances) {
+ instance.setHealthy(false);
+ DiscoveryDataChangedEvent dataChangedEvent = new
DiscoveryDataChangedEvent(instance.getServiceName(),
+ buildInstanceInfoJson(instance),
DiscoveryDataChangedEvent.Event.DELETED);
+ listener.onChange(dataChangedEvent);
+ }
+ }
+
+ Set<Instance> updatedInstances = currentInstances.stream()
+ .filter(currentInstance -> previousInstances.stream()
+ .anyMatch(previousInstance ->
currentInstance.getInstanceId().equals(previousInstance.getInstanceId()) &&
!currentInstance.equals(previousInstance)))
+ .collect(Collectors.toSet());
+ if (!updatedInstances.isEmpty()) {
+ for (Instance instance: updatedInstances) {
+ DiscoveryDataChangedEvent dataChangedEvent = new
DiscoveryDataChangedEvent(instance.getServiceName(),
+ buildInstanceInfoJson(instance),
DiscoveryDataChangedEvent.Event.UPDATED);
+ listener.onChange(dataChangedEvent);
+ }
+ }
+ }
+
+ private String buildInstanceInfoJson(final Instance instance) {
+ JsonObject instanceJson = new JsonObject();
+ instanceJson.addProperty("url", instance.getIp() + ":" +
instance.getPort());
+ // status 0:true, 1:false
+ instanceJson.addProperty("status", instance.isHealthy() ? 0 : 1);
+ instanceJson.addProperty("weight", instance.getWeight());
+
+ return GsonUtils.getInstance().toJson(instanceJson);
+ }
+}
+
+
+
+
+
diff --git
a/shenyu-discovery/shenyu-discovery-nacos/src/main/resources/META-INF/shenyu/org.apache.shenyu.discovery.api.ShenyuDiscoveryService
b/shenyu-discovery/shenyu-discovery-nacos/src/main/resources/META-INF/shenyu/org.apache.shenyu.discovery.api.ShenyuDiscoveryService
new file mode 100644
index 0000000000..2e05f227df
--- /dev/null
+++
b/shenyu-discovery/shenyu-discovery-nacos/src/main/resources/META-INF/shenyu/org.apache.shenyu.discovery.api.ShenyuDiscoveryService
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+nacos=org.apache.shenyu.discovery.nacos.NacosDiscoveryService
diff --git a/shenyu-discovery/shenyu-discovery-zookeeper/pom.xml
b/shenyu-discovery/shenyu-discovery-zookeeper/pom.xml
index 80207d09ca..0674ada09c 100644
--- a/shenyu-discovery/shenyu-discovery-zookeeper/pom.xml
+++ b/shenyu-discovery/shenyu-discovery-zookeeper/pom.xml
@@ -16,7 +16,9 @@
~ limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.shenyu</groupId>
<artifactId>shenyu-discovery</artifactId>
diff --git
a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
b/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
index 076ffa2cc3..b271bac69c 100644
---
a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
+++
b/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
@@ -85,7 +85,7 @@ public class ZookeeperDiscoveryService implements
ShenyuDiscoveryService {
this.client.getConnectionStateListenable().addListener((c, newState)
-> {
if (newState == ConnectionState.RECONNECTED) {
nodeDataMap.forEach((k, v) -> {
- if (!this.exits(k)) {
+ if (!this.exists(k)) {
this.createOrUpdate(k, v, CreateMode.EPHEMERAL);
LOGGER.info("zookeeper client register instance
success: key={}|value={}", k, v);
}
@@ -103,7 +103,7 @@ public class ZookeeperDiscoveryService implements
ShenyuDiscoveryService {
}
@Override
- public Boolean exits(final String key) {
+ public Boolean exists(final String key) {
try {
return null != client.checkExists().forPath(key);
} catch (Exception e) {
@@ -121,7 +121,7 @@ public class ZookeeperDiscoveryService implements
ShenyuDiscoveryService {
}
@Override
- public void watcher(final String key, final DataChangedEventListener
listener) {
+ public void watch(final String key, final DataChangedEventListener
listener) {
try {
TreeCache treeCache = new TreeCache(client, key);
TreeCacheListener treeCacheListener = (curatorFramework, event) ->
{
@@ -163,7 +163,7 @@ public class ZookeeperDiscoveryService implements
ShenyuDiscoveryService {
}
@Override
- public void unWatcher(final String key) {
+ public void unwatch(final String key) {
if (cacheMap.containsKey(key)) {
cacheMap.remove(key).close();
}