This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 8ccf154 [ISSUE #1665] Improve health check for divide plugin (#1666)
8ccf154 is described below
commit 8ccf154cfdb3e03b18085b5537142a8ecb603852
Author: Wu Zhiguo <[email protected]>
AuthorDate: Tue Jul 27 14:50:20 2021 +0800
[ISSUE #1665] Improve health check for divide plugin (#1666)
* add health check parameters
* simple health check logic
* improve health check for divide plugin
* improve health check for divide plugin
* improve health check for divide plugin
* remove WAITING_FOR_CHECK
* synchronized
* add log
* log
* disable zombie check in default
* disable zombie check in default
* ut
* ut
* ut
* checkstyle
* ut
* checkstyle
* move health check to common
* check immediately only when check is enable
* extract specific logic to UpstreamCacheManager
* ut
* style
---
.../admin/service/UpstreamCheckServiceTest.java | 6 +-
.../apache/shenyu/common/constant/Constants.java | 2 +-
.../shenyu/common/dto/convert/DivideUpstream.java | 14 +-
.../shenyu/common/healthcheck/HealthCheckTask.java | 339 +++++++++++++++++++++
.../UpstreamWithSelectorId.java} | 55 +---
.../plugin/divide/cache/UpstreamCacheManager.java | 118 ++++---
.../shenyu/plugin/divide/DividePluginTest.java | 18 +-
.../divide/cache/UpstreamCacheManagerTest.java | 31 +-
.../handler/DividePluginDataHandlerTest.java | 19 +-
.../divide/websocket/WebSocketPluginTest.java | 17 ++
10 files changed, 513 insertions(+), 106 deletions(-)
diff --git
a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java
b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java
index 6318732..f401c72 100644
---
a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java
+++
b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java
@@ -17,7 +17,6 @@
package org.apache.shenyu.admin.service;
-import com.google.common.collect.Lists;
import org.apache.shenyu.admin.model.entity.PluginDO;
import org.apache.shenyu.admin.model.entity.SelectorDO;
import org.apache.shenyu.admin.listener.DataChangedEvent;
@@ -44,7 +43,6 @@ import java.util.Map;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;
@@ -102,9 +100,9 @@ public final class UpstreamCheckServiceTest {
.handle("[{\"upstreamHost\":\"localhost\",\"protocol\":\"http://\",\"localhost\":\"divide-upstream-60\",\"weight\":60}]")
.build();
//stubbing
-
when(pluginMapper.selectByNames(anyList())).thenReturn(Lists.newArrayList(pluginDO));
+//
when(pluginMapper.selectByNames(anyList())).thenReturn(Lists.newArrayList(pluginDO));
when(pluginMapper.selectById(anyString())).thenReturn(pluginDO);
-
when(selectorMapper.findByPluginId(anyString())).thenReturn(Lists.newArrayList(selectorDOWithUrlError,
selectorDOWithUrlReachable));
+//
when(selectorMapper.findByPluginId(anyString())).thenReturn(Lists.newArrayList(selectorDOWithUrlError,
selectorDOWithUrlReachable));
when(selectorMapper.updateSelective(any(SelectorDO.class))).thenReturn(1);
when(selectorMapper.selectByName(anyString())).then(invocationOnMock
-> {
Object[] args = invocationOnMock.getArguments();
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
index 7779883..af064e8 100644
---
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
@@ -376,7 +376,7 @@ public interface Constants {
/**
* default checked value.
*/
- String DEFAULT_CHECK_VALUE = "true";
+ String DEFAULT_CHECK_VALUE = "false";
/**
* zombie check times.
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/DivideUpstream.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/DivideUpstream.java
index 203b297..84b21f7 100644
---
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/DivideUpstream.java
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/DivideUpstream.java
@@ -19,6 +19,7 @@ package org.apache.shenyu.common.dto.convert;
import lombok.Builder;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.ToString;
import java.io.Serializable;
@@ -29,6 +30,7 @@ import java.io.Serializable;
@Data
@ToString
@Builder
+@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class DivideUpstream implements Serializable {
private static final long serialVersionUID = 6252280511262542360L;
@@ -36,16 +38,19 @@ public class DivideUpstream implements Serializable {
/**
* host.
*/
+ @EqualsAndHashCode.Include
private String upstreamHost;
/**
* this is http protocol.
*/
+ @EqualsAndHashCode.Include
private String protocol;
/**
* url.
*/
+ @EqualsAndHashCode.Include
private String upstreamUrl;
/**
@@ -60,7 +65,7 @@ public class DivideUpstream implements Serializable {
private boolean status = true;
/**
- * starup time.
+ * startup time.
*/
private long timestamp;
@@ -69,4 +74,11 @@ public class DivideUpstream implements Serializable {
*/
private int warmup;
+ // health parameters
+
+ private boolean healthy;
+
+ private long lastHealthTimestamp;
+
+ private long lastUnhealthyTimestamp;
}
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/HealthCheckTask.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/HealthCheckTask.java
new file mode 100644
index 0000000..2a6881f
--- /dev/null
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/HealthCheckTask.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.common.healthcheck;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.convert.DivideUpstream;
+import org.apache.shenyu.common.utils.CollectionUtils;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.common.utils.UpstreamCheckUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/**
+ * Health check manager for upstream servers.
+ */
+@Slf4j
+public final class HealthCheckTask implements Runnable {
+
+ private final Map<String, List<DivideUpstream>> healthyUpstream =
Maps.newConcurrentMap();
+
+ private final Map<String, List<DivideUpstream>> unhealthyUpstream =
Maps.newConcurrentMap();
+
+ private final Map<String, SelectorData> selectorCache =
Maps.newConcurrentMap();
+
+ private final Object lock = new Object();
+
+ private final AtomicBoolean checkStarted = new AtomicBoolean(false);
+
+ private final List<CompletableFuture<UpstreamWithSelectorId>> futures =
Lists.newArrayList();
+
+ private final int checkInterval;
+
+ private ExecutorService executor;
+
+ private int checkTimeout = 3000;
+
+ private int healthyThreshold = 1;
+
+ private int unhealthyThreshold = 1;
+
+ public HealthCheckTask(final int checkInterval) {
+ this.checkInterval = checkInterval;
+ }
+
+ /**
+ * Schedule health check task.
+ */
+ public void schedule() {
+ // executor for health check
+ ThreadFactory healthCheckFactory =
ShenyuThreadFactory.create("upstream-health-check", true);
+ new ScheduledThreadPoolExecutor(1, healthCheckFactory)
+ .scheduleWithFixedDelay(this, 3000, checkInterval,
TimeUnit.MILLISECONDS);
+
+ // executor for async request, avoid request block health check thread
+ ThreadFactory requestFactory =
ShenyuThreadFactory.create("upstream-health-check-request", true);
+ executor = new ScheduledThreadPoolExecutor(10, requestFactory);
+ }
+
+ /**
+ * Set check timeout.
+ *
+ * @param checkTimeout milliseconds
+ */
+ public void setCheckTimeout(final int checkTimeout) {
+ this.checkTimeout = checkTimeout;
+ }
+
+ /**
+ * Set healthy threshold.
+ *
+ * @param healthyThreshold healthy threshold
+ */
+ public void setHealthyThreshold(final int healthyThreshold) {
+ this.healthyThreshold = healthyThreshold;
+ }
+
+ /**
+ * Set unhealthy threshold.
+ *
+ * @param unhealthyThreshold unhealthy threshold
+ */
+ public void setUnhealthyThreshold(final int unhealthyThreshold) {
+ this.unhealthyThreshold = unhealthyThreshold;
+ }
+
+ @Override
+ public void run() {
+ healthCheck();
+ }
+
+ private void healthCheck() {
+ try {
+ // If there is no synchronized. when check is done and all
upstream check result is in the futures list.
+ // In the same time, triggerRemoveAll() called before
waitFinish(), there will be dirty data stay in map.
+ synchronized (lock) {
+ if (tryStartHealthCheck()) {
+ doHealthCheck();
+ waitFinish();
+ }
+ }
+ } catch (Exception e) {
+ log.error("[Health Check] Meet problem: ", e);
+ } finally {
+ finishHealthCheck();
+ }
+ }
+
+ private void doHealthCheck() {
+ check(healthyUpstream);
+ check(unhealthyUpstream);
+ }
+
+ private void check(final Map<String, List<DivideUpstream>> map) {
+ for (Map.Entry<String, List<DivideUpstream>> entry : map.entrySet()) {
+ String key = entry.getKey();
+ List<DivideUpstream> value = entry.getValue();
+ for (DivideUpstream upstream : value) {
+ CompletableFuture<UpstreamWithSelectorId> future =
CompletableFuture.supplyAsync(() -> check(key, upstream), executor);
+
+ futures.add(future);
+ }
+ }
+ }
+
+ private UpstreamWithSelectorId check(final String selectorId, final
DivideUpstream upstream) {
+ SelectorData selectorData = selectorCache.get(selectorId);
+ boolean pass = UpstreamCheckUtils.checkUrl(upstream.getUpstreamUrl(),
checkTimeout);
+ if (pass) {
+ if (upstream.isHealthy()) {
+ upstream.setLastHealthTimestamp(System.currentTimeMillis());
+ } else {
+ long now = System.currentTimeMillis();
+ long interval = now - upstream.getLastUnhealthyTimestamp();
+ if (interval >= (long) checkInterval * healthyThreshold) {
+ upstream.setHealthy(true);
+ upstream.setLastHealthTimestamp(now);
+ log.info("[Health Check] Selector [{}] upstream {} health
check passed, server is back online.",
+ selectorData.getName(), upstream.getUpstreamUrl());
+ }
+ }
+ } else {
+ if (!upstream.isHealthy()) {
+ upstream.setLastUnhealthyTimestamp(System.currentTimeMillis());
+ } else {
+ long now = System.currentTimeMillis();
+ long interval = now - upstream.getLastHealthTimestamp();
+ if (interval >= (long) checkInterval * unhealthyThreshold) {
+ upstream.setHealthy(false);
+ upstream.setLastUnhealthyTimestamp(now);
+ log.info("[Health Check] Selector [{}] upstream {} health
check failed, server is offline.",
+ selectorData.getName(), upstream.getUpstreamUrl());
+ }
+ }
+ }
+
+ return new UpstreamWithSelectorId(selectorId, upstream);
+ }
+
+ private boolean tryStartHealthCheck() {
+ return checkStarted.compareAndSet(false, true);
+ }
+
+ private void waitFinish() throws ExecutionException, InterruptedException {
+ for (CompletableFuture<UpstreamWithSelectorId> future : futures) {
+ UpstreamWithSelectorId entity = future.get();
+ putEntityToMap(entity);
+ }
+
+ futures.clear();
+ }
+
+ private void putEntityToMap(final UpstreamWithSelectorId entity) {
+ DivideUpstream upstream = entity.getDivideUpstream();
+ if (upstream.isHealthy()) {
+ putToMap(healthyUpstream, entity.getSelectorId(), upstream);
+ removeFromMap(unhealthyUpstream, entity.getSelectorId(), upstream);
+ } else {
+ putToMap(unhealthyUpstream, entity.getSelectorId(), upstream);
+ removeFromMap(healthyUpstream, entity.getSelectorId(), upstream);
+ }
+ }
+
+ private void finishHealthCheck() {
+ checkStarted.set(false);
+ }
+
+ /**
+ * Add one upstream via selectorData.
+ *
+ * @param selectorData selectorData
+ * @param upstream upstream
+ */
+ public void triggerAddOne(final SelectorData selectorData, final
DivideUpstream upstream) {
+ selectorCache.putIfAbsent(selectorData.getId(), selectorData);
+ putToMap(healthyUpstream, selectorData.getId(), upstream);
+ }
+
+ /**
+ * Remove a specific upstream via selectorData.
+ *
+ * @param selectorData selectorData
+ * @param upstream upstream
+ */
+ public void triggerRemoveOne(final SelectorData selectorData, final
DivideUpstream upstream) {
+ triggerRemoveOne(selectorData.getId(), upstream);
+ }
+
+ /**
+ * Remove a specific upstream via selectorId.
+ *
+ * @param selectorId selectorId
+ * @param upstream upstream
+ */
+ public void triggerRemoveOne(final String selectorId, final DivideUpstream
upstream) {
+ removeFromMap(healthyUpstream, selectorId, upstream);
+ removeFromMap(unhealthyUpstream, selectorId, upstream);
+
+ SelectorData selectorData = selectorCache.get(selectorId);
+ log.info("[Health Check] Selector [{}] upstream {} was removed.",
selectorData.getName(), upstream.getUpstreamUrl());
+ }
+
+ private void putToMap(final Map<String, List<DivideUpstream>> map, final
String selectorId, final DivideUpstream upstream) {
+ synchronized (lock) {
+ List<DivideUpstream> list = map.computeIfAbsent(selectorId, k ->
Lists.newArrayList());
+ if (!list.contains(upstream)) {
+ list.add(upstream);
+ }
+ }
+ }
+
+ private void removeFromMap(final Map<String, List<DivideUpstream>> map,
final String selectorId, final DivideUpstream upstream) {
+ synchronized (lock) {
+ List<DivideUpstream> list = map.get(selectorId);
+ if (CollectionUtils.isNotEmpty(list)) {
+ list.remove(upstream);
+ }
+ }
+ }
+
+ /**
+ * Remove all upstream via selectorData.
+ *
+ * @param selectorData selectorData
+ */
+ public void triggerRemoveAll(final SelectorData selectorData) {
+ triggerRemoveAll(selectorData.getId());
+ }
+
+ /**
+ * Remove all upstream via selectorId.
+ *
+ * @param selectorId selectorId
+ */
+ public void triggerRemoveAll(final String selectorId) {
+ synchronized (lock) {
+ healthyUpstream.remove(selectorId);
+ unhealthyUpstream.remove(selectorId);
+ }
+
+ SelectorData selectorData = selectorCache.get(selectorId);
+ log.info("[Health Check] Selector [{}] all upstream as removed.",
selectorData.getName());
+
+ selectorCache.remove(selectorId);
+ }
+
+ /**
+ * Print healthy upstream.
+ */
+ public void printHealthyUpstream() {
+ healthyUpstream.forEach((k, v) -> {
+ if (v != null) {
+ SelectorData selectorData = selectorCache.get(k);
+ List<String> list =
v.stream().map(DivideUpstream::getUpstreamUrl).collect(Collectors.toList());
+ log.info("[Health Check] Selector [{}] currently healthy
upstream: {}",
+ selectorData.getName(),
GsonUtils.getInstance().toJson(list));
+ }
+ });
+ }
+
+ /**
+ * Print unhealthy upstream.
+ */
+ public void printUnhealthyUpstream() {
+ unhealthyUpstream.forEach((k, v) -> {
+ if (v != null) {
+ SelectorData selectorData = selectorCache.get(k);
+ List<String> list =
v.stream().map(DivideUpstream::getUpstreamUrl).collect(Collectors.toList());
+ log.info("[Health Check] Selector [{}] currently unhealthy
upstream: {}",
+ selectorData.getName(),
GsonUtils.getInstance().toJson(list));
+ }
+ });
+ }
+
+ /**
+ * Get healthy upstream map.
+ *
+ * @return healthy map.
+ */
+ public Map<String, List<DivideUpstream>> getHealthyUpstream() {
+ return healthyUpstream;
+ }
+
+ /**
+ * Get unhealthy upstream map.
+ *
+ * @return unhealthy map.
+ */
+ public Map<String, List<DivideUpstream>> getUnhealthyUpstream() {
+ return unhealthyUpstream;
+ }
+}
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/DivideUpstream.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/UpstreamWithSelectorId.java
similarity index 51%
copy from
shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/DivideUpstream.java
copy to
shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/UpstreamWithSelectorId.java
index 203b297..7bfeaa4 100644
---
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/DivideUpstream.java
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/UpstreamWithSelectorId.java
@@ -15,58 +15,17 @@
* limitations under the License.
*/
-package org.apache.shenyu.common.dto.convert;
+package org.apache.shenyu.common.healthcheck;
-import lombok.Builder;
+import lombok.AllArgsConstructor;
import lombok.Data;
-import lombok.ToString;
+import org.apache.shenyu.common.dto.convert.DivideUpstream;
-import java.io.Serializable;
-
-/**
- * this is divide upstream.
- */
@Data
-@ToString
-@Builder
-public class DivideUpstream implements Serializable {
-
- private static final long serialVersionUID = 6252280511262542360L;
-
- /**
- * host.
- */
- private String upstreamHost;
-
- /**
- * this is http protocol.
- */
- private String protocol;
-
- /**
- * url.
- */
- private String upstreamUrl;
-
- /**
- * weight.
- */
- private int weight;
-
- /**
- * false close/ true open.
- */
- @Builder.Default
- private boolean status = true;
-
- /**
- * starup time.
- */
- private long timestamp;
+@AllArgsConstructor
+public class UpstreamWithSelectorId {
- /**
- * warmup.
- */
- private int warmup;
+ private String selectorId;
+ private DivideUpstream divideUpstream;
}
diff --git
a/shenyu-plugin/shenyu-plugin-divide/src/main/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManager.java
b/shenyu-plugin/shenyu-plugin-divide/src/main/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManager.java
index 8c38c3d..cdb2dbc 100644
---
a/shenyu-plugin/shenyu-plugin-divide/src/main/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManager.java
+++
b/shenyu-plugin/shenyu-plugin-divide/src/main/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManager.java
@@ -24,13 +24,15 @@ import
org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.dto.convert.DivideUpstream;
import org.apache.shenyu.common.dto.convert.rule.impl.DivideRuleHandle;
+import org.apache.shenyu.common.healthcheck.HealthCheckTask;
+import org.apache.shenyu.common.utils.CollectionUtils;
import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.common.utils.UpstreamCheckUtils;
import org.apache.shenyu.plugin.base.cache.RuleHandleCache;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
@@ -43,17 +45,58 @@ public final class UpstreamCacheManager extends
RuleHandleCache<String, DivideRu
private static final Map<String, List<DivideUpstream>> UPSTREAM_MAP =
Maps.newConcurrentMap();
- private static final Map<String, List<DivideUpstream>> UPSTREAM_MAP_TEMP =
Maps.newConcurrentMap();
+ private HealthCheckTask task;
+
+ // health check parameters
+ private Boolean checkEnable;
+
+ private int checkTimeout;
+
+ private int checkInterval;
+
+ private int healthyThreshold;
+
+ private int unhealthyThreshold;
+
+ // healthy upstream print parameters
+ private Boolean printEnable;
+
+ private Integer printInterval;
- /**
- * suggest shenyu.upstream.scheduledTime set 1 SECONDS.
- */
private UpstreamCacheManager() {
- boolean check =
Boolean.parseBoolean(System.getProperty("shenyu.upstream.check", "false"));
- if (check) {
- new ScheduledThreadPoolExecutor(1,
ShenyuThreadFactory.create("scheduled-upstream-task", false))
- .scheduleWithFixedDelay(this::scheduled,
- 30,
Integer.parseInt(System.getProperty("shenyu.upstream.scheduledTime", "30")),
TimeUnit.SECONDS);
+ initHealthCheck();
+ }
+
+ private void initHealthCheck() {
+ checkEnable =
Boolean.parseBoolean(System.getProperty("shenyu.upstream.check.enable",
"false"));
+ checkTimeout =
Integer.parseInt(System.getProperty("shenyu.upstream.check.timeout", "3000"));
+ healthyThreshold =
Integer.parseInt(System.getProperty("shenyu.upstream.check.healthy-threshold",
"1"));
+ unhealthyThreshold =
Integer.parseInt(System.getProperty("shenyu.upstream.check.unhealthy-threshold",
"1"));
+ checkInterval =
Integer.parseInt(System.getProperty("shenyu.upstream.check.interval", "5000"));
+
+ createTask();
+ printEnable =
Boolean.parseBoolean(System.getProperty("shenyu.upstream.check.print.enable",
"true"));
+ printInterval =
Integer.parseInt(System.getProperty("shenyu.upstream.check.print.interval",
"60000"));
+ scheduleHealthCheck();
+ }
+
+ private void createTask() {
+ task = new HealthCheckTask(checkInterval);
+ task.setCheckTimeout(checkTimeout);
+ task.setHealthyThreshold(healthyThreshold);
+ task.setUnhealthyThreshold(unhealthyThreshold);
+ }
+
+ private void scheduleHealthCheck() {
+ if (checkEnable) {
+ task.schedule();
+
+ // executor for log print
+ if (printEnable) {
+ ThreadFactory printFactory =
ShenyuThreadFactory.create("upstream-health-print", true);
+ new ScheduledThreadPoolExecutor(1, printFactory)
+ .scheduleWithFixedDelay(task::printHealthyUpstream,
printInterval, printInterval, TimeUnit.MILLISECONDS);
+ }
}
}
@@ -73,7 +116,7 @@ public final class UpstreamCacheManager extends
RuleHandleCache<String, DivideRu
* @return the list
*/
public List<DivideUpstream> findUpstreamListBySelectorId(final String
selectorId) {
- return UPSTREAM_MAP_TEMP.get(selectorId);
+ return task.getHealthyUpstream().get(selectorId);
}
/**
@@ -82,7 +125,8 @@ public final class UpstreamCacheManager extends
RuleHandleCache<String, DivideRu
* @param key the key
*/
public void removeByKey(final String key) {
- UPSTREAM_MAP_TEMP.remove(key);
+ UPSTREAM_MAP.remove(key);
+ task.triggerRemoveAll(key);
}
/**
@@ -92,44 +136,28 @@ public final class UpstreamCacheManager extends
RuleHandleCache<String, DivideRu
*/
public void submit(final SelectorData selectorData) {
final List<DivideUpstream> upstreamList =
GsonUtils.getInstance().fromList(selectorData.getHandle(),
DivideUpstream.class);
- if (null != upstreamList && upstreamList.size() > 0) {
- UPSTREAM_MAP.put(selectorData.getId(), upstreamList);
- UPSTREAM_MAP_TEMP.put(selectorData.getId(), upstreamList);
- } else {
- UPSTREAM_MAP.remove(selectorData.getId());
- UPSTREAM_MAP_TEMP.remove(selectorData.getId());
- }
- }
+ if (CollectionUtils.isNotEmpty(upstreamList)) {
+ List<DivideUpstream> existUpstream =
UPSTREAM_MAP.computeIfAbsent(selectorData.getId(), k -> Lists.newArrayList());
- private void scheduled() {
- if (UPSTREAM_MAP.size() > 0) {
- UPSTREAM_MAP.forEach((k, v) -> {
- List<DivideUpstream> result = check(v);
- if (result.size() > 0) {
- UPSTREAM_MAP_TEMP.put(k, result);
- } else {
- UPSTREAM_MAP_TEMP.remove(k);
+ // check upstream delete
+ for (DivideUpstream upstream : existUpstream) {
+ if (!upstreamList.contains(upstream)) {
+ task.triggerRemoveOne(selectorData, upstream);
}
- });
- }
- }
+ }
- private List<DivideUpstream> check(final List<DivideUpstream>
upstreamList) {
- List<DivideUpstream> resultList =
Lists.newArrayListWithCapacity(upstreamList.size());
- for (DivideUpstream divideUpstream : upstreamList) {
- final boolean pass =
UpstreamCheckUtils.checkUrl(divideUpstream.getUpstreamUrl());
- if (pass) {
- if (!divideUpstream.isStatus()) {
- divideUpstream.setTimestamp(System.currentTimeMillis());
- divideUpstream.setStatus(true);
- log.info("UpstreamCacheManager detect success the url: {},
host: {} ", divideUpstream.getUpstreamUrl(), divideUpstream.getUpstreamHost());
+ // check upstream add
+ for (DivideUpstream upstream : upstreamList) {
+ if (!existUpstream.contains(upstream)) {
+ task.triggerAddOne(selectorData, upstream);
}
- resultList.add(divideUpstream);
- } else {
- divideUpstream.setStatus(false);
- log.error("check the url={} is fail ",
divideUpstream.getUpstreamUrl());
}
+
+ // replace upstream
+ UPSTREAM_MAP.put(selectorData.getId(), upstreamList);
+ } else {
+ UPSTREAM_MAP.remove(selectorData.getId());
+ task.triggerRemoveAll(selectorData);
}
- return resultList;
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/DividePluginTest.java
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/DividePluginTest.java
index 6c7a262..ed2c20c 100644
---
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/DividePluginTest.java
+++
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/DividePluginTest.java
@@ -26,14 +26,17 @@ import
org.apache.shenyu.common.dto.convert.rule.impl.DivideRuleHandle;
import org.apache.shenyu.common.enums.PluginEnum;
import org.apache.shenyu.common.enums.RpcTypeEnum;
import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.common.utils.UpstreamCheckUtils;
import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import org.apache.shenyu.plugin.api.context.ShenyuContext;
import org.apache.shenyu.plugin.base.utils.CacheKeyUtils;
import org.apache.shenyu.plugin.divide.cache.UpstreamCacheManager;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
import org.springframework.mock.web.server.MockServerWebExchange;
@@ -46,7 +49,10 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
/**
@@ -69,6 +75,8 @@ public final class DividePluginTest {
private List<DivideUpstream> divideUpstreamList;
+ private MockedStatic<UpstreamCheckUtils> mockCheckUtils;
+
@Before
public void setup() {
this.ruleData = mock(RuleData.class);
@@ -86,6 +94,15 @@ public final class DividePluginTest {
.remoteAddress(new InetSocketAddress(8090))
.build());
this.dividePlugin = new DividePlugin();
+
+ // mock static
+ mockCheckUtils = mockStatic(UpstreamCheckUtils.class);
+ mockCheckUtils.when(() -> UpstreamCheckUtils.checkUrl(anyString(),
anyInt())).thenReturn(true);
+ }
+
+ @After
+ public void tearDown() {
+ mockCheckUtils.close();
}
/**
@@ -139,7 +156,6 @@ public final class DividePluginTest {
* Init mock info.
*/
private void initMockInfo() {
-
ShenyuContext context = mock(ShenyuContext.class);
context.setRpcType(RpcTypeEnum.HTTP.getName());
DivideRuleHandle handle = (DivideRuleHandle)
RuleHandleFactory.ruleHandle(PluginEnum.DIVIDE.getName(), "");
diff --git
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManagerTest.java
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManagerTest.java
index 5c5dcbd..494409b 100644
---
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManagerTest.java
+++
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManagerTest.java
@@ -19,18 +19,25 @@ package org.apache.shenyu.plugin.divide.cache;
import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.dto.convert.DivideUpstream;
+import org.apache.shenyu.common.utils.CollectionUtils;
import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.common.utils.UpstreamCheckUtils;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
/**
@@ -41,6 +48,10 @@ public final class UpstreamCacheManagerTest {
private List<DivideUpstream> loadBalances;
+ private SelectorData selectorData;
+
+ private MockedStatic<UpstreamCheckUtils> mockCheckUtils;
+
@Before
public void setUp() {
this.loadBalances = Stream.of(3, 4, 5)
@@ -48,10 +59,18 @@ public final class UpstreamCacheManagerTest {
.upstreamUrl("divide-upstream-" + weight)
.build())
.collect(Collectors.toList());
- SelectorData selectorData = mock(SelectorData.class);
+ selectorData = mock(SelectorData.class);
when(selectorData.getId()).thenReturn("mock");
when(selectorData.getHandle()).thenReturn(GsonUtils.getGson().toJson(loadBalances));
- UpstreamCacheManager.getInstance().submit(selectorData);
+
+ // mock static
+ mockCheckUtils = mockStatic(UpstreamCheckUtils.class);
+ mockCheckUtils.when(() -> UpstreamCheckUtils.checkUrl(anyString(),
anyInt())).thenReturn(true);
+ }
+
+ @After
+ public void tearDown() {
+ mockCheckUtils.close();
}
/**
@@ -59,8 +78,9 @@ public final class UpstreamCacheManagerTest {
*/
@Test
public void findUpstreamListTest() {
+ UpstreamCacheManager.getInstance().submit(selectorData);
List<DivideUpstream> result =
UpstreamCacheManager.getInstance().findUpstreamListBySelectorId("mock");
- Assert.assertEquals(GsonUtils.getGson().toJson(loadBalances),
GsonUtils.getGson().toJson(result));
+ Assert.assertEquals(loadBalances, result);
}
/**
@@ -68,9 +88,10 @@ public final class UpstreamCacheManagerTest {
*/
@Test
public void removeByKeyTest() {
+ UpstreamCacheManager.getInstance().submit(selectorData);
UpstreamCacheManager.getInstance().removeByKey("mock");
List<DivideUpstream> result =
UpstreamCacheManager.getInstance().findUpstreamListBySelectorId("mock");
- Assert.assertNull(result);
+ Assert.assertTrue(CollectionUtils.isEmpty(result));
}
/**
@@ -88,6 +109,6 @@ public final class UpstreamCacheManagerTest {
when(selectorData.getHandle()).thenReturn(GsonUtils.getGson().toJson(upstreams));
UpstreamCacheManager.getInstance().submit(selectorData);
List<DivideUpstream> result =
UpstreamCacheManager.getInstance().findUpstreamListBySelectorId("submit");
- Assert.assertEquals(GsonUtils.getGson().toJson(upstreams),
GsonUtils.getGson().toJson(result));
+ Assert.assertEquals(upstreams.get(0), result.get(0));
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/handler/DividePluginDataHandlerTest.java
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/handler/DividePluginDataHandlerTest.java
index 7cf78a1..4be58b6 100644
---
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/handler/DividePluginDataHandlerTest.java
+++
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/handler/DividePluginDataHandlerTest.java
@@ -21,16 +21,22 @@ import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.dto.convert.DivideUpstream;
import org.apache.shenyu.common.enums.PluginEnum;
import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.common.utils.UpstreamCheckUtils;
import org.apache.shenyu.plugin.divide.cache.UpstreamCacheManager;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.MockedStatic;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
/**
@@ -44,6 +50,8 @@ public final class DividePluginDataHandlerTest {
private DividePluginDataHandler dividePluginDataHandler;
+ private MockedStatic<UpstreamCheckUtils> mockCheckUtils;
+
@Before
public void setUp() {
this.dividePluginDataHandler = new DividePluginDataHandler();
@@ -55,6 +63,15 @@ public final class DividePluginDataHandlerTest {
this.selectorData = mock(SelectorData.class);
when(selectorData.getId()).thenReturn("handler");
when(selectorData.getHandle()).thenReturn(GsonUtils.getGson().toJson(divideUpstreamList));
+
+ // mock static
+ mockCheckUtils = mockStatic(UpstreamCheckUtils.class);
+ mockCheckUtils.when(() -> UpstreamCheckUtils.checkUrl(anyString(),
anyInt())).thenReturn(true);
+ }
+
+ @After
+ public void tearDown() {
+ mockCheckUtils.close();
}
/**
@@ -64,7 +81,7 @@ public final class DividePluginDataHandlerTest {
public void handlerSelectorTest() {
dividePluginDataHandler.handlerSelector(selectorData);
List<DivideUpstream> result =
UpstreamCacheManager.getInstance().findUpstreamListBySelectorId("handler");
- Assert.assertEquals(GsonUtils.getGson().toJson(divideUpstreamList),
GsonUtils.getGson().toJson(result));
+
Assert.assertEquals(GsonUtils.getInstance().fromList(selectorData.getHandle(),
DivideUpstream.class).get(0), result.get(0));
}
/**
diff --git
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/websocket/WebSocketPluginTest.java
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/websocket/WebSocketPluginTest.java
index 8d4b344..9c9abb4 100644
---
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/websocket/WebSocketPluginTest.java
+++
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/websocket/WebSocketPluginTest.java
@@ -26,12 +26,15 @@ import
org.apache.shenyu.common.dto.convert.rule.impl.DivideRuleHandle;
import org.apache.shenyu.common.enums.PluginEnum;
import org.apache.shenyu.common.enums.RpcTypeEnum;
import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.common.utils.UpstreamCheckUtils;
import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import org.apache.shenyu.plugin.api.context.ShenyuContext;
import org.apache.shenyu.plugin.divide.cache.UpstreamCacheManager;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.MockedStatic;
import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
import org.springframework.mock.web.server.MockServerWebExchange;
import org.springframework.web.reactive.socket.client.WebSocketClient;
@@ -46,7 +49,10 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
import static org.springframework.http.HttpHeaders.CONNECTION;
import static org.springframework.http.HttpHeaders.UPGRADE;
@@ -70,6 +76,8 @@ public class WebSocketPluginTest {
private WebSocketService webSocketService;
+ private MockedStatic<UpstreamCheckUtils> mockCheckUtils;
+
@Before
public void setup() {
this.ruleData = mock(RuleData.class);
@@ -89,6 +97,15 @@ public class WebSocketPluginTest {
WebSocketClient webSocketClient = mock(WebSocketClient.class);
this.webSocketService = mock(WebSocketService.class);
this.webSocketPlugin = new WebSocketPlugin(webSocketClient,
webSocketService);
+
+ // mock static
+ mockCheckUtils = mockStatic(UpstreamCheckUtils.class);
+ mockCheckUtils.when(() -> UpstreamCheckUtils.checkUrl(anyString(),
anyInt())).thenReturn(true);
+ }
+
+ @After
+ public void tearDown() {
+ mockCheckUtils.close();
}
/**