This is an automated email from the ASF dual-hosted git repository.
gaohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 7909785cd [ISSUE #3807] Consolidate etcd dependencies into one
7909785cd is described below
commit 7909785cd4bb3f27cb355aa3144ea14e337e6582
Author: huanccwang <[email protected]>
AuthorDate: Mon Aug 15 22:36:09 2022 +0800
[ISSUE #3807] Consolidate etcd dependencies into one
* Consolidate etcd dependencies into one #3807
* Consolidate etcd dependencies into one #3807
---
pom.xml | 7 +-
shenyu-admin/pom.xml | 1 -
.../shenyu-register-client-server-etcd/pom.xml | 32 +-
.../client/server/etcd/client/EtcdClient.java | 459 +++++++++++----------
.../shenyu-register-client-etcd/pom.xml | 1 -
.../shenyu-register-instance-etcd/pom.xml | 1 -
.../shenyu-sync-data-etcd/pom.xml | 15 +-
7 files changed, 292 insertions(+), 224 deletions(-)
diff --git a/pom.xml b/pom.xml
index 6a73fe6f2..69b2b09b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,7 +117,7 @@
<spring-boot.version>2.6.8</spring-boot.version>
<aspectjweaver.version>1.9.6</aspectjweaver.version>
<h2.version>1.4.200</h2.version>
- <jetcd-core.version>0.5.0</jetcd-core.version>
+ <jetcd-core.version>0.7.3</jetcd-core.version>
<java-websocket.version>1.5.0</java-websocket.version>
<mockito.version>3.5.15</mockito.version>
<awaitility.version>4.0.3</awaitility.version>
@@ -474,6 +474,11 @@
<version>${jakarta.json-api.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-core</artifactId>
+ <version>${jetcd-core.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/shenyu-admin/pom.xml b/shenyu-admin/pom.xml
index 01b596b17..663b290da 100644
--- a/shenyu-admin/pom.xml
+++ b/shenyu-admin/pom.xml
@@ -163,7 +163,6 @@
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
- <version>${jetcd-core.version}</version>
</dependency>
<dependency>
diff --git a/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/pom.xml b/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/pom.xml
index 0b5154afa..bf1527ae9 100644
--- a/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/pom.xml
+++ b/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/pom.xml
@@ -27,9 +27,22 @@
<dependencies>
<dependency>
- <groupId>com.coreos</groupId>
+ <groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
- <version>${jetcd.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>grpc-netty</artifactId>
+ <groupId>io.grpc</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>grpc-protobuf</artifactId>
+ <groupId>io.grpc</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>grpc-stub</artifactId>
+ <groupId>io.grpc</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.shenyu</groupId>
@@ -48,5 +61,20 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <artifactId>grpc-protobuf</artifactId>
+ <groupId>io.grpc</groupId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <artifactId>grpc-stub</artifactId>
+ <groupId>io.grpc</groupId>
+ <version>${grpc.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/src/main/java/org/apache/shenyu/register/client/server/etcd/client/EtcdClient.java b/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/src/main/java/org/apache/shenyu/register/client/server/etcd/client/EtcdClient.java
index 7c450b167..55194d0a4 100644
--- a/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/src/main/java/org/apache/shenyu/register/client/server/etcd/client/EtcdClient.java
+++ b/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/src/main/java/org/apache/shenyu/register/client/server/etcd/client/EtcdClient.java
@@ -1,217 +1,242 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shenyu.register.client.server.etcd.client;
-
-import com.coreos.jetcd.Client;
-import com.coreos.jetcd.KV;
-import com.coreos.jetcd.Lease;
-import com.coreos.jetcd.Watch;
-import com.coreos.jetcd.data.ByteSequence;
-import com.coreos.jetcd.data.KeyValue;
-import com.coreos.jetcd.kv.GetResponse;
-import com.coreos.jetcd.lease.LeaseGrantResponse;
-import com.coreos.jetcd.options.GetOption;
-import com.coreos.jetcd.options.WatchOption;
-import com.coreos.jetcd.watch.WatchResponse;
-import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-/**
- * etcd client.
- */
-public class EtcdClient {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(EtcdClient.class);
-
- private static final int EPHEMERAL_LEASE = 60;
-
- private static final int DEFAULT_CORE_POOL_SIZE = 10;
-
- private static final int DEFAULT_QUEUE_SIZE = 1000;
-
- private final ThreadPoolExecutor defaultPoolExecutor;
-
- private final Client client;
-
- public EtcdClient(final String urls) {
- defaultPoolExecutor = new ThreadPoolExecutor(
- DEFAULT_CORE_POOL_SIZE, DEFAULT_CORE_POOL_SIZE * 2,
- 0L, TimeUnit.NANOSECONDS,
- new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE),
- ShenyuThreadFactory.create("etcd register center watch-",
true));
-
- this.client = Client.builder().endpoints(urls.split(",")).build();
-
- try {
- initLease();
- } catch (ExecutionException | InterruptedException e) {
- LOGGER.error("initLease error.", e);
- }
- }
-
- private void initLease() throws ExecutionException, InterruptedException {
- Lease lease = client.getLeaseClient();
- LeaseGrantResponse response = lease.grant(EPHEMERAL_LEASE).get();
- long leaseId = response.getID();
- lease.keepAlive(leaseId);
- }
-
- /**
- * read data.
- *
- * @param key key
- * @return string of data
- */
- public String read(final String key) {
- KV kv = client.getKVClient();
- ByteSequence storeKey =
Optional.ofNullable(key).map(ByteSequence::fromString).orElse(null);
- GetResponse response = null;
-
- try {
- response = kv.get(storeKey).get();
- } catch (InterruptedException | ExecutionException e) {
- LOGGER.error("read(key:{}) error.", key, e);
- }
-
- if (Objects.isNull(response)) {
- return null;
- }
-
- LOGGER.debug(String.valueOf(response.getHeader()));
- Node info =
response.getKvs().stream().map(EtcdClient::kv2NodeInfo).findFirst().orElse(null);
- assert info != null;
- return info.getValue();
- }
-
- /**
- * get children of path.
- *
- * @param path path
- * @return list of children
- */
- public List<String> getChildren(final String path) {
- try {
- return listKeys(path);
- } catch (ExecutionException | InterruptedException e) {
- LOGGER.error("getChildren(path:{}) error.", path, e);
- }
- return null;
- }
-
- private List<String> listKeys(final String prefix) throws
ExecutionException, InterruptedException {
- KV kv = client.getKVClient();
- ByteSequence storePrefix =
Optional.ofNullable(prefix).map(ByteSequence::fromString).orElse(null);
- GetOption option =
GetOption.newBuilder().withKeysOnly(true).withPrefix(storePrefix).build();
- GetResponse response = kv.get(storePrefix, option).get();
- return response.getKvs().stream()
- .map(o -> o.getKey().toStringUtf8())
- .filter(k -> !k.equals(prefix))
- .collect(Collectors.toList());
- }
-
- /**
- * subscribe children change.
- *
- * @param key key
- * @param handler event handler
- */
- public void subscribeChildChanges(final String key, final
EtcdListenHandler handler) {
- defaultPoolExecutor.execute(() -> {
- final Stoppable stoppable = new Stoppable();
- try {
- watchChildren(key, stoppable, handler);
- } catch (Exception e) {
- stoppable.stop();
- LOGGER.warn(String.format("Watch exception of %s", "/s"), e);
- }
- });
- }
-
- private void watchChildren(final String key, final Supplier<Boolean>
exitSignSupplier,
- final BiConsumer<Event, Node> consumer) throws
InterruptedException {
- ByteSequence storeKey =
Optional.ofNullable(key).map(ByteSequence::fromString).orElse(null);
-
- try (Watch watch = client.getWatchClient();
- Watch.Watcher watcher = watch.watch(storeKey,
- WatchOption.newBuilder().withPrefix(storeKey).build())) {
- while (!exitSignSupplier.get()) {
- WatchResponse response = watcher.listen();
- response.getEvents().forEach(watchEvent -> {
- KeyValue keyValue = watchEvent.getKeyValue();
- Node info = kv2NodeInfo(keyValue);
- // skip root node change
- if (watchEvent.getKeyValue().getKey().equals(storeKey)) {
- return;
- }
- Event event;
- switch (watchEvent.getEventType()) {
- case PUT:
- event = Event.UPDATE;
- break;
- case DELETE:
- event = Event.DELETE;
- break;
- default:
- event = Event.UNRECOGNIZED;
- }
- consumer.accept(event, info);
- });
- }
- }
- }
-
- static Node kv2NodeInfo(final KeyValue kv) {
- String key = kv.getKey().toStringUtf8();
- String value =
Optional.ofNullable(kv.getValue()).map(ByteSequence::toStringUtf8).orElse("");
- return new Node(key, value, kv.getCreateRevision(),
kv.getModRevision(), kv.getVersion());
- }
-
- /**
- * close client.
- */
- public void close() {
- Optional.ofNullable(client).ifPresent(Client::close);
- }
-
- static class Stoppable implements Supplier<Boolean> {
-
- private boolean exit;
-
- @Override
- public Boolean get() {
- return exit;
- }
-
- void stop() {
- this.exit = true;
- }
-
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.register.client.server.etcd.client;
+
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.KV;
+import io.etcd.jetcd.KeyValue;
+import io.etcd.jetcd.Lease;
+import io.etcd.jetcd.Watch;
+import io.etcd.jetcd.kv.GetResponse;
+import io.etcd.jetcd.lease.LeaseGrantResponse;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.etcd.jetcd.options.GetOption;
+import io.etcd.jetcd.options.WatchOption;
+import io.etcd.jetcd.watch.WatchEvent;
+import io.grpc.stub.StreamObserver;
+import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * etcd client.
+ */
+public class EtcdClient {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(EtcdClient.class);
+
+ private static final int EPHEMERAL_LEASE = 60;
+
+ private static final int DEFAULT_CORE_POOL_SIZE = 10;
+
+ private static final int DEFAULT_QUEUE_SIZE = 1000;
+
+ private final ThreadPoolExecutor defaultPoolExecutor;
+
+ private final Client client;
+
+ public EtcdClient(final String urls) {
+ defaultPoolExecutor = new ThreadPoolExecutor(
+ DEFAULT_CORE_POOL_SIZE, DEFAULT_CORE_POOL_SIZE * 2,
+ 0L, TimeUnit.NANOSECONDS,
+ new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE),
+ ShenyuThreadFactory.create("etcd register center watch-",
true));
+
+ this.client = Client.builder().endpoints(urls.split(",")).build();
+
+ try {
+ initLease();
+ } catch (ExecutionException | InterruptedException e) {
+ LOGGER.error("initLease error.", e);
+ }
+ }
+
+ private void initLease() throws ExecutionException, InterruptedException {
+ Lease lease = client.getLeaseClient();
+ LeaseGrantResponse response = lease.grant(EPHEMERAL_LEASE).get();
+ long leaseId = response.getID();
+ lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
+ @Override
+ public void onNext(final LeaseKeepAliveResponse
leaseKeepAliveResponse) {
+
+ }
+
+ @Override
+ public void onError(final Throwable throwable) {
+
+ LOGGER.error("keep alive error", throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+
+ }
+ });
+ }
+
+ /**
+ * read data.
+ *
+ * @param key key
+ * @return string of data
+ */
+ public String read(final String key) {
+ KV kv = client.getKVClient();
+ ByteSequence storeKey = ByteSequence.from(key, StandardCharsets.UTF_8);
+ GetResponse response = null;
+
+ try {
+ response = kv.get(storeKey).get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.error("read(key:{}) error.", key, e);
+ }
+
+ if (Objects.isNull(response)) {
+ return null;
+ }
+
+ LOGGER.debug(String.valueOf(response.getHeader()));
+ Node info =
response.getKvs().stream().map(EtcdClient::kv2NodeInfo).findFirst().orElse(null);
+ assert info != null;
+ return info.getValue();
+ }
+
+ /**
+ * get children of path.
+ *
+ * @param path path
+ * @return list of children
+ */
+ public List<String> getChildren(final String path) {
+ try {
+ return listKeys(path);
+ } catch (ExecutionException | InterruptedException e) {
+ LOGGER.error("getChildren(path:{}) error.", path, e);
+ }
+ return null;
+ }
+
+ private List<String> listKeys(final String prefix) throws
ExecutionException, InterruptedException {
+ KV kv = client.getKVClient();
+ ByteSequence storePrefix = ByteSequence.from(prefix,
StandardCharsets.UTF_8);
+ GetOption option =
GetOption.newBuilder().withKeysOnly(true).withPrefix(storePrefix).build();
+ GetResponse response = kv.get(storePrefix, option).get();
+ return response.getKvs().stream()
+ .map(o -> o.getKey().toString())
+ .filter(k -> !k.equals(prefix))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * subscribe children change.
+ *
+ * @param key key
+ * @param handler event handler
+ */
+ public void subscribeChildChanges(final String key, final
EtcdListenHandler handler) {
+ defaultPoolExecutor.execute(() -> {
+ final Stoppable stoppable = new Stoppable();
+ try {
+ watchChildren(key, stoppable, handler);
+ } catch (Exception e) {
+ stoppable.stop();
+ LOGGER.warn(String.format("Watch exception of %s", "/s"), e);
+ }
+ });
+ }
+
+ private void watchChildren(final String key, final Supplier<Boolean>
exitSignSupplier,
+ final BiConsumer<Event, Node> consumer) throws
InterruptedException {
+ ByteSequence storeKey = ByteSequence.from(key, StandardCharsets.UTF_8);
+ Watch.Listener listener = watch(exitSignSupplier, storeKey, consumer);
+ WatchOption option = WatchOption.newBuilder()
+ .withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8))
+ .build();
+ Watch.Watcher watch =
client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8),
option, listener);
+ watch.close();
+ }
+
+ private Watch.Listener watch(final Supplier<Boolean> exitSignSupplier,
final ByteSequence storeKey,
+ final BiConsumer<Event, Node> consumer) {
+ return Watch.listener(response -> {
+ while (!exitSignSupplier.get()) {
+ for (WatchEvent watchEvent : response.getEvents()) {
+ KeyValue keyValue = watchEvent.getKeyValue();
+ Node info = kv2NodeInfo(keyValue);
+ // skip root node change
+ if (watchEvent.getKeyValue().getKey().equals(storeKey)) {
+ return;
+ }
+ Event event;
+ switch (watchEvent.getEventType()) {
+ case PUT:
+ event = Event.UPDATE;
+ break;
+ case DELETE:
+ event = Event.DELETE;
+ break;
+ default:
+ event = Event.UNRECOGNIZED;
+ }
+ consumer.accept(event, info);
+ }
+ }
+ });
+ }
+
+ static Node kv2NodeInfo(final KeyValue kv) {
+ String key = kv.getKey().toString();
+ String value =
Optional.ofNullable(kv.getValue()).map(ByteSequence::toString).orElse("");
+ return new Node(key, value, kv.getCreateRevision(),
kv.getModRevision(), kv.getVersion());
+ }
+
+ /**
+ * close client.
+ */
+ public void close() {
+ Optional.ofNullable(client).ifPresent(Client::close);
+ }
+
+ static class Stoppable implements Supplier<Boolean> {
+
+ private boolean exit;
+
+ @Override
+ public Boolean get() {
+ return exit;
+ }
+
+ void stop() {
+ this.exit = true;
+ }
+
+ }
+}
diff --git a/shenyu-register-center/shenyu-register-client/shenyu-register-client-etcd/pom.xml b/shenyu-register-center/shenyu-register-client/shenyu-register-client-etcd/pom.xml
index 579752a66..b71257d8a 100644
--- a/shenyu-register-center/shenyu-register-client/shenyu-register-client-etcd/pom.xml
+++ b/shenyu-register-center/shenyu-register-client/shenyu-register-client-etcd/pom.xml
@@ -30,7 +30,6 @@
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
- <version>0.5.0</version>
<exclusions>
<exclusion>
<artifactId>grpc-protobuf</artifactId>
diff --git a/shenyu-register-center/shenyu-register-instance/shenyu-register-instance-etcd/pom.xml b/shenyu-register-center/shenyu-register-instance/shenyu-register-instance-etcd/pom.xml
index db57807d0..10ea212ae 100644
--- a/shenyu-register-center/shenyu-register-instance/shenyu-register-instance-etcd/pom.xml
+++ b/shenyu-register-center/shenyu-register-instance/shenyu-register-instance-etcd/pom.xml
@@ -35,7 +35,6 @@
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
- <version>${jetcd-core.version}</version>
<exclusions>
<exclusion>
<artifactId>grpc-protobuf</artifactId>
diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/pom.xml b/shenyu-sync-data-center/shenyu-sync-data-etcd/pom.xml
index 505a6c620..0c4d3bc48 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-etcd/pom.xml
+++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/pom.xml
@@ -34,7 +34,20 @@
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
- <version>0.5.0</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>grpc-netty</artifactId>
+ <groupId>io.grpc</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>grpc-protobuf</artifactId>
+ <groupId>io.grpc</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>grpc-stub</artifactId>
+ <groupId>io.grpc</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>