This is an automated email from the ASF dual-hosted git repository.

gaohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 7909785cd [ISSUE  #3807] Consolidate etcd dependencies into one
7909785cd is described below

commit 7909785cd4bb3f27cb355aa3144ea14e337e6582
Author: huanccwang <[email protected]>
AuthorDate: Mon Aug 15 22:36:09 2022 +0800

    [ISSUE  #3807] Consolidate etcd dependencies into one
    
    * Consolidate etcd dependencies into one #3807
    
    * Consolidate etcd dependencies into one #3807
---
 pom.xml                                            |   7 +-
 shenyu-admin/pom.xml                               |   1 -
 .../shenyu-register-client-server-etcd/pom.xml     |  32 +-
 .../client/server/etcd/client/EtcdClient.java      | 459 +++++++++++----------
 .../shenyu-register-client-etcd/pom.xml            |   1 -
 .../shenyu-register-instance-etcd/pom.xml          |   1 -
 .../shenyu-sync-data-etcd/pom.xml                  |  15 +-
 7 files changed, 292 insertions(+), 224 deletions(-)

diff --git a/pom.xml b/pom.xml
index 6a73fe6f2..69b2b09b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,7 +117,7 @@
         <spring-boot.version>2.6.8</spring-boot.version>
         <aspectjweaver.version>1.9.6</aspectjweaver.version>
         <h2.version>1.4.200</h2.version>
-        <jetcd-core.version>0.5.0</jetcd-core.version>
+        <jetcd-core.version>0.7.3</jetcd-core.version>
         <java-websocket.version>1.5.0</java-websocket.version>
         <mockito.version>3.5.15</mockito.version>
         <awaitility.version>4.0.3</awaitility.version>
@@ -474,6 +474,11 @@
                 <version>${jakarta.json-api.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>io.etcd</groupId>
+                <artifactId>jetcd-core</artifactId>
+                <version>${jetcd-core.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/shenyu-admin/pom.xml b/shenyu-admin/pom.xml
index 01b596b17..663b290da 100644
--- a/shenyu-admin/pom.xml
+++ b/shenyu-admin/pom.xml
@@ -163,7 +163,6 @@
         <dependency>
             <groupId>io.etcd</groupId>
             <artifactId>jetcd-core</artifactId>
-            <version>${jetcd-core.version}</version>
         </dependency>
 
         <dependency>
diff --git 
a/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/pom.xml
 
b/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/pom.xml
index 0b5154afa..bf1527ae9 100644
--- 
a/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/pom.xml
+++ 
b/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/pom.xml
@@ -27,9 +27,22 @@
 
     <dependencies>
         <dependency>
-            <groupId>com.coreos</groupId>
+            <groupId>io.etcd</groupId>
             <artifactId>jetcd-core</artifactId>
-            <version>${jetcd.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>grpc-netty</artifactId>
+                    <groupId>io.grpc</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>grpc-protobuf</artifactId>
+                    <groupId>io.grpc</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>grpc-stub</artifactId>
+                    <groupId>io.grpc</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.shenyu</groupId>
@@ -48,5 +61,20 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-core</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <artifactId>grpc-protobuf</artifactId>
+            <groupId>io.grpc</groupId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <artifactId>grpc-stub</artifactId>
+            <groupId>io.grpc</groupId>
+            <version>${grpc.version}</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/src/main/java/org/apache/shenyu/register/client/server/etcd/client/EtcdClient.java
 
b/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/src/main/java/org/apache/shenyu/register/client/server/etcd/client/EtcdClient.java
index 7c450b167..55194d0a4 100644
--- 
a/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/src/main/java/org/apache/shenyu/register/client/server/etcd/client/EtcdClient.java
+++ 
b/shenyu-register-center/shenyu-register-client-server/shenyu-register-client-server-etcd/src/main/java/org/apache/shenyu/register/client/server/etcd/client/EtcdClient.java
@@ -1,217 +1,242 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shenyu.register.client.server.etcd.client;
-
-import com.coreos.jetcd.Client;
-import com.coreos.jetcd.KV;
-import com.coreos.jetcd.Lease;
-import com.coreos.jetcd.Watch;
-import com.coreos.jetcd.data.ByteSequence;
-import com.coreos.jetcd.data.KeyValue;
-import com.coreos.jetcd.kv.GetResponse;
-import com.coreos.jetcd.lease.LeaseGrantResponse;
-import com.coreos.jetcd.options.GetOption;
-import com.coreos.jetcd.options.WatchOption;
-import com.coreos.jetcd.watch.WatchResponse;
-import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-/**
- * etcd client.
- */
-public class EtcdClient {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(EtcdClient.class);
-
-    private static final int EPHEMERAL_LEASE = 60;
-
-    private static final int DEFAULT_CORE_POOL_SIZE = 10;
-
-    private static final int DEFAULT_QUEUE_SIZE = 1000;
-
-    private final ThreadPoolExecutor defaultPoolExecutor;
-
-    private final Client client;
-
-    public EtcdClient(final String urls) {
-        defaultPoolExecutor = new ThreadPoolExecutor(
-                DEFAULT_CORE_POOL_SIZE, DEFAULT_CORE_POOL_SIZE * 2,
-                0L, TimeUnit.NANOSECONDS,
-                new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE),
-                ShenyuThreadFactory.create("etcd register center watch-", 
true));
-
-        this.client = Client.builder().endpoints(urls.split(",")).build();
-
-        try {
-            initLease();
-        } catch (ExecutionException | InterruptedException e) {
-            LOGGER.error("initLease error.", e);
-        }
-    }
-
-    private void initLease() throws ExecutionException, InterruptedException {
-        Lease lease = client.getLeaseClient();
-        LeaseGrantResponse response = lease.grant(EPHEMERAL_LEASE).get();
-        long leaseId = response.getID();
-        lease.keepAlive(leaseId);
-    }
-
-    /**
-     * read data.
-     *
-     * @param key key
-     * @return string of data
-     */
-    public String read(final String key) {
-        KV kv = client.getKVClient();
-        ByteSequence storeKey = 
Optional.ofNullable(key).map(ByteSequence::fromString).orElse(null);
-        GetResponse response = null;
-
-        try {
-            response = kv.get(storeKey).get();
-        } catch (InterruptedException | ExecutionException e) {
-            LOGGER.error("read(key:{}) error.", key, e);
-        }
-
-        if (Objects.isNull(response)) {
-            return null;
-        }
-
-        LOGGER.debug(String.valueOf(response.getHeader()));
-        Node info = 
response.getKvs().stream().map(EtcdClient::kv2NodeInfo).findFirst().orElse(null);
-        assert info != null;
-        return info.getValue();
-    }
-
-    /**
-     * get children of path.
-     *
-     * @param path path
-     * @return list of children
-     */
-    public List<String> getChildren(final String path) {
-        try {
-            return listKeys(path);
-        } catch (ExecutionException | InterruptedException e) {
-            LOGGER.error("getChildren(path:{}) error.", path, e);
-        }
-        return null;
-    }
-
-    private List<String> listKeys(final String prefix) throws 
ExecutionException, InterruptedException {
-        KV kv = client.getKVClient();
-        ByteSequence storePrefix = 
Optional.ofNullable(prefix).map(ByteSequence::fromString).orElse(null);
-        GetOption option = 
GetOption.newBuilder().withKeysOnly(true).withPrefix(storePrefix).build();
-        GetResponse response = kv.get(storePrefix, option).get();
-        return response.getKvs().stream()
-                .map(o -> o.getKey().toStringUtf8())
-                .filter(k -> !k.equals(prefix))
-                .collect(Collectors.toList());
-    }
-
-    /**
-     * subscribe children change.
-     *
-     * @param key     key
-     * @param handler event handler
-     */
-    public void subscribeChildChanges(final String key, final 
EtcdListenHandler handler) {
-        defaultPoolExecutor.execute(() -> {
-            final Stoppable stoppable = new Stoppable();
-            try {
-                watchChildren(key, stoppable, handler);
-            } catch (Exception e) {
-                stoppable.stop();
-                LOGGER.warn(String.format("Watch exception of %s", "/s"), e);
-            }
-        });
-    }
-
-    private void watchChildren(final String key, final Supplier<Boolean> 
exitSignSupplier,
-                               final BiConsumer<Event, Node> consumer) throws 
InterruptedException {
-        ByteSequence storeKey = 
Optional.ofNullable(key).map(ByteSequence::fromString).orElse(null);
-
-        try (Watch watch = client.getWatchClient();
-             Watch.Watcher watcher = watch.watch(storeKey,
-                     WatchOption.newBuilder().withPrefix(storeKey).build())) {
-            while (!exitSignSupplier.get()) {
-                WatchResponse response = watcher.listen();
-                response.getEvents().forEach(watchEvent -> {
-                    KeyValue keyValue = watchEvent.getKeyValue();
-                    Node info = kv2NodeInfo(keyValue);
-                    // skip root node change
-                    if (watchEvent.getKeyValue().getKey().equals(storeKey)) {
-                        return;
-                    }
-                    Event event;
-                    switch (watchEvent.getEventType()) {
-                        case PUT:
-                            event = Event.UPDATE;
-                            break;
-                        case DELETE:
-                            event = Event.DELETE;
-                            break;
-                        default:
-                            event = Event.UNRECOGNIZED;
-                    }
-                    consumer.accept(event, info);
-                });
-            }
-        }
-    }
-
-    static Node kv2NodeInfo(final KeyValue kv) {
-        String key = kv.getKey().toStringUtf8();
-        String value = 
Optional.ofNullable(kv.getValue()).map(ByteSequence::toStringUtf8).orElse("");
-        return new Node(key, value, kv.getCreateRevision(), 
kv.getModRevision(), kv.getVersion());
-    }
-
-    /**
-     * close client.
-     */
-    public void close() {
-        Optional.ofNullable(client).ifPresent(Client::close);
-    }
-
-    static class Stoppable implements Supplier<Boolean> {
-
-        private boolean exit;
-
-        @Override
-        public Boolean get() {
-            return exit;
-        }
-
-        void stop() {
-            this.exit = true;
-        }
-
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.register.client.server.etcd.client;
+
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.KV;
+import io.etcd.jetcd.KeyValue;
+import io.etcd.jetcd.Lease;
+import io.etcd.jetcd.Watch;
+import io.etcd.jetcd.kv.GetResponse;
+import io.etcd.jetcd.lease.LeaseGrantResponse;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.etcd.jetcd.options.GetOption;
+import io.etcd.jetcd.options.WatchOption;
+import io.etcd.jetcd.watch.WatchEvent;
+import io.grpc.stub.StreamObserver;
+import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * etcd client.
+ */
+public class EtcdClient {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EtcdClient.class);
+
+    private static final int EPHEMERAL_LEASE = 60;
+
+    private static final int DEFAULT_CORE_POOL_SIZE = 10;
+
+    private static final int DEFAULT_QUEUE_SIZE = 1000;
+
+    private final ThreadPoolExecutor defaultPoolExecutor;
+
+    private final Client client;
+
+    public EtcdClient(final String urls) {
+        defaultPoolExecutor = new ThreadPoolExecutor(
+                DEFAULT_CORE_POOL_SIZE, DEFAULT_CORE_POOL_SIZE * 2,
+                0L, TimeUnit.NANOSECONDS,
+                new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE),
+                ShenyuThreadFactory.create("etcd register center watch-", 
true));
+
+        this.client = Client.builder().endpoints(urls.split(",")).build();
+
+        try {
+            initLease();
+        } catch (ExecutionException | InterruptedException e) {
+            LOGGER.error("initLease error.", e);
+        }
+    }
+
+    private void initLease() throws ExecutionException, InterruptedException {
+        Lease lease = client.getLeaseClient();
+        LeaseGrantResponse response = lease.grant(EPHEMERAL_LEASE).get();
+        long leaseId = response.getID();
+        lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
+            @Override
+            public void onNext(final LeaseKeepAliveResponse 
leaseKeepAliveResponse) {
+
+            }
+
+            @Override
+            public void onError(final Throwable throwable) {
+
+                LOGGER.error("keep alive error", throwable);
+            }
+
+            @Override
+            public void onCompleted() {
+
+            }
+        });
+    }
+
+    /**
+     * read data.
+     *
+     * @param key key
+     * @return string of data
+     */
+    public String read(final String key) {
+        KV kv = client.getKVClient();
+        ByteSequence storeKey = ByteSequence.from(key, StandardCharsets.UTF_8);
+        GetResponse response = null;
+
+        try {
+            response = kv.get(storeKey).get();
+        } catch (InterruptedException | ExecutionException e) {
+            LOGGER.error("read(key:{}) error.", key, e);
+        }
+
+        if (Objects.isNull(response)) {
+            return null;
+        }
+
+        LOGGER.debug(String.valueOf(response.getHeader()));
+        Node info = 
response.getKvs().stream().map(EtcdClient::kv2NodeInfo).findFirst().orElse(null);
+        assert info != null;
+        return info.getValue();
+    }
+
+    /**
+     * get children of path.
+     *
+     * @param path path
+     * @return list of children
+     */
+    public List<String> getChildren(final String path) {
+        try {
+            return listKeys(path);
+        } catch (ExecutionException | InterruptedException e) {
+            LOGGER.error("getChildren(path:{}) error.", path, e);
+        }
+        return null;
+    }
+
+    private List<String> listKeys(final String prefix) throws 
ExecutionException, InterruptedException {
+        KV kv = client.getKVClient();
+        ByteSequence storePrefix = ByteSequence.from(prefix, 
StandardCharsets.UTF_8);
+        GetOption option = 
GetOption.newBuilder().withKeysOnly(true).withPrefix(storePrefix).build();
+        GetResponse response = kv.get(storePrefix, option).get();
+        return response.getKvs().stream()
+                .map(o -> o.getKey().toString())
+                .filter(k -> !k.equals(prefix))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * subscribe children change.
+     *
+     * @param key     key
+     * @param handler event handler
+     */
+    public void subscribeChildChanges(final String key, final 
EtcdListenHandler handler) {
+        defaultPoolExecutor.execute(() -> {
+            final Stoppable stoppable = new Stoppable();
+            try {
+                watchChildren(key, stoppable, handler);
+            } catch (Exception e) {
+                stoppable.stop();
+                LOGGER.warn(String.format("Watch exception of %s", "/s"), e);
+            }
+        });
+    }
+
+    private void watchChildren(final String key, final Supplier<Boolean> 
exitSignSupplier,
+                               final BiConsumer<Event, Node> consumer) throws 
InterruptedException {
+        ByteSequence storeKey = ByteSequence.from(key, StandardCharsets.UTF_8);
+        Watch.Listener listener = watch(exitSignSupplier, storeKey, consumer);
+        WatchOption option = WatchOption.newBuilder()
+                .withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8))
+                .build();
+        Watch.Watcher watch = 
client.getWatchClient().watch(ByteSequence.from(key, StandardCharsets.UTF_8), 
option, listener);
+        watch.close();
+    }
+
+    private Watch.Listener watch(final Supplier<Boolean> exitSignSupplier, 
final ByteSequence storeKey,
+                                 final BiConsumer<Event, Node> consumer) {
+        return Watch.listener(response -> {
+            while (!exitSignSupplier.get()) {
+                for (WatchEvent watchEvent : response.getEvents()) {
+                    KeyValue keyValue = watchEvent.getKeyValue();
+                    Node info = kv2NodeInfo(keyValue);
+                    // skip root node change
+                    if (watchEvent.getKeyValue().getKey().equals(storeKey)) {
+                        return;
+                    }
+                    Event event;
+                    switch (watchEvent.getEventType()) {
+                        case PUT:
+                            event = Event.UPDATE;
+                            break;
+                        case DELETE:
+                            event = Event.DELETE;
+                            break;
+                        default:
+                            event = Event.UNRECOGNIZED;
+                    }
+                    consumer.accept(event, info);
+                }
+            }
+        });
+    }
+
+    static Node kv2NodeInfo(final KeyValue kv) {
+        String key = kv.getKey().toString();
+        String value = 
Optional.ofNullable(kv.getValue()).map(ByteSequence::toString).orElse("");
+        return new Node(key, value, kv.getCreateRevision(), 
kv.getModRevision(), kv.getVersion());
+    }
+
+    /**
+     * close client.
+     */
+    public void close() {
+        Optional.ofNullable(client).ifPresent(Client::close);
+    }
+
+    static class Stoppable implements Supplier<Boolean> {
+
+        private boolean exit;
+
+        @Override
+        public Boolean get() {
+            return exit;
+        }
+
+        void stop() {
+            this.exit = true;
+        }
+
+    }
+}
diff --git 
a/shenyu-register-center/shenyu-register-client/shenyu-register-client-etcd/pom.xml
 
b/shenyu-register-center/shenyu-register-client/shenyu-register-client-etcd/pom.xml
index 579752a66..b71257d8a 100644
--- 
a/shenyu-register-center/shenyu-register-client/shenyu-register-client-etcd/pom.xml
+++ 
b/shenyu-register-center/shenyu-register-client/shenyu-register-client-etcd/pom.xml
@@ -30,7 +30,6 @@
         <dependency>
             <groupId>io.etcd</groupId>
             <artifactId>jetcd-core</artifactId>
-            <version>0.5.0</version>
             <exclusions>
                 <exclusion>
                     <artifactId>grpc-protobuf</artifactId>
diff --git 
a/shenyu-register-center/shenyu-register-instance/shenyu-register-instance-etcd/pom.xml
 
b/shenyu-register-center/shenyu-register-instance/shenyu-register-instance-etcd/pom.xml
index db57807d0..10ea212ae 100644
--- 
a/shenyu-register-center/shenyu-register-instance/shenyu-register-instance-etcd/pom.xml
+++ 
b/shenyu-register-center/shenyu-register-instance/shenyu-register-instance-etcd/pom.xml
@@ -35,7 +35,6 @@
         <dependency>
             <groupId>io.etcd</groupId>
             <artifactId>jetcd-core</artifactId>
-            <version>${jetcd-core.version}</version>
             <exclusions>
                 <exclusion>
                     <artifactId>grpc-protobuf</artifactId>
diff --git a/shenyu-sync-data-center/shenyu-sync-data-etcd/pom.xml 
b/shenyu-sync-data-center/shenyu-sync-data-etcd/pom.xml
index 505a6c620..0c4d3bc48 100644
--- a/shenyu-sync-data-center/shenyu-sync-data-etcd/pom.xml
+++ b/shenyu-sync-data-center/shenyu-sync-data-etcd/pom.xml
@@ -34,7 +34,20 @@
         <dependency>
             <groupId>io.etcd</groupId>
             <artifactId>jetcd-core</artifactId>
-            <version>0.5.0</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>grpc-netty</artifactId>
+                    <groupId>io.grpc</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>grpc-protobuf</artifactId>
+                    <groupId>io.grpc</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>grpc-stub</artifactId>
+                    <groupId>io.grpc</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 

Reply via email to