This is an automated email from the ASF dual-hosted git repository.
liuhongyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 082e7028be [ISSUE #6144] 修复通过 nacos
发现下游服务时,下游服务重启后,会将旧的IP覆盖新的IP。导致通过shenyu调用下游服务报错Can not find healthy upstream
url, please check your configuration! (#6201)
082e7028be is described below
commit 082e7028beb0e572d331121e09805a2bfb861ce4
Author: Evan <[email protected]>
AuthorDate: Tue Oct 21 16:55:34 2025 +0800
[ISSUE #6144] 修复通过 nacos 发现下游服务时,下游服务重启后,会将旧的IP覆盖新的IP。导致通过shenyu调用下游服务报错Can
not find healthy upstream url, please check your configuration! (#6201)
* fix : in the failure to properly handle valid and invalid nodes when
listening to the UpstreamList, which results in errors with valid instances.
1. 缓存实例逻辑修复
2. 新增通过 selectorId 获取健康实例方法
3. weight 支持 set 方法
4. 重写 Upstream equals 方法(去除 weight 参与相等校验)
issues: #6144 https://github.com/apache/shenyu/issues/6144
* Update
shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
Co-authored-by: Copilot <[email protected]>
* Update
shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
Co-authored-by: Copilot <[email protected]>
* Update
shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
Co-authored-by: Copilot <[email protected]>
---------
Co-authored-by: Evan <>
Co-authored-by: aias00 <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
.../loadbalancer/cache/UpstreamCacheManager.java | 50 +++++++++++++++++++---
.../loadbalancer/cache/UpstreamCheckTask.java | 9 ++++
.../shenyu/loadbalancer/entity/Upstream.java | 13 +++++-
3 files changed, 63 insertions(+), 9 deletions(-)
diff --git
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
index f5a27b8d6b..ecb86c019c 100644
---
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
+++
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
@@ -28,6 +28,7 @@ import org.apache.shenyu.loadbalancer.entity.Upstream;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -142,12 +143,47 @@ public final class UpstreamCacheManager {
* @param upstreamList the upstream list
*/
public void submit(final String selectorId, final List<Upstream>
upstreamList) {
- List<Upstream> validUpstreamList =
upstreamList.stream().filter(Upstream::isStatus).collect(Collectors.toList());
- List<Upstream> existUpstream = MapUtils.computeIfAbsent(UPSTREAM_MAP,
selectorId, k -> Lists.newArrayList());
- existUpstream.stream().filter(upstream ->
!validUpstreamList.contains(upstream))
- .forEach(upstream -> task.triggerRemoveOne(selectorId,
upstream));
- validUpstreamList.stream().filter(upstream ->
!existUpstream.contains(upstream))
- .forEach(upstream -> task.triggerAddOne(selectorId, upstream));
- UPSTREAM_MAP.put(selectorId, validUpstreamList);
+ List<Upstream> actualUpstreamList = Objects.isNull(upstreamList) ?
Lists.newArrayList() : upstreamList;
+ Map<Boolean, List<Upstream>> partitionedUpstreams =
actualUpstreamList.stream()
+ .collect(Collectors.partitioningBy(Upstream::isStatus));
+ List<Upstream> validUpstreamList = partitionedUpstreams.get(true);
+ List<Upstream> offlineUpstreamList = partitionedUpstreams.get(false);
+ List<Upstream> existUpstreamList =
MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> Lists.newArrayList());
+
+ if (actualUpstreamList.isEmpty()) {
+ existUpstreamList.forEach(up -> task.triggerRemoveOne(selectorId,
up));
+ }
+
+ // Use a Set for O(1) lookups instead of nested loops
+ java.util.Set<Upstream> existUpstreamSet = new
java.util.HashSet<>(existUpstreamList);
+ offlineUpstreamList.forEach(offlineUp -> {
+ if (existUpstreamSet.contains(offlineUp)) {
+ task.triggerRemoveOne(selectorId, offlineUp);
+ }
+ });
+
+ if (!validUpstreamList.isEmpty()) {
+ // update upstream weight
+ Map<String, Upstream> existUpstreamMap = existUpstreamList.stream()
+ .collect(Collectors.toMap(this::upstreamMapKey, existUp ->
existUp, (existing, replacement) -> existing));
+ validUpstreamList.forEach(validUp -> {
+ String key = upstreamMapKey(validUp);
+ Upstream matchedExistUp = existUpstreamMap.get(key);
+ if (Objects.nonNull(matchedExistUp)) {
+ matchedExistUp.setWeight(validUp.getWeight());
+ }
+ });
+
+ validUpstreamList.stream()
+ .filter(validUp -> !existUpstreamList.contains(validUp))
+ .forEach(up -> task.triggerAddOne(selectorId, up));
+ }
+
+ List<Upstream> healthyUpstreamList =
task.getHealthyUpstreamListBySelectorId(selectorId);
+ UPSTREAM_MAP.put(selectorId, Objects.isNull(healthyUpstreamList) ?
Lists.newArrayList() : healthyUpstreamList);
+ }
+
+ private String upstreamMapKey(final Upstream upstream) {
+ return String.join("_", upstream.getProtocol(), upstream.getUrl());
}
}
diff --git
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
index e604a22ef3..359aa06253 100644
---
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
+++
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
@@ -334,6 +334,15 @@ public final class UpstreamCheckTask implements Runnable {
public Map<String, List<Upstream>> getHealthyUpstream() {
return healthyUpstream;
}
+
+ /**
+ * Get healthy upstream list.
+ * @param selectorId selectorId
+ * @return healthy upstream list.
+ */
+ public List<Upstream> getHealthyUpstreamListBySelectorId(final String
selectorId) {
+ return getHealthyUpstream().get(selectorId);
+ }
/**
* Get unhealthy upstream map.
diff --git
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/entity/Upstream.java
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/entity/Upstream.java
index e8c367d854..05c962bdc0 100644
---
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/entity/Upstream.java
+++
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/entity/Upstream.java
@@ -41,7 +41,7 @@ public final class Upstream {
/**
* weight.
*/
- private final int weight;
+ private int weight;
/**
* false close, true open.
@@ -196,6 +196,15 @@ public final class Upstream {
return weight;
}
+ /**
+ * Sets weight.
+ *
+ * @param weight the weight
+ */
+ public void setWeight(final int weight) {
+ this.weight = weight;
+ }
+
/**
* Is healthy boolean.
*
@@ -441,7 +450,7 @@ public final class Upstream {
return false;
}
Upstream that = (Upstream) o;
- return Objects.equals(url, that.url) && Objects.equals(protocol,
that.protocol) && Objects.equals(weight, that.weight);
+ return Objects.equals(url, that.url) && Objects.equals(protocol,
that.protocol);
}
@Override