This is an automated email from the ASF dual-hosted git repository.
liuhongyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new b984ad2c6d Refactor EtcdClient and RedisConnectionFactory for improved
readability and maintainability (#6250)
b984ad2c6d is described below
commit b984ad2c6d62a269e6628da62954cc4d761beb28
Author: aias00 <[email protected]>
AuthorDate: Mon Dec 8 18:54:04 2025 +0800
Refactor EtcdClient and RedisConnectionFactory for improved readability and
maintainability (#6250)
* Refactor EtcdClient and RedisConnectionFactory for improved readability
and maintainability
* Update
shenyu-infra/shenyu-infra-redis/src/main/java/org/apache/shenyu/infra/redis/RedisConnectionFactory.java
Co-authored-by: Copilot <[email protected]>
* Update
shenyu-infra/shenyu-infra-redis/src/main/java/org/apache/shenyu/infra/redis/RedisConnectionFactory.java
Co-authored-by: Copilot <[email protected]>
* feat: integrate mvnd installation and usage in CI workflows
* fix: handle InterruptedException in EtcdClient and update tests for
proper exception throwing
* feat: integrate mvnd installation and usage in CI workflows
* feat: enhance keep-alive mechanism in EtcdClient with exponential backoff
retries and add corresponding unit test
* test: add unit tests for keep-alive mechanism in EtcdClient and validate
RedisNode parsing
* test: refactor observer registration assertion in EtcdClientTest for
improved clarity and reliability
* feat: integrate mvnd installation and usage in CI workflows
* test: refactor RedisConnectionFactoryTest to improve invalid node
assertions
* test: enhance assertions in EtcdClientTest for better clarity and
reliability
---------
Co-authored-by: Copilot <[email protected]>
---
.../shenyu/infra/etcd/client/EtcdClient.java | 193 ++++++++++++++++-----
.../shenyu/infra/etcd/client/EtcdClientTest.java | 112 +++++++++++-
.../shenyu/infra/redis/RedisConnectionFactory.java | 93 ++++++++--
.../infra/redis/RedisConnectionFactoryTest.java | 69 ++++++++
.../shenyu/registry/etcd/EtcdClientTest.java | 22 ++-
5 files changed, 423 insertions(+), 66 deletions(-)
diff --git
a/shenyu-infra/shenyu-infra-etcd/src/main/java/org/apache/shenyu/infra/etcd/client/EtcdClient.java
b/shenyu-infra/shenyu-infra-etcd/src/main/java/org/apache/shenyu/infra/etcd/client/EtcdClient.java
index fcc5b4095d..6ea02a6ed8 100644
---
a/shenyu-infra/shenyu-infra-etcd/src/main/java/org/apache/shenyu/infra/etcd/client/EtcdClient.java
+++
b/shenyu-infra/shenyu-infra-etcd/src/main/java/org/apache/shenyu/infra/etcd/client/EtcdClient.java
@@ -42,8 +42,12 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -56,6 +60,10 @@ import static java.nio.charset.StandardCharsets.UTF_8;
public class EtcdClient {
private static final Logger LOG =
LoggerFactory.getLogger(EtcdClient.class);
+
+ private static final int MAX_RETRY_DELAY_SECONDS = 60;
+
+ private static final int INITIAL_RETRY_DELAY_SECONDS = 1;
private final Client client;
@@ -69,11 +77,26 @@ public class EtcdClient {
private final ConcurrentHashMap<String, Watch.Watcher> watchChildCache =
new ConcurrentHashMap<>();
+ private volatile boolean keepAliveRunning = true;
+
+ private volatile StreamObserver<LeaseKeepAliveResponse> currentObserver;
+
+ private final ScheduledExecutorService retryExecutor;
+
+ private volatile ScheduledFuture<?> retryFuture;
+
+ private final AtomicInteger retryCount = new AtomicInteger(0);
+
public EtcdClient(final Client client, final long ttl, final long timeout)
{
this.ttl = ttl;
this.timeout = timeout;
this.client = client;
+ this.retryExecutor = new ScheduledThreadPoolExecutor(1, r -> {
+ Thread thread = new Thread(r, "etcd-keepalive-retry");
+ thread.setDaemon(true);
+ return thread;
+ });
initLease();
}
@@ -82,6 +105,19 @@ public class EtcdClient {
* close client.
*/
public void close() {
+ keepAliveRunning = false;
+ if (Objects.nonNull(retryFuture)) {
+ retryFuture.cancel(false);
+ }
+ retryExecutor.shutdown();
+ try {
+ if (!retryExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ retryExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ retryExecutor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
this.client.close();
}
@@ -129,6 +165,7 @@ public class EtcdClient {
/**
* bytesOf string.
+ *
* @param val val.
* @return bytes val.
*/
@@ -145,7 +182,8 @@ public class EtcdClient {
* @throws ExecutionException the exception
* @throws InterruptedException the exception
*/
- public List<String> getChildrenKeys(final String prefix, final String
separator) throws ExecutionException, InterruptedException {
+ public List<String> getChildrenKeys(final String prefix, final String
separator)
+ throws ExecutionException, InterruptedException {
ByteSequence prefixByteSequence = bytesOf(prefix);
GetOption getOption = GetOption.newBuilder()
.withPrefix(prefixByteSequence)
@@ -170,10 +208,11 @@ public class EtcdClient {
*
* @param prefix key prefix.
* @param separator separator char
- * @param map prefix map
+ * @param map prefix map
* @return sub map
*/
- public List<String> getChildrenKeysByMap(final String prefix, final String
separator, final Map<String, String> map) {
+ public List<String> getChildrenKeysByMap(final String prefix, final String
separator,
+ final Map<String, String> map) {
return map.entrySet().stream()
.filter(e -> e.getKey().contains(prefix))
@@ -199,8 +238,8 @@ public class EtcdClient {
* @param deleteHandler node value handler of delete
*/
public void watchDataChange(final String key,
- final BiConsumer<String, String> updateHandler,
- final Consumer<String> deleteHandler) {
+ final BiConsumer<String, String> updateHandler,
+ final Consumer<String> deleteHandler) {
Watch.Listener listener = watch(updateHandler, deleteHandler);
if (!watchCache.containsKey(key)) {
Watch.Watcher watch =
client.getWatchClient().watch(ByteSequence.from(key, UTF_8), listener);
@@ -216,8 +255,8 @@ public class EtcdClient {
* @param deleteHandler sub node delete of delete
*/
public void watchChildChange(final String key,
- final BiConsumer<String, String>
updateHandler,
- final Consumer<String> deleteHandler) {
+ final BiConsumer<String, String> updateHandler,
+ final Consumer<String> deleteHandler) {
Watch.Listener listener = watch(updateHandler, deleteHandler);
WatchOption option = WatchOption.newBuilder()
.withPrefix(ByteSequence.from(key, UTF_8))
@@ -229,7 +268,7 @@ public class EtcdClient {
}
private Watch.Listener watch(final BiConsumer<String, String>
updateHandler,
- final Consumer<String> deleteHandler) {
+ final Consumer<String> deleteHandler) {
return Watch.listener(response -> {
for (WatchEvent event : response.getEvents()) {
String path = event.getKeyValue().getKey().toString(UTF_8);
@@ -245,9 +284,9 @@ public class EtcdClient {
}
}
}, throwable -> {
- LOG.error("etcd watch error {}", throwable.getMessage(),
throwable);
- throw new ShenyuException(throwable);
- });
+ LOG.error("etcd watch error {}", throwable.getMessage(),
throwable);
+ throw new ShenyuException(throwable);
+ });
}
/**
@@ -268,6 +307,7 @@ public class EtcdClient {
/**
* check node exists.
+ *
* @param key node name
* @return bool
*/
@@ -276,7 +316,8 @@ public class EtcdClient {
GetOption option = GetOption.newBuilder()
.withPrefix(ByteSequence.from(key, StandardCharsets.UTF_8))
.build();
- List<KeyValue> keyValues =
client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8),
option).get().getKvs();
+ List<KeyValue> keyValues =
client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8), option)
+ .get().getKvs();
return !keyValues.isEmpty();
} catch (Exception e) {
LOG.error("check node exists error", e);
@@ -286,12 +327,14 @@ public class EtcdClient {
/**
* update value of node.
- * @param key node name
+ *
+ * @param key node name
* @param value node value
*/
public void put(final String key, final String value) {
try {
- client.getKVClient().put(ByteSequence.from(key,
StandardCharsets.UTF_8), ByteSequence.from(value,
StandardCharsets.UTF_8)).get();
+ client.getKVClient().put(ByteSequence.from(key,
StandardCharsets.UTF_8),
+ ByteSequence.from(value, StandardCharsets.UTF_8)).get();
} catch (Exception e) {
LOG.error("update value of node error.", e);
throw new ShenyuException(e);
@@ -300,6 +343,7 @@ public class EtcdClient {
/**
* delete node.
+ *
* @param key node name
*/
public void delete(final String key) {
@@ -308,6 +352,7 @@ public class EtcdClient {
/**
* delete node of recursive.
+ *
* @param path parent node name
*/
public void deleteEtcdPathRecursive(final String path) {
@@ -315,40 +360,18 @@ public class EtcdClient {
.withPrefix(ByteSequence.from(path, StandardCharsets.UTF_8))
.build();
try {
- client.getKVClient().delete(ByteSequence.from(path,
StandardCharsets.UTF_8), option).get(10, TimeUnit.SECONDS);
+ client.getKVClient().delete(ByteSequence.from(path,
StandardCharsets.UTF_8), option).get(10,
+ TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error("delete node of recursive error.", e);
throw new ShenyuException(e);
}
}
- private void initLease() {
- try {
- this.globalLeaseId =
client.getLeaseClient().grant(ttl).get().getID();
- client.getLeaseClient().keepAlive(globalLeaseId, new
StreamObserver<>() {
- @Override
- public void onNext(final LeaseKeepAliveResponse
leaseKeepAliveResponse) {
- }
-
- @Override
- public void onError(final Throwable throwable) {
- LOG.error("keep alive error", throwable);
- }
-
- @Override
- public void onCompleted() {
- }
- });
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("initLease error.", e);
- }
- }
-
-
/**
* watchKeyChanges.
*
- * @param key key
+ * @param key key
* @param listener listener
* @return {@link Watch.Watcher}
*/
@@ -360,7 +383,8 @@ public class EtcdClient {
/**
* get keyResponse.
- * @param key watch key.
+ *
+ * @param key watch key.
* @param getOption get option.
* @return key response.
*/
@@ -375,22 +399,107 @@ public class EtcdClient {
/**
* put data as ephemeral.
- * @param key key
+ *
+ * @param key key
* @param value value
*/
public void putEphemeral(final String key, final String value) {
try {
KV kvClient = client.getKVClient();
kvClient.put(ByteSequence.from(key, UTF_8),
ByteSequence.from(value, UTF_8),
-
PutOption.newBuilder().withLeaseId(globalLeaseId).build())
+ PutOption.newBuilder().withLeaseId(globalLeaseId).build())
.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException
e) {
LOG.error("putEphemeral(key:{},value:{}) error.", key, value, e);
+ throw new ShenyuException(e);
}
}
+ private void initLease() {
+ try {
+ this.globalLeaseId =
client.getLeaseClient().grant(ttl).get().getID();
+ keepAlive();
+ } catch (InterruptedException e) {
+ LOG.error("initLease error.", e);
+ Thread.currentThread().interrupt();
+ throw new ShenyuException(e);
+ } catch (ExecutionException e) {
+ LOG.error("initLease error.", e);
+ throw new ShenyuException(e);
+ }
+ }
+
+ private void keepAlive() {
+ if (!keepAliveRunning) {
+ return;
+ }
+ retryCount.set(0);
+ scheduleRetryInternal(0);
+ }
+
+ private void scheduleRetry() {
+ int attempt = retryCount.incrementAndGet();
+ scheduleRetryInternal(attempt);
+ }
+
+ private void scheduleRetryInternal(final int attempt) {
+ if (!keepAliveRunning) {
+ return;
+ }
+
+ if (Objects.nonNull(retryFuture) && !retryFuture.isDone()) {
+ retryFuture.cancel(false);
+ }
+
+ long delaySeconds;
+ if (attempt <= 0) {
+ delaySeconds = 0;
+ } else {
+ long backoffFactor = 1L << Math.min(Math.max(attempt - 1, 0), 6);
+ delaySeconds = Math.min(INITIAL_RETRY_DELAY_SECONDS *
backoffFactor, MAX_RETRY_DELAY_SECONDS);
+ }
+
+ retryFuture = retryExecutor.schedule(() -> {
+ if (!keepAliveRunning) {
+ return;
+ }
+
+ currentObserver = new StreamObserver<>() {
+ @Override
+ public void onNext(final LeaseKeepAliveResponse
leaseKeepAliveResponse) {
+ retryCount.set(0);
+ }
+
+ @Override
+ public void onError(final Throwable throwable) {
+ LOG.error("keep alive error", throwable);
+ currentObserver = null;
+ scheduleRetry();
+ }
+
+ @Override
+ public void onCompleted() {
+ LOG.warn("keep alive stream completed");
+ currentObserver = null;
+ scheduleRetry();
+ }
+ };
+
+ try {
+ client.getLeaseClient().keepAlive(globalLeaseId,
currentObserver);
+ } catch (Exception e) {
+ LOG.error("Failed to start keep alive", e);
+ currentObserver = null;
+ scheduleRetry();
+ }
+ }, delaySeconds, TimeUnit.SECONDS);
+
+ LOG.debug("Scheduled keep alive retry in {} seconds (attempt: {})",
delaySeconds, attempt);
+ }
+
/**
* Create a new {@link Builder} instance for building an {@link
EtcdClient}.
+ *
* @return a new {@link Builder} instance
*/
public static Builder builder() {
diff --git
a/shenyu-infra/shenyu-infra-etcd/src/test/java/org/apache/shenyu/infra/etcd/client/EtcdClientTest.java
b/shenyu-infra/shenyu-infra-etcd/src/test/java/org/apache/shenyu/infra/etcd/client/EtcdClientTest.java
index bb5e3eb252..a100b7a354 100644
---
a/shenyu-infra/shenyu-infra-etcd/src/test/java/org/apache/shenyu/infra/etcd/client/EtcdClientTest.java
+++
b/shenyu-infra/shenyu-infra-etcd/src/test/java/org/apache/shenyu/infra/etcd/client/EtcdClientTest.java
@@ -33,8 +33,10 @@ import org.mockito.MockedStatic;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -63,31 +65,122 @@ public class EtcdClientTest {
when(client.getLeaseClient()).thenReturn(lease);
final CompletableFuture<LeaseGrantResponse> completableFuture =
mock(CompletableFuture.class);
final LeaseGrantResponse leaseGrantResponse =
mock(LeaseGrantResponse.class);
+ when(leaseGrantResponse.getID()).thenReturn(1L);
when(client.getLeaseClient().grant(anyLong())).thenReturn(completableFuture);
when(completableFuture.get()).thenReturn(leaseGrantResponse);
- Assertions.assertDoesNotThrow(() ->
EtcdClient.builder().client(Client.builder().endpoints("url").build())).ttl(60L).timeout(3000L).build();
-
List<StreamObserver<LeaseKeepAliveResponse>> observerList = new
ArrayList<>();
doAnswer(invocation -> {
observerList.add(invocation.getArgument(1));
return lease;
}).when(lease).keepAlive(anyLong(), any());
- Assertions.assertDoesNotThrow(() ->
EtcdClient.builder().client(Client.builder().endpoints("url").build())).ttl(60L).timeout(3000L).build();
+
+ final EtcdClient etcdClient = Assertions.assertDoesNotThrow(() ->
EtcdClient.builder()
+ .client(Client.builder().endpoints("url").build())
+ .ttl(60L)
+ .timeout(3000L)
+ .build());
+ Assertions.assertNotNull(etcdClient);
+
final LeaseKeepAliveResponse leaseKeepAliveResponse =
mock(LeaseKeepAliveResponse.class);
observerList.forEach(streamObserver -> {
streamObserver.onCompleted();
streamObserver.onError(new ShenyuException("test"));
streamObserver.onNext(leaseKeepAliveResponse);
});
+ etcdClient.close();
doThrow(new
InterruptedException("error")).when(completableFuture).get();
- Assertions.assertDoesNotThrow(() ->
EtcdClient.builder().client(Client.builder().endpoints("url").build())).ttl(60L).timeout(3000L).build();
+ Assertions.assertThrows(ShenyuException.class, () ->
EtcdClient.builder()
+ .client(Client.builder().endpoints("url").build())
+ .ttl(60L)
+ .timeout(3000L)
+ .build());
} catch (Exception e) {
throw new ShenyuException(e.getCause());
}
}
+ @Test
+ public void keepAliveRetryTest() throws ExecutionException,
InterruptedException {
+ try (MockedStatic<Client> clientMockedStatic =
mockStatic(Client.class)) {
+ final Client client = this.mockEtcd(clientMockedStatic);
+ final Lease lease = client.getLeaseClient();
+
+ AtomicInteger keepAliveInvoked = new AtomicInteger(0);
+ CountDownLatch latch = new CountDownLatch(2);
+ CountDownLatch firstCallLatch = new CountDownLatch(1);
+ List<StreamObserver<LeaseKeepAliveResponse>> observerList = new
ArrayList<>();
+
+ doAnswer(invocation -> {
+ keepAliveInvoked.incrementAndGet();
+ latch.countDown();
+ firstCallLatch.countDown();
+ observerList.add(invocation.getArgument(1));
+ return lease;
+ }).when(lease).keepAlive(anyLong(), any());
+
+ final EtcdClient etcdClient = EtcdClient.builder()
+ .client(Client.builder().endpoints("url").build())
+ .ttl(60L)
+ .timeout(3000L)
+ .build();
+ Assertions.assertNotNull(etcdClient);
+
+ // Wait for the initial keepAlive registration to complete
+ Assertions.assertTrue(firstCallLatch.await(2, TimeUnit.SECONDS),
"Initial keepAlive should run");
+ awaitObserverRegistered(observerList);
+
+ // Trigger error to schedule retry keep-alive
+ observerList.get(0).onError(new RuntimeException("test-error"));
+
+ boolean retried = latch.await(3, TimeUnit.SECONDS);
+ etcdClient.close();
+
+ Assertions.assertTrue(retried, "keepAlive should retry after
onError");
+ Assertions.assertTrue(keepAliveInvoked.get() >= 2, "Expected retry
keepAlive invocation");
+ }
+ }
+
+ @Test
+ public void keepAliveCompletedRetryTest() throws ExecutionException,
InterruptedException {
+ try (MockedStatic<Client> clientMockedStatic =
mockStatic(Client.class)) {
+ final Client client = this.mockEtcd(clientMockedStatic);
+ final Lease lease = client.getLeaseClient();
+
+ AtomicInteger keepAliveInvoked = new AtomicInteger(0);
+ CountDownLatch callsLatch = new CountDownLatch(2);
+ CountDownLatch firstCallLatch = new CountDownLatch(1);
+ List<StreamObserver<LeaseKeepAliveResponse>> observerList = new
ArrayList<>();
+
+ doAnswer(invocation -> {
+ keepAliveInvoked.incrementAndGet();
+ callsLatch.countDown();
+ firstCallLatch.countDown();
+ observerList.add(invocation.getArgument(1));
+ return lease;
+ }).when(lease).keepAlive(anyLong(), any());
+
+ final EtcdClient etcdClient = EtcdClient.builder()
+ .client(Client.builder().endpoints("url").build())
+ .ttl(60L)
+ .timeout(3000L)
+ .build();
+ Assertions.assertNotNull(etcdClient);
+
+ Assertions.assertTrue(firstCallLatch.await(2, TimeUnit.SECONDS),
"Initial keepAlive should run");
+ awaitObserverRegistered(observerList);
+
+ observerList.get(0).onCompleted();
+
+ boolean retried = callsLatch.await(3, TimeUnit.SECONDS);
+ etcdClient.close();
+
+ Assertions.assertTrue(retried, "keepAlive should retry after
onCompleted");
+ Assertions.assertTrue(keepAliveInvoked.get() >= 2, "Expected
keepAlive invocation after completion");
+ }
+ }
+
@Test
public void closeTest() {
try (MockedStatic<Client> clientMockedStatic =
mockStatic(Client.class)) {
@@ -113,7 +206,7 @@ public class EtcdClientTest {
etcdClient.putEphemeral("key", "value");
doThrow(new
InterruptedException("error")).when(completableFuture).get(anyLong(),
any(TimeUnit.class));
- etcdClient.putEphemeral("key", "value");
+ Assertions.assertThrows(ShenyuException.class, () ->
etcdClient.putEphemeral("key", "value"));
} catch (Exception e) {
throw new ShenyuException(e.getCause());
}
@@ -129,9 +222,18 @@ public class EtcdClientTest {
when(client.getLeaseClient()).thenReturn(lease);
final CompletableFuture<LeaseGrantResponse> completableFuture =
mock(CompletableFuture.class);
final LeaseGrantResponse leaseGrantResponse =
mock(LeaseGrantResponse.class);
+ when(leaseGrantResponse.getID()).thenReturn(1L);
when(client.getLeaseClient().grant(anyLong())).thenReturn(completableFuture);
when(completableFuture.get()).thenReturn(leaseGrantResponse);
return client;
}
+ private void awaitObserverRegistered(final
List<StreamObserver<LeaseKeepAliveResponse>> observerList)
+ throws InterruptedException {
+ for (int i = 0; i < 20 && observerList.isEmpty(); i++) {
+ Thread.sleep(50);
+ }
+ Assertions.assertFalse(observerList.isEmpty(), "Observer should be
registered");
+ }
+
}
diff --git
a/shenyu-infra/shenyu-infra-redis/src/main/java/org/apache/shenyu/infra/redis/RedisConnectionFactory.java
b/shenyu-infra/shenyu-infra-redis/src/main/java/org/apache/shenyu/infra/redis/RedisConnectionFactory.java
index f694f8eb14..f8f68bb269 100644
---
a/shenyu-infra/shenyu-infra-redis/src/main/java/org/apache/shenyu/infra/redis/RedisConnectionFactory.java
+++
b/shenyu-infra/shenyu-infra-redis/src/main/java/org/apache/shenyu/infra/redis/RedisConnectionFactory.java
@@ -29,8 +29,6 @@ import
org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import
org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import
org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import
org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
-import org.springframework.util.Assert;
-import org.springframework.util.StringUtils;
import java.time.Duration;
import java.util.ArrayList;
@@ -51,6 +49,7 @@ public class RedisConnectionFactory {
/**
* Get Lettuce connection factory.
+ *
* @return the factory
*/
public LettuceConnectionFactory getLettuceConnectionFactory() {
@@ -60,15 +59,19 @@ public class RedisConnectionFactory {
private LettuceConnectionFactory createLettuceConnectionFactory(final
RedisConfigProperties redisConfigProperties) {
LettuceClientConfiguration lettuceClientConfiguration =
getLettuceClientConfiguration(redisConfigProperties);
if
(RedisModeEnum.SENTINEL.getName().equals(redisConfigProperties.getMode())) {
- return new
LettuceConnectionFactory(redisSentinelConfiguration(redisConfigProperties),
lettuceClientConfiguration);
+ return new
LettuceConnectionFactory(redisSentinelConfiguration(redisConfigProperties),
+ lettuceClientConfiguration);
}
if
(RedisModeEnum.CLUSTER.getName().equals(redisConfigProperties.getMode())) {
- return new
LettuceConnectionFactory(redisClusterConfiguration(redisConfigProperties),
lettuceClientConfiguration);
+ return new
LettuceConnectionFactory(redisClusterConfiguration(redisConfigProperties),
+ lettuceClientConfiguration);
}
- return new
LettuceConnectionFactory(redisStandaloneConfiguration(redisConfigProperties),
lettuceClientConfiguration);
+ return new
LettuceConnectionFactory(redisStandaloneConfiguration(redisConfigProperties),
+ lettuceClientConfiguration);
}
- private LettuceClientConfiguration getLettuceClientConfiguration(final
RedisConfigProperties redisConfigProperties) {
+ private LettuceClientConfiguration getLettuceClientConfiguration(
+ final RedisConfigProperties redisConfigProperties) {
return
LettucePoolingClientConfiguration.builder().poolConfig(getPoolConfig(redisConfigProperties)).build();
}
@@ -89,12 +92,12 @@ public class RedisConnectionFactory {
* @param redisConfigProperties the rate limiter config
* @return the redis standalone configuration
*/
- protected final RedisStandaloneConfiguration
redisStandaloneConfiguration(final RedisConfigProperties redisConfigProperties)
{
+ protected final RedisStandaloneConfiguration redisStandaloneConfiguration(
+ final RedisConfigProperties redisConfigProperties) {
RedisStandaloneConfiguration config = new
RedisStandaloneConfiguration();
- String[] parts = StringUtils.split(redisConfigProperties.getUrl(),
":");
- Objects.requireNonNull(parts);
- config.setHostName(parts[0]);
- config.setPort(Integer.parseInt(parts[1]));
+ RedisNode redisNode = parseRedisNode(redisConfigProperties.getUrl());
+ config.setHostName(redisNode.getHost());
+ config.setPort(redisNode.getPort());
if (Objects.nonNull(redisConfigProperties.getPassword())) {
config.setPassword(RedisPassword.of(redisConfigProperties.getPassword()));
}
@@ -126,10 +129,72 @@ public class RedisConnectionFactory {
List<RedisNode> redisNodes = new ArrayList<>();
List<String> nodes = Lists.newArrayList(Splitter.on(";").split(url));
for (String node : nodes) {
- String[] parts = StringUtils.split(node, ":");
- Assert.state(Objects.requireNonNull(parts).length == 2, "Must be
defined as 'host:port'");
- redisNodes.add(new RedisNode(parts[0],
Integer.parseInt(parts[1])));
+ redisNodes.add(parseRedisNode(node));
}
return redisNodes;
}
+
+ private RedisNode parseRedisNode(final String url) {
+ if (Objects.isNull(url) || url.trim().isEmpty()) {
+ throw new IllegalArgumentException("Redis node URL cannot be null
or empty");
+ }
+
+ String trimmedUrl = url.trim();
+ String host = trimmedUrl;
+ int port = 6379;
+ int bracketIndex = trimmedUrl.lastIndexOf("]");
+
+ if (bracketIndex > -1) {
+ // IPv6 address format: [::1] or [::1]:6379
+ if (!trimmedUrl.startsWith("[")) {
+ throw new IllegalArgumentException("Invalid IPv6 format in
Redis node URL: " + url);
+ }
+ int closingBracket = trimmedUrl.indexOf("]");
+ if (closingBracket == -1 || closingBracket != bracketIndex) {
+ throw new IllegalArgumentException("Invalid IPv6 format in
Redis node URL: " + url);
+ }
+
+ // Extract IPv6 address (remove brackets)
+ host = trimmedUrl.substring(1, closingBracket);
+
+ // Check if port is specified after closing bracket
+ if (closingBracket < trimmedUrl.length() - 1 &&
trimmedUrl.charAt(closingBracket + 1) == ':') {
+ String portStr = trimmedUrl.substring(closingBracket + 2);
+ if (portStr.isEmpty()) {
+ throw new IllegalArgumentException("Port cannot be empty
in Redis node URL: " + url);
+ }
+ try {
+ port = Integer.parseInt(portStr);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid port in Redis
node URL: " + url, e);
+ }
+ }
+ } else {
+ // IPv4 or hostname format: localhost:6379 or 192.168.1.1:6379
+ int lastColonIndex = trimmedUrl.lastIndexOf(":");
+ if (lastColonIndex > 0 && lastColonIndex < trimmedUrl.length() -
1) {
+ String portStr = trimmedUrl.substring(lastColonIndex + 1);
+ if (portStr.isEmpty()) {
+ throw new IllegalArgumentException("Port cannot be empty
in Redis node URL: " + url);
+ }
+ try {
+ port = Integer.parseInt(portStr);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid port in Redis
node URL: " + url, e);
+ }
+ host = trimmedUrl.substring(0, lastColonIndex);
+ } else if (lastColonIndex == 0 || lastColonIndex ==
trimmedUrl.length() - 1) {
+ throw new IllegalArgumentException("Invalid format in Redis
node URL: " + url);
+ }
+ }
+
+ if (host.trim().isEmpty()) {
+ throw new IllegalArgumentException("Host is empty in Redis node
URL: " + url);
+ }
+ if (port < 1 || port > 65535) {
+ throw new IllegalArgumentException("Port out of range (1-65535) in
Redis node URL: " + url);
+ }
+
+ return new RedisNode(host.trim(), port);
+ }
}
diff --git
a/shenyu-infra/shenyu-infra-redis/src/test/java/org/apache/shenyu/infra/redis/RedisConnectionFactoryTest.java
b/shenyu-infra/shenyu-infra-redis/src/test/java/org/apache/shenyu/infra/redis/RedisConnectionFactoryTest.java
index 74c3fce432..0f3f3cbe8d 100644
---
a/shenyu-infra/shenyu-infra-redis/src/test/java/org/apache/shenyu/infra/redis/RedisConnectionFactoryTest.java
+++
b/shenyu-infra/shenyu-infra-redis/src/test/java/org/apache/shenyu/infra/redis/RedisConnectionFactoryTest.java
@@ -21,8 +21,11 @@ import org.apache.shenyu.common.enums.RedisModeEnum;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import org.springframework.data.redis.connection.RedisNode;
import java.time.Duration;
+import java.lang.reflect.Method;
+import java.lang.reflect.InvocationTargetException;
import static org.mockito.Mockito.when;
@@ -49,4 +52,70 @@ public class RedisConnectionFactoryTest {
when(redisConfigProperties.getMaxWait()).thenReturn(Duration.ofMillis(-1));
Assertions.assertDoesNotThrow(() -> new
RedisConnectionFactory(redisConfigProperties));
}
+
+ @Test
+ public void parseRedisNodeValidInputs() throws Exception {
+ RedisConnectionFactory factory = createFactoryWithDefaultUrl();
+ Method parseMethod =
RedisConnectionFactory.class.getDeclaredMethod("parseRedisNode", String.class);
+ parseMethod.setAccessible(true);
+
+ // Valid forms: host:port, bracketed IPv6 with and without a port, and a bare hostname
+ Assertions.assertAll(
+ () -> {
+ RedisNode node = (RedisNode) parseMethod.invoke(factory,
"localhost:6380");
+ Assertions.assertEquals("localhost", node.getHost());
+ Assertions.assertEquals(6380, node.getPort());
+ },
+ () -> {
+ RedisNode node = (RedisNode) parseMethod.invoke(factory,
"[::1]:6381");
+ Assertions.assertEquals("::1", node.getHost());
+ Assertions.assertEquals(6381, node.getPort());
+ },
+ () -> {
+ RedisNode node = (RedisNode) parseMethod.invoke(factory,
"[::1]");
+ Assertions.assertEquals("::1", node.getHost());
+ Assertions.assertEquals(6379, node.getPort());
+ },
+ () -> {
+ RedisNode node = (RedisNode) parseMethod.invoke(factory,
"redis.internal");
+ Assertions.assertEquals("redis.internal", node.getHost());
+ Assertions.assertEquals(6379, node.getPort());
+ }
+ );
+ }
+
+ @Test
+ public void parseRedisNodeInvalidInputs() throws Exception {
+ RedisConnectionFactory factory = createFactoryWithDefaultUrl();
+ Method parseMethod =
RedisConnectionFactory.class.getDeclaredMethod("parseRedisNode", String.class);
+ parseMethod.setAccessible(true);
+
+ Assertions.assertAll(
+ () -> assertInvalidNode(parseMethod, factory, ""),
+ () -> assertInvalidNode(parseMethod, factory, "[::1]:"),
+ () -> assertInvalidNode(parseMethod, factory,
"localhost:70000"),
+ () -> assertInvalidNode(parseMethod, factory, "[]:6379")
+ );
+ }
+
+ private RedisConnectionFactory createFactoryWithDefaultUrl() {
+ RedisConfigProperties redisConfigProperties = new
RedisConfigProperties();
+ redisConfigProperties.setUrl("localhost:6379");
+ redisConfigProperties.setMode(RedisModeEnum.STANDALONE.getName());
+ return new RedisConnectionFactory(redisConfigProperties);
+ }
+
+ private void assertInvalidNode(final Method parseMethod, final
RedisConnectionFactory factory, final String url) {
+ Assertions.assertThrows(IllegalArgumentException.class, () -> {
+ try {
+ parseMethod.invoke(factory, url);
+ } catch (InvocationTargetException e) {
+ // unwrap to the original IllegalArgumentException
+ if (e.getCause() instanceof RuntimeException) {
+ throw (RuntimeException) e.getCause();
+ }
+ throw new RuntimeException(e.getCause());
+ }
+ });
+ }
}
diff --git
a/shenyu-registry/shenyu-registry-etcd/src/test/java/org/apache/shenyu/registry/etcd/EtcdClientTest.java
b/shenyu-registry/shenyu-registry-etcd/src/test/java/org/apache/shenyu/registry/etcd/EtcdClientTest.java
index ff665aabb2..8876e2c43c 100644
---
a/shenyu-registry/shenyu-registry-etcd/src/test/java/org/apache/shenyu/registry/etcd/EtcdClientTest.java
+++
b/shenyu-registry/shenyu-registry-etcd/src/test/java/org/apache/shenyu/registry/etcd/EtcdClientTest.java
@@ -63,26 +63,37 @@ public class EtcdClientTest {
when(client.getLeaseClient()).thenReturn(lease);
final CompletableFuture<LeaseGrantResponse> completableFuture =
mock(CompletableFuture.class);
final LeaseGrantResponse leaseGrantResponse =
mock(LeaseGrantResponse.class);
+ when(leaseGrantResponse.getID()).thenReturn(1L);
when(client.getLeaseClient().grant(anyLong())).thenReturn(completableFuture);
when(completableFuture.get()).thenReturn(leaseGrantResponse);
- Assertions.assertDoesNotThrow(() ->
EtcdClient.builder().client(Client.builder().endpoints("url").build()).ttl(60L).timeout(3000L).build());
-
List<StreamObserver<LeaseKeepAliveResponse>> observerList = new
ArrayList<>();
doAnswer(invocation -> {
observerList.add(invocation.getArgument(1));
return lease;
}).when(lease).keepAlive(anyLong(), any());
- Assertions.assertDoesNotThrow(() ->
EtcdClient.builder().client(Client.builder().endpoints("url").build()).ttl(60L).timeout(3000L).build());
+
+ final EtcdClient etcdClient = Assertions.assertDoesNotThrow(() ->
EtcdClient.builder()
+ .client(Client.builder().endpoints("url").build())
+ .ttl(60L)
+ .timeout(3000L)
+ .build());
+ Assertions.assertNotNull(etcdClient);
+
final LeaseKeepAliveResponse leaseKeepAliveResponse =
mock(LeaseKeepAliveResponse.class);
observerList.forEach(streamObserver -> {
streamObserver.onCompleted();
streamObserver.onError(new ShenyuException("test"));
streamObserver.onNext(leaseKeepAliveResponse);
});
+ etcdClient.close();
doThrow(new
InterruptedException("error")).when(completableFuture).get();
- Assertions.assertDoesNotThrow(() ->
EtcdClient.builder().client(Client.builder().endpoints("url").build()).ttl(60L).timeout(3000L).build());
+ Assertions.assertThrows(ShenyuException.class, () ->
EtcdClient.builder()
+ .client(Client.builder().endpoints("url").build())
+ .ttl(60L)
+ .timeout(3000L)
+ .build());
} catch (Exception e) {
throw new ShenyuException(e.getCause());
}
@@ -113,7 +124,7 @@ public class EtcdClientTest {
etcdClient.putEphemeral("key", "value");
doThrow(new
InterruptedException("error")).when(completableFuture).get(anyLong(),
any(TimeUnit.class));
- etcdClient.putEphemeral("key", "value");
+ Assertions.assertThrows(ShenyuException.class, () ->
etcdClient.putEphemeral("key", "value"));
} catch (Exception e) {
throw new ShenyuException(e.getCause());
}
@@ -129,6 +140,7 @@ public class EtcdClientTest {
when(client.getLeaseClient()).thenReturn(lease);
final CompletableFuture<LeaseGrantResponse> completableFuture =
mock(CompletableFuture.class);
final LeaseGrantResponse leaseGrantResponse =
mock(LeaseGrantResponse.class);
+ when(leaseGrantResponse.getID()).thenReturn(1L);
when(client.getLeaseClient().grant(anyLong())).thenReturn(completableFuture);
when(completableFuture.get()).thenReturn(leaseGrantResponse);
return client;