This is an automated email from the ASF dual-hosted git repository.
liuhongyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new cb3c582f4c Fix: Preserve Gateway's independent upstream health check
state when receiving config updates from Admin (#6274)
cb3c582f4c is described below
commit cb3c582f4cf0457ddb8cbbc1d465083c237ed787
Author: Jesen Kwan <[email protected]>
AuthorDate: Thu Jan 29 11:28:53 2026 +0800
Fix: Preserve Gateway's independent upstream health check state when
receiving config updates from Admin (#6274)
* Fix: Preserve unhealthy upstream state when receiving config updates from
admin
When admin publishes configuration updates with upstreams marked as
status=false,
the gateway should preserve their unhealthy state and continue health
checking
instead of completely removing them. This allows the gateway's independent
health
check to recover upstreams when they become healthy.
Changes:
- UpstreamCacheManager: Refactored submit() method to preserve unhealthy
state
for both status=true and status=false upstreams
- Added processOfflineUpstreams() to handle status=false upstreams with
health
check enabled, keeping them in unhealthy map for monitoring
- Added processValidUpstreams() to check if valid upstreams were previously
unhealthy and preserve that status
- UpstreamCheckTask: Made removeFromMap() public to support state
preservation
Co-Authored-By: Claude <[email protected]>
* Test: Add tests for upstream unhealthy state preservation
Add comprehensive tests to verify the fix for preserving unhealthy upstream
state when receiving config updates from admin.
UpstreamCacheManagerTest:
- testSubmitWithStatusFalsePreservesUnhealthyState: Verify that upstreams
with status=false that were previously unhealthy remain in unhealthy map
- testSubmitWithNewOfflineUpstreamAddedToUnhealthy: Verify new upstreams
with status=false are added to unhealthy map for monitoring
- testSubmitPreservesUnhealthyForValidUpstream: Verify valid upstreams
that were previously unhealthy remain in unhealthy map
- testSubmitWithHealthCheckDisabledAndStatusFalse: Verify upstreams with
healthCheckEnabled=false are removed, not added to unhealthy map
UpstreamCheckTaskTest:
- testPutToMap: Test adding upstreams to healthy map
- testPutToMapUnhealthy: Test adding upstreams to unhealthy map
- testRemoveFromMap: Test removing upstreams from healthy map
- testRemoveFromMapUnhealthy: Test removing upstreams from unhealthy map
- testMoveUpstreamBetweenMaps: Test moving upstreams between maps
Co-Authored-By: Claude <[email protected]>
* Update
shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTaskTest.java
Co-authored-by: Copilot <[email protected]>
* Opt: Move empty list check before initialization to avoid unnecessary
processing
Move the empty list check to the beginning of submit() method to avoid
calling initializeUpstreamHealthStatus() and stream partitioning when
the upstream list is empty.
This is a minor performance optimization that reduces unnecessary method
calls and stream operations when processing empty upstream lists.
Changes:
- Move isEmpty() check before initializeUpstreamHealthStatus()
- Add early return for empty lists
- Remove redundant isEmpty() check after partitioning
Co-Authored-By: Claude <[email protected]>
---------
Co-authored-by: Claude <[email protected]>
Co-authored-by: aias00 <[email protected]>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: zhengpeng <[email protected]>
---
.../loadbalancer/cache/UpstreamCacheManager.java | 120 +++++++++----
.../loadbalancer/cache/UpstreamCheckTask.java | 20 ++-
.../cache/UpstreamCacheManagerTest.java | 185 +++++++++++++++++++++
.../loadbalancer/cache/UpstreamCheckTaskTest.java | 136 +++++++++++++++
4 files changed, 427 insertions(+), 34 deletions(-)
diff --git
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
index eeff3c8195..2c13303970 100644
---
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
+++
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
@@ -26,6 +26,7 @@ import org.apache.shenyu.common.utils.MapUtils;
import org.apache.shenyu.common.utils.Singleton;
import org.apache.shenyu.loadbalancer.entity.Upstream;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -146,53 +147,110 @@ public final class UpstreamCacheManager {
*/
public void submit(final String selectorId, final List<Upstream>
upstreamList) {
List<Upstream> actualUpstreamList = Objects.isNull(upstreamList) ?
Lists.newArrayList() : upstreamList;
- actualUpstreamList.forEach(upstream -> {
- if (!upstream.isHealthCheckEnabled()) {
- upstream.setStatus(true);
- upstream.setHealthy(true);
- }
- });
+
+ // Check if the list is empty first to avoid unnecessary processing
+ if (actualUpstreamList.isEmpty()) {
+ List<Upstream> existUpstreamList =
MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> Lists.newArrayList());
+ removeAllUpstreams(selectorId, existUpstreamList);
+ return;
+ }
+
+ initializeUpstreamHealthStatus(actualUpstreamList);
+
Map<Boolean, List<Upstream>> partitionedUpstreams =
actualUpstreamList.stream()
.collect(Collectors.partitioningBy(Upstream::isStatus));
List<Upstream> validUpstreamList = partitionedUpstreams.get(true);
List<Upstream> offlineUpstreamList = partitionedUpstreams.get(false);
List<Upstream> existUpstreamList =
MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> Lists.newArrayList());
- if (actualUpstreamList.isEmpty()) {
- existUpstreamList.forEach(up -> task.triggerRemoveOne(selectorId,
up));
- }
+ processOfflineUpstreams(selectorId, offlineUpstreamList,
existUpstreamList);
+ processValidUpstreams(selectorId, validUpstreamList,
existUpstreamList);
+
+ List<Upstream> healthyUpstreamList =
task.getHealthyUpstreamListBySelectorId(selectorId);
+ UPSTREAM_MAP.put(selectorId, Objects.isNull(healthyUpstreamList) ?
Lists.newArrayList() : healthyUpstreamList);
+ }
+
+ private void initializeUpstreamHealthStatus(final List<Upstream>
upstreamList) {
+ upstreamList.forEach(upstream -> {
+ if (!upstream.isHealthCheckEnabled()) {
+ upstream.setStatus(true);
+ upstream.setHealthy(true);
+ }
+ });
+ }
+
+ private void removeAllUpstreams(final String selectorId, final
List<Upstream> existUpstreamList) {
+ List<Upstream> toRemove = new ArrayList<>(existUpstreamList);
+ toRemove.forEach(up -> task.triggerRemoveOne(selectorId, up));
+ }
- // Use a Set for O(1) lookups instead of nested loops
+ private void processOfflineUpstreams(final String selectorId, final
List<Upstream> offlineUpstreamList,
+ final List<Upstream>
existUpstreamList) {
+ Map<String, Upstream> currentUnhealthyMap =
getCurrentUnhealthyMap(selectorId);
Set<Upstream> existUpstreamSet = new HashSet<>(existUpstreamList);
+
offlineUpstreamList.forEach(offlineUp -> {
+ String key = upstreamMapKey(offlineUp);
if (existUpstreamSet.contains(offlineUp)) {
- task.triggerRemoveOne(selectorId, offlineUp);
+ if (currentUnhealthyMap.containsKey(key) &&
offlineUp.isHealthCheckEnabled()) {
+ task.putToMap(task.getUnhealthyUpstream(), selectorId,
offlineUp);
+ task.removeFromMap(task.getHealthyUpstream(), selectorId,
offlineUp);
+ } else {
+ task.triggerRemoveOne(selectorId, offlineUp);
+ }
+ } else if (offlineUp.isHealthCheckEnabled()) {
+ task.putToMap(task.getUnhealthyUpstream(), selectorId,
offlineUp);
+ }
+ });
+ }
+
+ private void processValidUpstreams(final String selectorId, final
List<Upstream> validUpstreamList,
+ final List<Upstream> existUpstreamList)
{
+ if (validUpstreamList.isEmpty()) {
+ return;
+ }
+
+ updateExistingUpstreams(validUpstreamList, existUpstreamList);
+ addNewUpstreams(selectorId, validUpstreamList, existUpstreamList);
+ }
+
+ private void updateExistingUpstreams(final List<Upstream>
validUpstreamList, final List<Upstream> existUpstreamList) {
+ Map<String, Upstream> existUpstreamMap = existUpstreamList.stream()
+ .collect(Collectors.toMap(this::upstreamMapKey, existUp ->
existUp, (existing, replacement) -> existing));
+
+ validUpstreamList.forEach(validUp -> {
+ Upstream matchedExistUp =
existUpstreamMap.get(upstreamMapKey(validUp));
+ if (Objects.nonNull(matchedExistUp)) {
+ matchedExistUp.setWeight(validUp.getWeight());
+
matchedExistUp.setHealthCheckEnabled(validUp.isHealthCheckEnabled());
+ if (!matchedExistUp.isHealthCheckEnabled()) {
+ matchedExistUp.setHealthy(true);
+ }
}
});
+ }
+
+ private void addNewUpstreams(final String selectorId, final List<Upstream>
validUpstreamList,
+ final List<Upstream> existUpstreamList) {
+ Map<String, Upstream> currentUnhealthyMap =
getCurrentUnhealthyMap(selectorId);
- if (!validUpstreamList.isEmpty()) {
- // update upstream weight
- Map<String, Upstream> existUpstreamMap = existUpstreamList.stream()
- .collect(Collectors.toMap(this::upstreamMapKey, existUp ->
existUp, (existing, replacement) -> existing));
- validUpstreamList.forEach(validUp -> {
- String key = upstreamMapKey(validUp);
- Upstream matchedExistUp = existUpstreamMap.get(key);
- if (Objects.nonNull(matchedExistUp)) {
- matchedExistUp.setWeight(validUp.getWeight());
-
matchedExistUp.setHealthCheckEnabled(validUp.isHealthCheckEnabled());
- if (!matchedExistUp.isHealthCheckEnabled()) {
- matchedExistUp.setHealthy(true);
- }
+ validUpstreamList.stream()
+ .filter(validUp -> !existUpstreamList.contains(validUp))
+ .forEach(up -> {
+ Upstream prevUnhealthy =
currentUnhealthyMap.get(upstreamMapKey(up));
+ if (Objects.nonNull(prevUnhealthy)) {
+ task.putToMap(task.getUnhealthyUpstream(), selectorId, up);
+ } else {
+ task.triggerAddOne(selectorId, up);
}
});
+ }
- validUpstreamList.stream()
- .filter(validUp -> !existUpstreamList.contains(validUp))
- .forEach(up -> task.triggerAddOne(selectorId, up));
- }
-
- List<Upstream> healthyUpstreamList =
task.getHealthyUpstreamListBySelectorId(selectorId);
- UPSTREAM_MAP.put(selectorId, Objects.isNull(healthyUpstreamList) ?
Lists.newArrayList() : healthyUpstreamList);
+ private Map<String, Upstream> getCurrentUnhealthyMap(final String
selectorId) {
+ List<Upstream> currentUnhealthy =
task.getUnhealthyUpstream().get(selectorId);
+ return Objects.isNull(currentUnhealthy)
+ ? Maps.newConcurrentMap()
+ :
currentUnhealthy.stream().collect(Collectors.toMap(this::upstreamMapKey, u ->
u, (a, b) -> a));
}
private String upstreamMapKey(final Upstream upstream) {
diff --git
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
index eef5cc47c5..4a5d0ea8fb 100644
---
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
+++
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
@@ -265,7 +265,7 @@ public final class UpstreamCheckTask implements Runnable {
public void triggerAddOne(final String selectorId, final Upstream
upstream) {
putToMap(healthyUpstream, selectorId, upstream);
}
-
+
/**
* Remove a specific upstream via selectorId.
*
@@ -277,7 +277,14 @@ public final class UpstreamCheckTask implements Runnable {
removeFromMap(unhealthyUpstream, selectorId, upstream);
}
- private void putToMap(final Map<String, List<Upstream>> map, final String
selectorId, final Upstream upstream) {
+ /**
+ * Put upstream to specified map (for preserving health status).
+ *
+ * @param map the map to put upstream
+ * @param selectorId the selector id
+ * @param upstream the upstream
+ */
+ public void putToMap(final Map<String, List<Upstream>> map, final String
selectorId, final Upstream upstream) {
synchronized (lock) {
List<Upstream> list = MapUtils.computeIfAbsent(map, selectorId, k
-> Lists.newArrayList());
if (!list.contains(upstream)) {
@@ -286,7 +293,14 @@ public final class UpstreamCheckTask implements Runnable {
}
}
- private void removeFromMap(final Map<String, List<Upstream>> map, final
String selectorId, final Upstream upstream) {
+ /**
+ * Remove upstream from specified map.
+ *
+ * @param map the map to remove upstream from
+ * @param selectorId the selector id
+ * @param upstream the upstream
+ */
+ public void removeFromMap(final Map<String, List<Upstream>> map, final
String selectorId, final Upstream upstream) {
synchronized (lock) {
List<Upstream> list = map.get(selectorId);
if (CollectionUtils.isNotEmpty(list)) {
diff --git
a/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManagerTest.java
b/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManagerTest.java
index 323535f230..17b449df3c 100644
---
a/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManagerTest.java
+++
b/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManagerTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Assertions;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
/**
* The type UpstreamCacheManager check task test.
@@ -106,4 +107,188 @@ public class UpstreamCacheManagerTest {
// Clean up
upstreamCacheManager.removeByKey(testSelectorId);
}
+
+ @Test
+ @Order(6)
+ public void testSubmitWithStatusFalsePreservesUnhealthyState() {
+ final UpstreamCacheManager upstreamCacheManager =
UpstreamCacheManager.getInstance();
+ final String testSelectorId = "PRESERVE_UNHEALTHY_TEST";
+
+ // First, submit healthy upstreams to establish baseline
+ List<Upstream> initialList = new ArrayList<>(2);
+ initialList.add(Upstream.builder()
+ .protocol("http://")
+ .url("upstream1:8080")
+ .status(true)
+ .healthCheckEnabled(true)
+ .build());
+ initialList.add(Upstream.builder()
+ .protocol("http://")
+ .url("upstream2:8080")
+ .status(true)
+ .healthCheckEnabled(true)
+ .build());
+ upstreamCacheManager.submit(testSelectorId, initialList);
+
+ // Simulate health check marking one as unhealthy
+ UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
+ if (Objects.nonNull(task)) {
+ Upstream unhealthyUpstream = initialList.get(0);
+ unhealthyUpstream.setHealthy(false);
+ task.putToMap(task.getUnhealthyUpstream(), testSelectorId,
unhealthyUpstream);
+ task.removeFromMap(task.getHealthyUpstream(), testSelectorId,
unhealthyUpstream);
+
+ // Verify it's in unhealthy map
+
Assertions.assertNotNull(task.getUnhealthyUpstream().get(testSelectorId));
+
Assertions.assertTrue(task.getUnhealthyUpstream().get(testSelectorId).stream()
+ .anyMatch(u -> u.getUrl().equals("upstream1:8080")));
+ }
+
+ // Now admin sends update with status=false for that upstream
+ List<Upstream> updateList = new ArrayList<>(2);
+ updateList.add(Upstream.builder()
+ .protocol("http://")
+ .url("upstream1:8080")
+ .status(false)
+ .healthCheckEnabled(true)
+ .build());
+ updateList.add(Upstream.builder()
+ .protocol("http://")
+ .url("upstream2:8080")
+ .status(true)
+ .healthCheckEnabled(true)
+ .build());
+ upstreamCacheManager.submit(testSelectorId, updateList);
+
+ // Verify: upstream1 should still be in unhealthy map (preserved state)
+ if (Objects.nonNull(task)) {
+ List<Upstream> unhealthyList =
task.getUnhealthyUpstream().get(testSelectorId);
+ Assertions.assertNotNull(unhealthyList);
+ Assertions.assertTrue(unhealthyList.stream()
+ .anyMatch(u -> u.getUrl().equals("upstream1:8080")),
+ "upstream1 should be preserved in unhealthy map");
+ }
+
+ // Clean up
+ upstreamCacheManager.removeByKey(testSelectorId);
+ }
+
+ @Test
+ @Order(7)
+ public void testSubmitWithNewOfflineUpstreamAddedToUnhealthy() {
+ final UpstreamCacheManager upstreamCacheManager =
UpstreamCacheManager.getInstance();
+ final String testSelectorId = "NEW_OFFLINE_UNHEALTHY_TEST";
+
+ // Submit a list with a new upstream having status=false
+ List<Upstream> upstreamList = new ArrayList<>(1);
+ upstreamList.add(Upstream.builder()
+ .protocol("http://")
+ .url("new-upstream:8080")
+ .status(false)
+ .healthCheckEnabled(true)
+ .build());
+ upstreamCacheManager.submit(testSelectorId, upstreamList);
+
+ // Verify: new upstream with status=false should be in unhealthy map
for monitoring
+ UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
+ if (Objects.nonNull(task)) {
+ List<Upstream> unhealthyList =
task.getUnhealthyUpstream().get(testSelectorId);
+ Assertions.assertNotNull(unhealthyList);
+ Assertions.assertTrue(unhealthyList.stream()
+ .anyMatch(u -> u.getUrl().equals("new-upstream:8080")),
+ "New upstream with status=false should be in unhealthy
map");
+ }
+
+ // Clean up
+ upstreamCacheManager.removeByKey(testSelectorId);
+ }
+
+ @Test
+ @Order(8)
+ public void testSubmitPreservesUnhealthyForValidUpstream() {
+ final UpstreamCacheManager upstreamCacheManager =
UpstreamCacheManager.getInstance();
+ final String testSelectorId = "PRESERVE_UNHEALTHY_VALID_TEST";
+
+ // First submit and mark an upstream as unhealthy
+ List<Upstream> initialList = new ArrayList<>(1);
+ initialList.add(Upstream.builder()
+ .protocol("http://")
+ .url("recovering-upstream:8080")
+ .status(true)
+ .healthCheckEnabled(true)
+ .build());
+ upstreamCacheManager.submit(testSelectorId, initialList);
+
+ UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
+ if (Objects.nonNull(task)) {
+ // Manually mark as unhealthy
+ Upstream unhealthyUpstream = initialList.get(0);
+ unhealthyUpstream.setHealthy(false);
+ task.putToMap(task.getUnhealthyUpstream(), testSelectorId,
unhealthyUpstream);
+ task.removeFromMap(task.getHealthyUpstream(), testSelectorId,
unhealthyUpstream);
+
+ // Now admin sends update with status=true (valid) for the same
upstream
+ List<Upstream> updateList = new ArrayList<>(1);
+ updateList.add(Upstream.builder()
+ .protocol("http://")
+ .url("recovering-upstream:8080")
+ .status(true)
+ .healthCheckEnabled(true)
+ .build());
+ upstreamCacheManager.submit(testSelectorId, updateList);
+
+ // Verify: should preserve unhealthy state since it was previously
unhealthy
+ List<Upstream> unhealthyList =
task.getUnhealthyUpstream().get(testSelectorId);
+ Assertions.assertNotNull(unhealthyList);
+ Assertions.assertTrue(unhealthyList.stream()
+ .anyMatch(u ->
u.getUrl().equals("recovering-upstream:8080")),
+ "Previously unhealthy upstream should remain in unhealthy
map");
+ }
+
+ // Clean up
+ upstreamCacheManager.removeByKey(testSelectorId);
+ }
+
+ @Test
+ @Order(9)
+ public void testSubmitWithHealthCheckDisabledAndStatusFalse() {
+ final UpstreamCacheManager upstreamCacheManager =
UpstreamCacheManager.getInstance();
+ final String testSelectorId =
"HEALTH_CHECK_DISABLED_STATUS_FALSE_TEST";
+
+ // Submit upstream with healthCheckEnabled=false and status=false
+ // This upstream should be removed, not added to unhealthy map
+ List<Upstream> upstreamList = new ArrayList<>(1);
+ upstreamList.add(Upstream.builder()
+ .protocol("http://")
+ .url("no-check-upstream:8080")
+ .status(false)
+ .healthCheckEnabled(false)
+ .build());
+ upstreamCacheManager.submit(testSelectorId, upstreamList);
+
+ UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
+ if (Objects.nonNull(task)) {
+ // Verify: should NOT be in unhealthy map since health check is
disabled
+ List<Upstream> unhealthyList =
task.getUnhealthyUpstream().get(testSelectorId);
+ Assertions.assertTrue(Objects.isNull(unhealthyList) ||
unhealthyList.isEmpty(),
+ "Upstream with healthCheckEnabled=false should not be in
unhealthy map");
+ }
+
+ // Clean up
+ upstreamCacheManager.removeByKey(testSelectorId);
+ }
+
+ /**
+ * Helper method to get the UpstreamCheckTask using reflection.
+ */
+ private UpstreamCheckTask getUpstreamCheckTask(final UpstreamCacheManager
manager) {
+ try {
+ java.lang.reflect.Field field =
UpstreamCacheManager.class.getDeclaredField("task");
+ field.setAccessible(true);
+ return (UpstreamCheckTask) field.get(manager);
+ } catch (Exception e) {
+ // If reflection fails, return null
+ return null;
+ }
+ }
}
diff --git
a/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTaskTest.java
b/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTaskTest.java
index 4d4017143d..d103e8844f 100644
---
a/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTaskTest.java
+++
b/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTaskTest.java
@@ -164,4 +164,140 @@ public class UpstreamCheckTaskTest {
assertTrue(upstream.isHealthCheckEnabled());
}
+
+ /**
+ * Test public putToMap method.
+ */
+ @Test
+ public void testPutToMap() {
+ final String selectorId = "putToMapTest";
+ Upstream upstream1 = Upstream.builder()
+ .url("upstream1:8080")
+ .build();
+ Upstream upstream2 = Upstream.builder()
+ .url("upstream2:8080")
+ .build();
+
+ // Test adding to healthy map
+ healthCheckTask.putToMap(healthCheckTask.getHealthyUpstream(),
selectorId, upstream1);
+
assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).size(), is(1));
+
+ // Test adding another upstream
+ healthCheckTask.putToMap(healthCheckTask.getHealthyUpstream(),
selectorId, upstream2);
+
assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).size(), is(2));
+
+ // Test adding duplicate (should not add again)
+ healthCheckTask.putToMap(healthCheckTask.getHealthyUpstream(),
selectorId, upstream1);
+
assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).size(), is(2));
+
+ // Clean up
+ healthCheckTask.triggerRemoveAll(selectorId);
+ }
+
+ /**
+ * Test public putToMap method with unhealthy map.
+ */
+ @Test
+ public void testPutToMapUnhealthy() {
+ final String selectorId = "putToMapUnhealthyTest";
+ Upstream upstream = Upstream.builder()
+ .url("unhealthy-upstream:8080")
+ .build();
+
+ // Test adding to unhealthy map
+ healthCheckTask.putToMap(healthCheckTask.getUnhealthyUpstream(),
selectorId, upstream);
+
assertThat(healthCheckTask.getUnhealthyUpstream().get(selectorId).size(),
is(1));
+
+ // Verify it's not in healthy map
+
assertTrue(CollectionUtils.isEmpty(healthCheckTask.getHealthyUpstream().get(selectorId)));
+
+ // Clean up
+ healthCheckTask.triggerRemoveAll(selectorId);
+ }
+
+ /**
+ * Test public removeFromMap method.
+ */
+ @Test
+ public void testRemoveFromMap() {
+ final String selectorId = "removeFromMapTest";
+ Upstream upstream1 = Upstream.builder()
+ .url("remove1:8080")
+ .build();
+ Upstream upstream2 = Upstream.builder()
+ .url("remove2:8080")
+ .build();
+
+ // Add upstreams to healthy map
+ healthCheckTask.putToMap(healthCheckTask.getHealthyUpstream(),
selectorId, upstream1);
+ healthCheckTask.putToMap(healthCheckTask.getHealthyUpstream(),
selectorId, upstream2);
+
assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).size(), is(2));
+
+ // Remove one upstream
+ healthCheckTask.removeFromMap(healthCheckTask.getHealthyUpstream(),
selectorId, upstream1);
+
assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).size(), is(1));
+
+ // Verify correct upstream remains
+
assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).get(0).getUrl(),
is("remove2:8080"));
+
+ // Clean up
+ healthCheckTask.triggerRemoveAll(selectorId);
+ }
+
+ /**
+ * Test public removeFromMap method with unhealthy map.
+ */
+ @Test
+ public void testRemoveFromMapUnhealthy() {
+ final String selectorId = "removeFromMapUnhealthyTest";
+ Upstream upstream = Upstream.builder()
+ .url("unhealthy-to-remove:8080")
+ .build();
+
+ // Add to unhealthy map
+ healthCheckTask.putToMap(healthCheckTask.getUnhealthyUpstream(),
selectorId, upstream);
+
assertThat(healthCheckTask.getUnhealthyUpstream().get(selectorId).size(),
is(1));
+
+ // Remove from unhealthy map
+ healthCheckTask.removeFromMap(healthCheckTask.getUnhealthyUpstream(),
selectorId, upstream);
+
assertTrue(!healthCheckTask.getUnhealthyUpstream().containsKey(selectorId)
+ ||
healthCheckTask.getUnhealthyUpstream().get(selectorId).isEmpty());
+ }
+
+ /**
+ * Test moving upstream between healthy and unhealthy maps using public
methods.
+ */
+ @Test
+ public void testMoveUpstreamBetweenMaps() {
+ final String selectorId = "moveBetweenMapsTest";
+ Upstream upstream = Upstream.builder()
+ .url("moving-upstream:8080")
+ .build();
+ upstream.setHealthy(true);
+
+ // Start in healthy map
+ healthCheckTask.putToMap(healthCheckTask.getHealthyUpstream(),
selectorId, upstream);
+
assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).size(), is(1));
+
+ // Move to unhealthy map
+ healthCheckTask.removeFromMap(healthCheckTask.getHealthyUpstream(),
selectorId, upstream);
+ healthCheckTask.putToMap(healthCheckTask.getUnhealthyUpstream(),
selectorId, upstream);
+
+ // Verify moved
+
assertTrue(!healthCheckTask.getHealthyUpstream().containsKey(selectorId)
+ ||
healthCheckTask.getHealthyUpstream().get(selectorId).isEmpty());
+
assertThat(healthCheckTask.getUnhealthyUpstream().get(selectorId).size(),
is(1));
+
+ // Move back to healthy
+ healthCheckTask.removeFromMap(healthCheckTask.getUnhealthyUpstream(),
selectorId, upstream);
+ healthCheckTask.putToMap(healthCheckTask.getHealthyUpstream(),
selectorId, upstream);
+
+ // Verify moved back
+
assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).size(), is(1));
+
assertTrue(!healthCheckTask.getUnhealthyUpstream().containsKey(selectorId)
+ ||
healthCheckTask.getUnhealthyUpstream().get(selectorId).isEmpty());
+
+ // Clean up
+ healthCheckTask.triggerRemoveAll(selectorId);
+ }
}