This is an automated email from the ASF dual-hosted git repository.

liuhongyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new b984ad2c6d Refactor EtcdClient and RedisConnectionFactory for improved readability and maintainability (#6250)
b984ad2c6d is described below

commit b984ad2c6d62a269e6628da62954cc4d761beb28
Author: aias00 <[email protected]>
AuthorDate: Mon Dec 8 18:54:04 2025 +0800

    Refactor EtcdClient and RedisConnectionFactory for improved readability and maintainability (#6250)
    
    * Refactor EtcdClient and RedisConnectionFactory for improved readability and maintainability
    
    * Update shenyu-infra/shenyu-infra-redis/src/main/java/org/apache/shenyu/infra/redis/RedisConnectionFactory.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * Update shenyu-infra/shenyu-infra-redis/src/main/java/org/apache/shenyu/infra/redis/RedisConnectionFactory.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * feat: integrate mvnd installation and usage in CI workflows
    
    * fix: handle InterruptedException in EtcdClient and update tests for proper exception throwing
    
    * feat: integrate mvnd installation and usage in CI workflows
    
    * feat: enhance keep-alive mechanism in EtcdClient with exponential backoff retries and add corresponding unit test
    
    * test: add unit tests for keep-alive mechanism in EtcdClient and validate RedisNode parsing
    
    * test: refactor observer registration assertion in EtcdClientTest for improved clarity and reliability
    
    * feat: integrate mvnd installation and usage in CI workflows
    
    * test: refactor RedisConnectionFactoryTest to improve invalid node assertions
    
    * test: enhance assertions in EtcdClientTest for better clarity and reliability
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../shenyu/infra/etcd/client/EtcdClient.java       | 193 ++++++++++++++++-----
 .../shenyu/infra/etcd/client/EtcdClientTest.java   | 112 +++++++++++-
 .../shenyu/infra/redis/RedisConnectionFactory.java |  93 ++++++++--
 .../infra/redis/RedisConnectionFactoryTest.java    |  69 ++++++++
 .../shenyu/registry/etcd/EtcdClientTest.java       |  22 ++-
 5 files changed, 423 insertions(+), 66 deletions(-)

diff --git 
a/shenyu-infra/shenyu-infra-etcd/src/main/java/org/apache/shenyu/infra/etcd/client/EtcdClient.java
 
b/shenyu-infra/shenyu-infra-etcd/src/main/java/org/apache/shenyu/infra/etcd/client/EtcdClient.java
index fcc5b4095d..6ea02a6ed8 100644
--- 
a/shenyu-infra/shenyu-infra-etcd/src/main/java/org/apache/shenyu/infra/etcd/client/EtcdClient.java
+++ 
b/shenyu-infra/shenyu-infra-etcd/src/main/java/org/apache/shenyu/infra/etcd/client/EtcdClient.java
@@ -42,8 +42,12 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -56,6 +60,10 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 public class EtcdClient {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(EtcdClient.class);
+    
+    private static final int MAX_RETRY_DELAY_SECONDS = 60;
+    
+    private static final int INITIAL_RETRY_DELAY_SECONDS = 1;
 
     private final Client client;
 
@@ -69,11 +77,26 @@ public class EtcdClient {
 
     private final ConcurrentHashMap<String, Watch.Watcher> watchChildCache = 
new ConcurrentHashMap<>();
 
+    private volatile boolean keepAliveRunning = true;
+
+    private volatile StreamObserver<LeaseKeepAliveResponse> currentObserver;
+
+    private final ScheduledExecutorService retryExecutor;
+
+    private volatile ScheduledFuture<?> retryFuture;
+
+    private final AtomicInteger retryCount = new AtomicInteger(0);
+
     public EtcdClient(final Client client, final long ttl, final long timeout) 
{
 
         this.ttl = ttl;
         this.timeout = timeout;
         this.client = client;
+        this.retryExecutor = new ScheduledThreadPoolExecutor(1, r -> {
+            Thread thread = new Thread(r, "etcd-keepalive-retry");
+            thread.setDaemon(true);
+            return thread;
+        });
 
         initLease();
     }
@@ -82,6 +105,19 @@ public class EtcdClient {
      * close client.
      */
     public void close() {
+        keepAliveRunning = false;
+        if (Objects.nonNull(retryFuture)) {
+            retryFuture.cancel(false);
+        }
+        retryExecutor.shutdown();
+        try {
+            if (!retryExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+                retryExecutor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            retryExecutor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
         this.client.close();
     }
 
@@ -129,6 +165,7 @@ public class EtcdClient {
 
     /**
      * bytesOf string.
+     * 
      * @param val val.
      * @return bytes val.
      */
@@ -145,7 +182,8 @@ public class EtcdClient {
      * @throws ExecutionException   the exception
      * @throws InterruptedException the exception
      */
-    public List<String> getChildrenKeys(final String prefix, final String 
separator) throws ExecutionException, InterruptedException {
+    public List<String> getChildrenKeys(final String prefix, final String 
separator)
+            throws ExecutionException, InterruptedException {
         ByteSequence prefixByteSequence = bytesOf(prefix);
         GetOption getOption = GetOption.newBuilder()
                 .withPrefix(prefixByteSequence)
@@ -170,10 +208,11 @@ public class EtcdClient {
      *
      * @param prefix    key prefix.
      * @param separator separator char
-     * @param map prefix map
+     * @param map       prefix map
      * @return sub map
      */
-    public List<String> getChildrenKeysByMap(final String prefix, final String 
separator, final Map<String, String> map) {
+    public List<String> getChildrenKeysByMap(final String prefix, final String 
separator,
+            final Map<String, String> map) {
 
         return map.entrySet().stream()
                 .filter(e -> e.getKey().contains(prefix))
@@ -199,8 +238,8 @@ public class EtcdClient {
      * @param deleteHandler node value handler of delete
      */
     public void watchDataChange(final String key,
-                                final BiConsumer<String, String> updateHandler,
-                                final Consumer<String> deleteHandler) {
+            final BiConsumer<String, String> updateHandler,
+            final Consumer<String> deleteHandler) {
         Watch.Listener listener = watch(updateHandler, deleteHandler);
         if (!watchCache.containsKey(key)) {
             Watch.Watcher watch = 
client.getWatchClient().watch(ByteSequence.from(key, UTF_8), listener);
@@ -216,8 +255,8 @@ public class EtcdClient {
      * @param deleteHandler sub node delete of delete
      */
     public void watchChildChange(final String key,
-                                 final BiConsumer<String, String> 
updateHandler,
-                                 final Consumer<String> deleteHandler) {
+            final BiConsumer<String, String> updateHandler,
+            final Consumer<String> deleteHandler) {
         Watch.Listener listener = watch(updateHandler, deleteHandler);
         WatchOption option = WatchOption.newBuilder()
                 .withPrefix(ByteSequence.from(key, UTF_8))
@@ -229,7 +268,7 @@ public class EtcdClient {
     }
 
     private Watch.Listener watch(final BiConsumer<String, String> 
updateHandler,
-                                 final Consumer<String> deleteHandler) {
+            final Consumer<String> deleteHandler) {
         return Watch.listener(response -> {
             for (WatchEvent event : response.getEvents()) {
                 String path = event.getKeyValue().getKey().toString(UTF_8);
@@ -245,9 +284,9 @@ public class EtcdClient {
                 }
             }
         }, throwable -> {
-                LOG.error("etcd watch error {}", throwable.getMessage(), 
throwable);
-                throw new ShenyuException(throwable);
-            });
+            LOG.error("etcd watch error {}", throwable.getMessage(), 
throwable);
+            throw new ShenyuException(throwable);
+        });
     }
 
     /**
@@ -268,6 +307,7 @@ public class EtcdClient {
 
     /**
      * check node exists.
+     * 
      * @param key node name
      * @return bool
      */
@@ -276,7 +316,8 @@ public class EtcdClient {
             GetOption option = GetOption.newBuilder()
                     .withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8))
                     .build();
-            List<KeyValue> keyValues = 
client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8), 
option).get().getKvs();
+            List<KeyValue> keyValues = 
client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8), option)
+                    .get().getKvs();
             return !keyValues.isEmpty();
         } catch (Exception e) {
             LOG.error("check node exists error", e);
@@ -286,12 +327,14 @@ public class EtcdClient {
 
     /**
      * update value of node.
-     * @param key node name
+     * 
+     * @param key   node name
      * @param value node value
      */
     public void put(final String key, final String value) {
         try {
-            client.getKVClient().put(ByteSequence.from(key, 
StandardCharsets.UTF_8), ByteSequence.from(value, 
StandardCharsets.UTF_8)).get();
+            client.getKVClient().put(ByteSequence.from(key, 
StandardCharsets.UTF_8),
+                    ByteSequence.from(value, StandardCharsets.UTF_8)).get();
         } catch (Exception e) {
             LOG.error("update value of node error.", e);
             throw new ShenyuException(e);
@@ -300,6 +343,7 @@ public class EtcdClient {
 
     /**
      * delete node.
+     * 
      * @param key node name
      */
     public void delete(final String key) {
@@ -308,6 +352,7 @@ public class EtcdClient {
 
     /**
      * delete node of recursive.
+     * 
      * @param path parent node name
      */
     public void deleteEtcdPathRecursive(final String path) {
@@ -315,40 +360,18 @@ public class EtcdClient {
                 .withPrefix(ByteSequence.from(path, StandardCharsets.UTF_8))
                 .build();
         try {
-            client.getKVClient().delete(ByteSequence.from(path, 
StandardCharsets.UTF_8), option).get(10, TimeUnit.SECONDS);
+            client.getKVClient().delete(ByteSequence.from(path, 
StandardCharsets.UTF_8), option).get(10,
+                    TimeUnit.SECONDS);
         } catch (Exception e) {
             LOG.error("delete node of recursive error.", e);
             throw new ShenyuException(e);
         }
     }
 
-    private void initLease() {
-        try {
-            this.globalLeaseId = 
client.getLeaseClient().grant(ttl).get().getID();
-            client.getLeaseClient().keepAlive(globalLeaseId, new 
StreamObserver<>() {
-                @Override
-                public void onNext(final LeaseKeepAliveResponse 
leaseKeepAliveResponse) {
-                }
-
-                @Override
-                public void onError(final Throwable throwable) {
-                    LOG.error("keep alive error", throwable);
-                }
-
-                @Override
-                public void onCompleted() {
-                }
-            });
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("initLease error.", e);
-        }
-    }
-
-
     /**
      * watchKeyChanges.
      *
-     * @param key key
+     * @param key      key
      * @param listener listener
      * @return {@link Watch.Watcher}
      */
@@ -360,7 +383,8 @@ public class EtcdClient {
 
     /**
      * get keyResponse.
-     * @param key watch key.
+     * 
+     * @param key       watch key.
      * @param getOption get option.
      * @return key response.
      */
@@ -375,22 +399,107 @@ public class EtcdClient {
 
     /**
      * put data as ephemeral.
-     * @param key key
+     * 
+     * @param key   key
      * @param value value
      */
     public void putEphemeral(final String key, final String value) {
         try {
             KV kvClient = client.getKVClient();
             kvClient.put(ByteSequence.from(key, UTF_8), 
ByteSequence.from(value, UTF_8),
-                            
PutOption.newBuilder().withLeaseId(globalLeaseId).build())
+                    PutOption.newBuilder().withLeaseId(globalLeaseId).build())
                     .get(timeout, TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
             LOG.error("putEphemeral(key:{},value:{}) error.", key, value, e);
+            throw new ShenyuException(e);
         }
     }
 
+    private void initLease() {
+        try {
+            this.globalLeaseId = 
client.getLeaseClient().grant(ttl).get().getID();
+            keepAlive();
+        } catch (InterruptedException e) {
+            LOG.error("initLease error.", e);
+            Thread.currentThread().interrupt();
+            throw new ShenyuException(e);
+        } catch (ExecutionException e) {
+            LOG.error("initLease error.", e);
+            throw new ShenyuException(e);
+        }
+    }
+
+    private void keepAlive() {
+        if (!keepAliveRunning) {
+            return;
+        }
+        retryCount.set(0);
+        scheduleRetryInternal(0);
+    }
+
+    private void scheduleRetry() {
+        int attempt = retryCount.incrementAndGet();
+        scheduleRetryInternal(attempt);
+    }
+
+    private void scheduleRetryInternal(final int attempt) {
+        if (!keepAliveRunning) {
+            return;
+        }
+
+        if (Objects.nonNull(retryFuture) && !retryFuture.isDone()) {
+            retryFuture.cancel(false);
+        }
+
+        long delaySeconds;
+        if (attempt <= 0) {
+            delaySeconds = 0;
+        } else {
+            long backoffFactor = 1L << Math.min(Math.max(attempt - 1, 0), 6);
+            delaySeconds = Math.min(INITIAL_RETRY_DELAY_SECONDS * 
backoffFactor, MAX_RETRY_DELAY_SECONDS);
+        }
+
+        retryFuture = retryExecutor.schedule(() -> {
+            if (!keepAliveRunning) {
+                return;
+            }
+
+            currentObserver = new StreamObserver<>() {
+                @Override
+                public void onNext(final LeaseKeepAliveResponse 
leaseKeepAliveResponse) {
+                    retryCount.set(0);
+                }
+
+                @Override
+                public void onError(final Throwable throwable) {
+                    LOG.error("keep alive error", throwable);
+                    currentObserver = null;
+                    scheduleRetry();
+                }
+
+                @Override
+                public void onCompleted() {
+                    LOG.warn("keep alive stream completed");
+                    currentObserver = null;
+                    scheduleRetry();
+                }
+            };
+
+            try {
+                client.getLeaseClient().keepAlive(globalLeaseId, 
currentObserver);
+            } catch (Exception e) {
+                LOG.error("Failed to start keep alive", e);
+                currentObserver = null;
+                scheduleRetry();
+            }
+        }, delaySeconds, TimeUnit.SECONDS);
+
+        LOG.debug("Scheduled keep alive retry in {} seconds (attempt: {})", 
delaySeconds, attempt);
+    }
+
     /**
      * Create a new {@link Builder} instance for building an {@link 
EtcdClient}.
+     * 
      * @return a new {@link Builder} instance
      */
     public static Builder builder() {
diff --git 
a/shenyu-infra/shenyu-infra-etcd/src/test/java/org/apache/shenyu/infra/etcd/client/EtcdClientTest.java
 
b/shenyu-infra/shenyu-infra-etcd/src/test/java/org/apache/shenyu/infra/etcd/client/EtcdClientTest.java
index bb5e3eb252..a100b7a354 100644
--- 
a/shenyu-infra/shenyu-infra-etcd/src/test/java/org/apache/shenyu/infra/etcd/client/EtcdClientTest.java
+++ 
b/shenyu-infra/shenyu-infra-etcd/src/test/java/org/apache/shenyu/infra/etcd/client/EtcdClientTest.java
@@ -33,8 +33,10 @@ import org.mockito.MockedStatic;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -63,31 +65,122 @@ public class EtcdClientTest {
             when(client.getLeaseClient()).thenReturn(lease);
             final CompletableFuture<LeaseGrantResponse> completableFuture = 
mock(CompletableFuture.class);
             final LeaseGrantResponse leaseGrantResponse = 
mock(LeaseGrantResponse.class);
+            when(leaseGrantResponse.getID()).thenReturn(1L);
 
             
when(client.getLeaseClient().grant(anyLong())).thenReturn(completableFuture);
             when(completableFuture.get()).thenReturn(leaseGrantResponse);
-            Assertions.assertDoesNotThrow(() -> 
EtcdClient.builder().client(Client.builder().endpoints("url").build())).ttl(60L).timeout(3000L).build();
-
             List<StreamObserver<LeaseKeepAliveResponse>> observerList = new 
ArrayList<>();
             doAnswer(invocation -> {
                 observerList.add(invocation.getArgument(1));
                 return lease;
             }).when(lease).keepAlive(anyLong(), any());
-            Assertions.assertDoesNotThrow(() -> 
EtcdClient.builder().client(Client.builder().endpoints("url").build())).ttl(60L).timeout(3000L).build();
+
+            final EtcdClient etcdClient = Assertions.assertDoesNotThrow(() -> 
EtcdClient.builder()
+                    .client(Client.builder().endpoints("url").build())
+                    .ttl(60L)
+                    .timeout(3000L)
+                    .build());
+            Assertions.assertNotNull(etcdClient);
+
             final LeaseKeepAliveResponse leaseKeepAliveResponse = 
mock(LeaseKeepAliveResponse.class);
             observerList.forEach(streamObserver -> {
                 streamObserver.onCompleted();
                 streamObserver.onError(new ShenyuException("test"));
                 streamObserver.onNext(leaseKeepAliveResponse);
             });
+            etcdClient.close();
 
             doThrow(new 
InterruptedException("error")).when(completableFuture).get();
-            Assertions.assertDoesNotThrow(() -> 
EtcdClient.builder().client(Client.builder().endpoints("url").build())).ttl(60L).timeout(3000L).build();
+            Assertions.assertThrows(ShenyuException.class, () -> 
EtcdClient.builder()
+                    .client(Client.builder().endpoints("url").build())
+                    .ttl(60L)
+                    .timeout(3000L)
+                    .build());
         } catch (Exception e) {
             throw new ShenyuException(e.getCause());
         }
     }
 
+    @Test
+    public void keepAliveRetryTest() throws ExecutionException, 
InterruptedException {
+        try (MockedStatic<Client> clientMockedStatic = 
mockStatic(Client.class)) {
+            final Client client = this.mockEtcd(clientMockedStatic);
+            final Lease lease = client.getLeaseClient();
+
+            AtomicInteger keepAliveInvoked = new AtomicInteger(0);
+            CountDownLatch latch = new CountDownLatch(2);
+            CountDownLatch firstCallLatch = new CountDownLatch(1);
+            List<StreamObserver<LeaseKeepAliveResponse>> observerList = new 
ArrayList<>();
+
+            doAnswer(invocation -> {
+                keepAliveInvoked.incrementAndGet();
+                latch.countDown();
+                firstCallLatch.countDown();
+                observerList.add(invocation.getArgument(1));
+                return lease;
+            }).when(lease).keepAlive(anyLong(), any());
+
+            final EtcdClient etcdClient = EtcdClient.builder()
+                    .client(Client.builder().endpoints("url").build())
+                    .ttl(60L)
+                    .timeout(3000L)
+                    .build();
+            Assertions.assertNotNull(etcdClient);
+
+            // Wait for the initial keepAlive registration to complete
+            Assertions.assertTrue(firstCallLatch.await(2, TimeUnit.SECONDS), 
"Initial keepAlive should run");
+            awaitObserverRegistered(observerList);
+
+            // Trigger error to schedule retry keep-alive
+            observerList.get(0).onError(new RuntimeException("test-error"));
+
+            boolean retried = latch.await(3, TimeUnit.SECONDS);
+            etcdClient.close();
+
+            Assertions.assertTrue(retried, "keepAlive should retry after 
onError");
+            Assertions.assertTrue(keepAliveInvoked.get() >= 2, "Expected retry 
keepAlive invocation");
+        }
+    }
+
+    @Test
+    public void keepAliveCompletedRetryTest() throws ExecutionException, 
InterruptedException {
+        try (MockedStatic<Client> clientMockedStatic = 
mockStatic(Client.class)) {
+            final Client client = this.mockEtcd(clientMockedStatic);
+            final Lease lease = client.getLeaseClient();
+
+            AtomicInteger keepAliveInvoked = new AtomicInteger(0);
+            CountDownLatch callsLatch = new CountDownLatch(2);
+            CountDownLatch firstCallLatch = new CountDownLatch(1);
+            List<StreamObserver<LeaseKeepAliveResponse>> observerList = new 
ArrayList<>();
+
+            doAnswer(invocation -> {
+                keepAliveInvoked.incrementAndGet();
+                callsLatch.countDown();
+                firstCallLatch.countDown();
+                observerList.add(invocation.getArgument(1));
+                return lease;
+            }).when(lease).keepAlive(anyLong(), any());
+
+            final EtcdClient etcdClient = EtcdClient.builder()
+                    .client(Client.builder().endpoints("url").build())
+                    .ttl(60L)
+                    .timeout(3000L)
+                    .build();
+            Assertions.assertNotNull(etcdClient);
+
+            Assertions.assertTrue(firstCallLatch.await(2, TimeUnit.SECONDS), 
"Initial keepAlive should run");
+            awaitObserverRegistered(observerList);
+
+            observerList.get(0).onCompleted();
+
+            boolean retried = callsLatch.await(3, TimeUnit.SECONDS);
+            etcdClient.close();
+
+            Assertions.assertTrue(retried, "keepAlive should retry after 
onCompleted");
+            Assertions.assertTrue(keepAliveInvoked.get() >= 2, "Expected 
keepAlive invocation after completion");
+        }
+    }
+
     @Test
     public void closeTest() {
         try (MockedStatic<Client> clientMockedStatic = 
mockStatic(Client.class)) {
@@ -113,7 +206,7 @@ public class EtcdClientTest {
             etcdClient.putEphemeral("key", "value");
 
             doThrow(new 
InterruptedException("error")).when(completableFuture).get(anyLong(), 
any(TimeUnit.class));
-            etcdClient.putEphemeral("key", "value");
+            Assertions.assertThrows(ShenyuException.class, () -> 
etcdClient.putEphemeral("key", "value"));
         } catch (Exception e) {
             throw new ShenyuException(e.getCause());
         }
@@ -129,9 +222,18 @@ public class EtcdClientTest {
         when(client.getLeaseClient()).thenReturn(lease);
         final CompletableFuture<LeaseGrantResponse> completableFuture = 
mock(CompletableFuture.class);
         final LeaseGrantResponse leaseGrantResponse = 
mock(LeaseGrantResponse.class);
+        when(leaseGrantResponse.getID()).thenReturn(1L);
         
when(client.getLeaseClient().grant(anyLong())).thenReturn(completableFuture);
         when(completableFuture.get()).thenReturn(leaseGrantResponse);
         return client;
     }
 
+    private void awaitObserverRegistered(final 
List<StreamObserver<LeaseKeepAliveResponse>> observerList)
+            throws InterruptedException {
+        for (int i = 0; i < 20 && observerList.isEmpty(); i++) {
+            Thread.sleep(50);
+        }
+        Assertions.assertFalse(observerList.isEmpty(), "Observer should be 
registered");
+    }
+
 }
diff --git 
a/shenyu-infra/shenyu-infra-redis/src/main/java/org/apache/shenyu/infra/redis/RedisConnectionFactory.java
 
b/shenyu-infra/shenyu-infra-redis/src/main/java/org/apache/shenyu/infra/redis/RedisConnectionFactory.java
index f694f8eb14..f8f68bb269 100644
--- 
a/shenyu-infra/shenyu-infra-redis/src/main/java/org/apache/shenyu/infra/redis/RedisConnectionFactory.java
+++ 
b/shenyu-infra/shenyu-infra-redis/src/main/java/org/apache/shenyu/infra/redis/RedisConnectionFactory.java
@@ -29,8 +29,6 @@ import 
org.springframework.data.redis.connection.RedisStandaloneConfiguration;
 import 
org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
 import 
org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
 import 
org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
-import org.springframework.util.Assert;
-import org.springframework.util.StringUtils;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -51,6 +49,7 @@ public class RedisConnectionFactory {
 
     /**
      * Get Lettuce connection factory.
+     * 
      * @return the factory
      */
     public LettuceConnectionFactory getLettuceConnectionFactory() {
@@ -60,15 +59,19 @@ public class RedisConnectionFactory {
     private LettuceConnectionFactory createLettuceConnectionFactory(final 
RedisConfigProperties redisConfigProperties) {
         LettuceClientConfiguration lettuceClientConfiguration = 
getLettuceClientConfiguration(redisConfigProperties);
         if 
(RedisModeEnum.SENTINEL.getName().equals(redisConfigProperties.getMode())) {
-            return new 
LettuceConnectionFactory(redisSentinelConfiguration(redisConfigProperties), 
lettuceClientConfiguration);
+            return new 
LettuceConnectionFactory(redisSentinelConfiguration(redisConfigProperties),
+                    lettuceClientConfiguration);
         }
         if 
(RedisModeEnum.CLUSTER.getName().equals(redisConfigProperties.getMode())) {
-            return new 
LettuceConnectionFactory(redisClusterConfiguration(redisConfigProperties), 
lettuceClientConfiguration);
+            return new 
LettuceConnectionFactory(redisClusterConfiguration(redisConfigProperties),
+                    lettuceClientConfiguration);
         }
-        return new 
LettuceConnectionFactory(redisStandaloneConfiguration(redisConfigProperties), 
lettuceClientConfiguration);
+        return new 
LettuceConnectionFactory(redisStandaloneConfiguration(redisConfigProperties),
+                lettuceClientConfiguration);
     }
 
-    private LettuceClientConfiguration getLettuceClientConfiguration(final 
RedisConfigProperties redisConfigProperties) {
+    private LettuceClientConfiguration getLettuceClientConfiguration(
+            final RedisConfigProperties redisConfigProperties) {
         return 
LettucePoolingClientConfiguration.builder().poolConfig(getPoolConfig(redisConfigProperties)).build();
     }
 
@@ -89,12 +92,12 @@ public class RedisConnectionFactory {
      * @param redisConfigProperties the rate limiter config
      * @return the redis standalone configuration
      */
-    protected final RedisStandaloneConfiguration 
redisStandaloneConfiguration(final RedisConfigProperties redisConfigProperties) 
{
+    protected final RedisStandaloneConfiguration redisStandaloneConfiguration(
+            final RedisConfigProperties redisConfigProperties) {
         RedisStandaloneConfiguration config = new 
RedisStandaloneConfiguration();
-        String[] parts = StringUtils.split(redisConfigProperties.getUrl(), 
":");
-        Objects.requireNonNull(parts);
-        config.setHostName(parts[0]);
-        config.setPort(Integer.parseInt(parts[1]));
+        RedisNode redisNode = parseRedisNode(redisConfigProperties.getUrl());
+        config.setHostName(redisNode.getHost());
+        config.setPort(redisNode.getPort());
         if (Objects.nonNull(redisConfigProperties.getPassword())) {
             
config.setPassword(RedisPassword.of(redisConfigProperties.getPassword()));
         }
@@ -126,10 +129,72 @@ public class RedisConnectionFactory {
         List<RedisNode> redisNodes = new ArrayList<>();
         List<String> nodes = Lists.newArrayList(Splitter.on(";").split(url));
         for (String node : nodes) {
-            String[] parts = StringUtils.split(node, ":");
-            Assert.state(Objects.requireNonNull(parts).length == 2, "Must be 
defined as 'host:port'");
-            redisNodes.add(new RedisNode(parts[0], 
Integer.parseInt(parts[1])));
+            redisNodes.add(parseRedisNode(node));
         }
         return redisNodes;
     }
+
+    private RedisNode parseRedisNode(final String url) {
+        if (Objects.isNull(url) || url.trim().isEmpty()) {
+            throw new IllegalArgumentException("Redis node URL cannot be null 
or empty");
+        }
+        
+        String trimmedUrl = url.trim();
+        String host = trimmedUrl;
+        int port = 6379;
+        int bracketIndex = trimmedUrl.lastIndexOf("]");
+        
+        if (bracketIndex > -1) {
+            // IPv6 address format: [::1] or [::1]:6379
+            if (!trimmedUrl.startsWith("[")) {
+                throw new IllegalArgumentException("Invalid IPv6 format in 
Redis node URL: " + url);
+            }
+            int closingBracket = trimmedUrl.indexOf("]");
+            if (closingBracket == -1 || closingBracket != bracketIndex) {
+                throw new IllegalArgumentException("Invalid IPv6 format in 
Redis node URL: " + url);
+            }
+            
+            // Extract IPv6 address (remove brackets)
+            host = trimmedUrl.substring(1, closingBracket);
+            
+            // Check if port is specified after closing bracket
+            if (closingBracket < trimmedUrl.length() - 1 && 
trimmedUrl.charAt(closingBracket + 1) == ':') {
+                String portStr = trimmedUrl.substring(closingBracket + 2);
+                if (portStr.isEmpty()) {
+                    throw new IllegalArgumentException("Port cannot be empty 
in Redis node URL: " + url);
+                }
+                try {
+                    port = Integer.parseInt(portStr);
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid port in Redis 
node URL: " + url, e);
+                }
+            }
+        } else {
+            // IPv4 or hostname format: localhost:6379 or 192.168.1.1:6379
+            int lastColonIndex = trimmedUrl.lastIndexOf(":");
+            if (lastColonIndex > 0 && lastColonIndex < trimmedUrl.length() - 
1) {
+                String portStr = trimmedUrl.substring(lastColonIndex + 1);
+                if (portStr.isEmpty()) {
+                    throw new IllegalArgumentException("Port cannot be empty 
in Redis node URL: " + url);
+                }
+                try {
+                    port = Integer.parseInt(portStr);
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid port in Redis 
node URL: " + url, e);
+                }
+                host = trimmedUrl.substring(0, lastColonIndex);
+            } else if (lastColonIndex == 0 || lastColonIndex == 
trimmedUrl.length() - 1) {
+                throw new IllegalArgumentException("Invalid format in Redis 
node URL: " + url);
+            }
+        }
+        
+        if (host.trim().isEmpty()) {
+            throw new IllegalArgumentException("Host is empty in Redis node 
URL: " + url);
+        }
+        if (port < 1 || port > 65535) {
+            throw new IllegalArgumentException("Port out of range (1-65535) in 
Redis node URL: " + url);
+        }
+        
+        return new RedisNode(host.trim(), port);
+    }
 }
diff --git 
a/shenyu-infra/shenyu-infra-redis/src/test/java/org/apache/shenyu/infra/redis/RedisConnectionFactoryTest.java
 
b/shenyu-infra/shenyu-infra-redis/src/test/java/org/apache/shenyu/infra/redis/RedisConnectionFactoryTest.java
index 74c3fce432..0f3f3cbe8d 100644
--- 
a/shenyu-infra/shenyu-infra-redis/src/test/java/org/apache/shenyu/infra/redis/RedisConnectionFactoryTest.java
+++ 
b/shenyu-infra/shenyu-infra-redis/src/test/java/org/apache/shenyu/infra/redis/RedisConnectionFactoryTest.java
@@ -21,8 +21,11 @@ import org.apache.shenyu.common.enums.RedisModeEnum;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
+import org.springframework.data.redis.connection.RedisNode;
 
 import java.time.Duration;
+import java.lang.reflect.Method;
+import java.lang.reflect.InvocationTargetException;
 
 import static org.mockito.Mockito.when;
 
@@ -49,4 +52,70 @@ public class RedisConnectionFactoryTest {
         
when(redisConfigProperties.getMaxWait()).thenReturn(Duration.ofMillis(-1));
         Assertions.assertDoesNotThrow(() -> new 
RedisConnectionFactory(redisConfigProperties));
     }
+
+    @Test
+    public void parseRedisNodeValidInputs() throws Exception {
+        RedisConnectionFactory factory = createFactoryWithDefaultUrl();
+        Method parseMethod = 
RedisConnectionFactory.class.getDeclaredMethod("parseRedisNode", String.class);
+        parseMethod.setAccessible(true);
+
+        // hostname and IPv6 nodes, with and without an explicit port
+        Assertions.assertAll(
+                () -> {
+                    RedisNode node = (RedisNode) parseMethod.invoke(factory, 
"localhost:6380");
+                    Assertions.assertEquals("localhost", node.getHost());
+                    Assertions.assertEquals(6380, node.getPort());
+                },
+                () -> {
+                    RedisNode node = (RedisNode) parseMethod.invoke(factory, 
"[::1]:6381");
+                    Assertions.assertEquals("::1", node.getHost());
+                    Assertions.assertEquals(6381, node.getPort());
+                },
+                () -> {
+                    RedisNode node = (RedisNode) parseMethod.invoke(factory, 
"[::1]");
+                    Assertions.assertEquals("::1", node.getHost());
+                    Assertions.assertEquals(6379, node.getPort());
+                },
+                () -> {
+                    RedisNode node = (RedisNode) parseMethod.invoke(factory, 
"redis.internal");
+                    Assertions.assertEquals("redis.internal", node.getHost());
+                    Assertions.assertEquals(6379, node.getPort());
+                }
+        );
+    }
+
+    @Test
+    public void parseRedisNodeInvalidInputs() throws Exception {
+        RedisConnectionFactory factory = createFactoryWithDefaultUrl();
+        Method parseMethod = 
RedisConnectionFactory.class.getDeclaredMethod("parseRedisNode", String.class);
+        parseMethod.setAccessible(true);
+
+        Assertions.assertAll(
+                () -> assertInvalidNode(parseMethod, factory, ""),
+                () -> assertInvalidNode(parseMethod, factory, "[::1]:"),
+                () -> assertInvalidNode(parseMethod, factory, 
"localhost:70000"),
+                () -> assertInvalidNode(parseMethod, factory, "[]:6379")
+        );
+    }
+
+    private RedisConnectionFactory createFactoryWithDefaultUrl() {
+        RedisConfigProperties redisConfigProperties = new 
RedisConfigProperties();
+        redisConfigProperties.setUrl("localhost:6379");
+        redisConfigProperties.setMode(RedisModeEnum.STANDALONE.getName());
+        return new RedisConnectionFactory(redisConfigProperties);
+    }
+
+    private void assertInvalidNode(final Method parseMethod, final 
RedisConnectionFactory factory, final String url) {
+        Assertions.assertThrows(IllegalArgumentException.class, () -> {
+            try {
+                parseMethod.invoke(factory, url);
+            } catch (InvocationTargetException e) {
+                // rethrow the reflected method's own unchecked exception
+                if (e.getCause() instanceof RuntimeException) {
+                    throw (RuntimeException) e.getCause();
+                }
+                throw new RuntimeException(e.getCause());
+            }
+        });
+    }
 }
diff --git 
a/shenyu-registry/shenyu-registry-etcd/src/test/java/org/apache/shenyu/registry/etcd/EtcdClientTest.java
 
b/shenyu-registry/shenyu-registry-etcd/src/test/java/org/apache/shenyu/registry/etcd/EtcdClientTest.java
index ff665aabb2..8876e2c43c 100644
--- 
a/shenyu-registry/shenyu-registry-etcd/src/test/java/org/apache/shenyu/registry/etcd/EtcdClientTest.java
+++ 
b/shenyu-registry/shenyu-registry-etcd/src/test/java/org/apache/shenyu/registry/etcd/EtcdClientTest.java
@@ -63,26 +63,37 @@ public class EtcdClientTest {
             when(client.getLeaseClient()).thenReturn(lease);
             final CompletableFuture<LeaseGrantResponse> completableFuture = 
mock(CompletableFuture.class);
             final LeaseGrantResponse leaseGrantResponse = 
mock(LeaseGrantResponse.class);
+            when(leaseGrantResponse.getID()).thenReturn(1L);
 
             
when(client.getLeaseClient().grant(anyLong())).thenReturn(completableFuture);
             when(completableFuture.get()).thenReturn(leaseGrantResponse);
-            Assertions.assertDoesNotThrow(() -> 
EtcdClient.builder().client(Client.builder().endpoints("url").build()).ttl(60L).timeout(3000L).build());
-
             List<StreamObserver<LeaseKeepAliveResponse>> observerList = new 
ArrayList<>();
             doAnswer(invocation -> {
                 observerList.add(invocation.getArgument(1));
                 return lease;
             }).when(lease).keepAlive(anyLong(), any());
-            Assertions.assertDoesNotThrow(() -> 
EtcdClient.builder().client(Client.builder().endpoints("url").build()).ttl(60L).timeout(3000L).build());
+
+            final EtcdClient etcdClient = Assertions.assertDoesNotThrow(() -> 
EtcdClient.builder()
+                    .client(Client.builder().endpoints("url").build())
+                    .ttl(60L)
+                    .timeout(3000L)
+                    .build());
+            Assertions.assertNotNull(etcdClient);
+
             final LeaseKeepAliveResponse leaseKeepAliveResponse = 
mock(LeaseKeepAliveResponse.class);
             observerList.forEach(streamObserver -> {
                 streamObserver.onCompleted();
                 streamObserver.onError(new ShenyuException("test"));
                 streamObserver.onNext(leaseKeepAliveResponse);
             });
+            etcdClient.close();
 
             doThrow(new 
InterruptedException("error")).when(completableFuture).get();
-            Assertions.assertDoesNotThrow(() -> 
EtcdClient.builder().client(Client.builder().endpoints("url").build()).ttl(60L).timeout(3000L).build());
+            Assertions.assertThrows(ShenyuException.class, () -> 
EtcdClient.builder()
+                    .client(Client.builder().endpoints("url").build())
+                    .ttl(60L)
+                    .timeout(3000L)
+                    .build());
         } catch (Exception e) {
             throw new ShenyuException(e.getCause());
         }
@@ -113,7 +124,7 @@ public class EtcdClientTest {
             etcdClient.putEphemeral("key", "value");
 
             doThrow(new 
InterruptedException("error")).when(completableFuture).get(anyLong(), 
any(TimeUnit.class));
-            etcdClient.putEphemeral("key", "value");
+            Assertions.assertThrows(ShenyuException.class, () -> 
etcdClient.putEphemeral("key", "value"));
         } catch (Exception e) {
             throw new ShenyuException(e.getCause());
         }
@@ -129,6 +140,7 @@ public class EtcdClientTest {
         when(client.getLeaseClient()).thenReturn(lease);
         final CompletableFuture<LeaseGrantResponse> completableFuture = 
mock(CompletableFuture.class);
         final LeaseGrantResponse leaseGrantResponse = 
mock(LeaseGrantResponse.class);
+        when(leaseGrantResponse.getID()).thenReturn(1L);
         
when(client.getLeaseClient().grant(anyLong())).thenReturn(completableFuture);
         when(completableFuture.get()).thenReturn(leaseGrantResponse);
         return client;


Reply via email to