This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ccf154  [ISSUE #1665] Improve health check for divide plugin (#1666)
8ccf154 is described below

commit 8ccf154cfdb3e03b18085b5537142a8ecb603852
Author: Wu Zhiguo <[email protected]>
AuthorDate: Tue Jul 27 14:50:20 2021 +0800

    [ISSUE #1665] Improve health check for divide plugin (#1666)
    
    * add health check parameters
    
    * simple health check logic
    
    * improve health check for divide plugin
    
    * improve health check for divide plugin
    
    * improve health check for divide plugin
    
    * remove WAITING_FOR_CHECK
    
    * synchronized
    
    * add log
    
    * log
    
    * disable zombie check in default
    
    * disable zombie check in default
    
    * ut
    
    * ut
    
    * ut
    
    * checkstyle
    
    * ut
    
    * checkstyle
    
    * move health check to common
    
    * check immediately only when check is enable
    
    * extract specific logic to UpstreamCacheManager
    
    * ut
    
    * style
---
 .../admin/service/UpstreamCheckServiceTest.java    |   6 +-
 .../apache/shenyu/common/constant/Constants.java   |   2 +-
 .../shenyu/common/dto/convert/DivideUpstream.java  |  14 +-
 .../shenyu/common/healthcheck/HealthCheckTask.java | 339 +++++++++++++++++++++
 .../UpstreamWithSelectorId.java}                   |  55 +---
 .../plugin/divide/cache/UpstreamCacheManager.java  | 118 ++++---
 .../shenyu/plugin/divide/DividePluginTest.java     |  18 +-
 .../divide/cache/UpstreamCacheManagerTest.java     |  31 +-
 .../handler/DividePluginDataHandlerTest.java       |  19 +-
 .../divide/websocket/WebSocketPluginTest.java      |  17 ++
 10 files changed, 513 insertions(+), 106 deletions(-)

diff --git 
a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java
 
b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java
index 6318732..f401c72 100644
--- 
a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java
+++ 
b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shenyu.admin.service;
 
-import com.google.common.collect.Lists;
 import org.apache.shenyu.admin.model.entity.PluginDO;
 import org.apache.shenyu.admin.model.entity.SelectorDO;
 import org.apache.shenyu.admin.listener.DataChangedEvent;
@@ -44,7 +43,6 @@ import java.util.Map;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.when;
 
@@ -102,9 +100,9 @@ public final class UpstreamCheckServiceTest {
                 
.handle("[{\"upstreamHost\":\"localhost\",\"protocol\":\"http://\",\"localhost\":\"divide-upstream-60\",\"weight\":60}]";)
                 .build();
         //stubbing
-        
when(pluginMapper.selectByNames(anyList())).thenReturn(Lists.newArrayList(pluginDO));
+//        
when(pluginMapper.selectByNames(anyList())).thenReturn(Lists.newArrayList(pluginDO));
         when(pluginMapper.selectById(anyString())).thenReturn(pluginDO);
-        
when(selectorMapper.findByPluginId(anyString())).thenReturn(Lists.newArrayList(selectorDOWithUrlError,
 selectorDOWithUrlReachable));
+//        
when(selectorMapper.findByPluginId(anyString())).thenReturn(Lists.newArrayList(selectorDOWithUrlError,
 selectorDOWithUrlReachable));
         
when(selectorMapper.updateSelective(any(SelectorDO.class))).thenReturn(1);
         when(selectorMapper.selectByName(anyString())).then(invocationOnMock 
-> {
             Object[] args = invocationOnMock.getArguments();
diff --git 
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java 
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
index 7779883..af064e8 100644
--- 
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
+++ 
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
@@ -376,7 +376,7 @@ public interface Constants {
     /**
      * default checked value.
      */
-    String DEFAULT_CHECK_VALUE = "true";
+    String DEFAULT_CHECK_VALUE = "false";
 
     /**
      * zombie check times.
diff --git 
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/DivideUpstream.java
 
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/DivideUpstream.java
index 203b297..84b21f7 100644
--- 
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/DivideUpstream.java
+++ 
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/DivideUpstream.java
@@ -19,6 +19,7 @@ package org.apache.shenyu.common.dto.convert;
 
 import lombok.Builder;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
 import java.io.Serializable;
@@ -29,6 +30,7 @@ import java.io.Serializable;
 @Data
 @ToString
 @Builder
+@EqualsAndHashCode(onlyExplicitlyIncluded = true)
 public class DivideUpstream implements Serializable {
 
     private static final long serialVersionUID = 6252280511262542360L;
@@ -36,16 +38,19 @@ public class DivideUpstream implements Serializable {
     /**
      * host.
      */
+    @EqualsAndHashCode.Include
     private String upstreamHost;
 
     /**
      * this is http protocol.
      */
+    @EqualsAndHashCode.Include
     private String protocol;
 
     /**
      * url.
      */
+    @EqualsAndHashCode.Include
     private String upstreamUrl;
 
     /**
@@ -60,7 +65,7 @@ public class DivideUpstream implements Serializable {
     private boolean status = true;
 
     /**
-     * starup time.
+     * startup time.
      */
     private long timestamp;
 
@@ -69,4 +74,11 @@ public class DivideUpstream implements Serializable {
      */
     private int warmup;
 
+    // health parameters
+
+    private boolean healthy;
+
+    private long lastHealthTimestamp;
+
+    private long lastUnhealthyTimestamp;
 }
diff --git 
a/shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/HealthCheckTask.java
 
b/shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/HealthCheckTask.java
new file mode 100644
index 0000000..2a6881f
--- /dev/null
+++ 
b/shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/HealthCheckTask.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.common.healthcheck;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.convert.DivideUpstream;
+import org.apache.shenyu.common.utils.CollectionUtils;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.common.utils.UpstreamCheckUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/**
+ * Health check manager for upstream servers.
+ */
+@Slf4j
+public final class HealthCheckTask implements Runnable {
+
+    private final Map<String, List<DivideUpstream>> healthyUpstream = 
Maps.newConcurrentMap();
+
+    private final Map<String, List<DivideUpstream>> unhealthyUpstream = 
Maps.newConcurrentMap();
+
+    private final Map<String, SelectorData> selectorCache = 
Maps.newConcurrentMap();
+
+    private final Object lock = new Object();
+
+    private final AtomicBoolean checkStarted = new AtomicBoolean(false);
+
+    private final List<CompletableFuture<UpstreamWithSelectorId>> futures = 
Lists.newArrayList();
+
+    private final int checkInterval;
+
+    private ExecutorService executor;
+
+    private int checkTimeout = 3000;
+
+    private int healthyThreshold = 1;
+
+    private int unhealthyThreshold = 1;
+
+    public HealthCheckTask(final int checkInterval) {
+        this.checkInterval = checkInterval;
+    }
+
+    /**
+     * Schedule health check task.
+     */
+    public void schedule() {
+        // executor for health check
+        ThreadFactory healthCheckFactory = 
ShenyuThreadFactory.create("upstream-health-check", true);
+        new ScheduledThreadPoolExecutor(1, healthCheckFactory)
+                .scheduleWithFixedDelay(this, 3000, checkInterval, 
TimeUnit.MILLISECONDS);
+
+        // executor for async request, avoid request block health check thread
+        ThreadFactory requestFactory = 
ShenyuThreadFactory.create("upstream-health-check-request", true);
+        executor = new ScheduledThreadPoolExecutor(10, requestFactory);
+    }
+
+    /**
+     * Set check timeout.
+     *
+     * @param checkTimeout milliseconds
+     */
+    public void setCheckTimeout(final int checkTimeout) {
+        this.checkTimeout = checkTimeout;
+    }
+
+    /**
+     * Set healthy threshold.
+     *
+     * @param healthyThreshold healthy threshold
+     */
+    public void setHealthyThreshold(final int healthyThreshold) {
+        this.healthyThreshold = healthyThreshold;
+    }
+
+    /**
+     * Set unhealthy threshold.
+     *
+     * @param unhealthyThreshold unhealthy threshold
+     */
+    public void setUnhealthyThreshold(final int unhealthyThreshold) {
+        this.unhealthyThreshold = unhealthyThreshold;
+    }
+
+    @Override
+    public void run() {
+        healthCheck();
+    }
+
+    private void healthCheck() {
+        try {
+            // If there is no synchronized. when check is done and all 
upstream check result is in the futures list.
+            // In the same time, triggerRemoveAll() called before 
waitFinish(), there will be dirty data stay in map.
+            synchronized (lock) {
+                if (tryStartHealthCheck()) {
+                    doHealthCheck();
+                    waitFinish();
+                }
+            }
+        } catch (Exception e) {
+            log.error("[Health Check] Meet problem: ", e);
+        } finally {
+            finishHealthCheck();
+        }
+    }
+
+    private void doHealthCheck() {
+        check(healthyUpstream);
+        check(unhealthyUpstream);
+    }
+
+    private void check(final Map<String, List<DivideUpstream>> map) {
+        for (Map.Entry<String, List<DivideUpstream>> entry : map.entrySet()) {
+            String key = entry.getKey();
+            List<DivideUpstream> value = entry.getValue();
+            for (DivideUpstream upstream : value) {
+                CompletableFuture<UpstreamWithSelectorId> future = 
CompletableFuture.supplyAsync(() -> check(key, upstream), executor);
+
+                futures.add(future);
+            }
+        }
+    }
+
+    private UpstreamWithSelectorId check(final String selectorId, final 
DivideUpstream upstream) {
+        SelectorData selectorData = selectorCache.get(selectorId);
+        boolean pass = UpstreamCheckUtils.checkUrl(upstream.getUpstreamUrl(), 
checkTimeout);
+        if (pass) {
+            if (upstream.isHealthy()) {
+                upstream.setLastHealthTimestamp(System.currentTimeMillis());
+            } else {
+                long now = System.currentTimeMillis();
+                long interval = now - upstream.getLastUnhealthyTimestamp();
+                if (interval >= (long) checkInterval * healthyThreshold) {
+                    upstream.setHealthy(true);
+                    upstream.setLastHealthTimestamp(now);
+                    log.info("[Health Check] Selector [{}] upstream {} health 
check passed, server is back online.",
+                            selectorData.getName(), upstream.getUpstreamUrl());
+                }
+            }
+        } else {
+            if (!upstream.isHealthy()) {
+                upstream.setLastUnhealthyTimestamp(System.currentTimeMillis());
+            } else {
+                long now = System.currentTimeMillis();
+                long interval = now - upstream.getLastHealthTimestamp();
+                if (interval >= (long) checkInterval * unhealthyThreshold) {
+                    upstream.setHealthy(false);
+                    upstream.setLastUnhealthyTimestamp(now);
+                    log.info("[Health Check] Selector [{}] upstream {} health 
check failed, server is offline.",
+                            selectorData.getName(), upstream.getUpstreamUrl());
+                }
+            }
+        }
+
+        return new UpstreamWithSelectorId(selectorId, upstream);
+    }
+
+    private boolean tryStartHealthCheck() {
+        return checkStarted.compareAndSet(false, true);
+    }
+
+    private void waitFinish() throws ExecutionException, InterruptedException {
+        for (CompletableFuture<UpstreamWithSelectorId> future : futures) {
+            UpstreamWithSelectorId entity = future.get();
+            putEntityToMap(entity);
+        }
+
+        futures.clear();
+    }
+
+    private void putEntityToMap(final UpstreamWithSelectorId entity) {
+        DivideUpstream upstream = entity.getDivideUpstream();
+        if (upstream.isHealthy()) {
+            putToMap(healthyUpstream, entity.getSelectorId(), upstream);
+            removeFromMap(unhealthyUpstream, entity.getSelectorId(), upstream);
+        } else {
+            putToMap(unhealthyUpstream, entity.getSelectorId(), upstream);
+            removeFromMap(healthyUpstream, entity.getSelectorId(), upstream);
+        }
+    }
+
+    private void finishHealthCheck() {
+        checkStarted.set(false);
+    }
+
+    /**
+     * Add one upstream via selectorData.
+     *
+     * @param selectorData selectorData
+     * @param upstream     upstream
+     */
+    public void triggerAddOne(final SelectorData selectorData, final 
DivideUpstream upstream) {
+        selectorCache.putIfAbsent(selectorData.getId(), selectorData);
+        putToMap(healthyUpstream, selectorData.getId(), upstream);
+    }
+
+    /**
+     * Remove a specific upstream via selectorData.
+     *
+     * @param selectorData selectorData
+     * @param upstream     upstream
+     */
+    public void triggerRemoveOne(final SelectorData selectorData, final 
DivideUpstream upstream) {
+        triggerRemoveOne(selectorData.getId(), upstream);
+    }
+
+    /**
+     * Remove a specific upstream via selectorId.
+     *
+     * @param selectorId selectorId
+     * @param upstream   upstream
+     */
+    public void triggerRemoveOne(final String selectorId, final DivideUpstream 
upstream) {
+        removeFromMap(healthyUpstream, selectorId, upstream);
+        removeFromMap(unhealthyUpstream, selectorId, upstream);
+
+        SelectorData selectorData = selectorCache.get(selectorId);
+        log.info("[Health Check] Selector [{}] upstream {} was removed.", 
selectorData.getName(), upstream.getUpstreamUrl());
+    }
+
+    private void putToMap(final Map<String, List<DivideUpstream>> map, final 
String selectorId, final DivideUpstream upstream) {
+        synchronized (lock) {
+            List<DivideUpstream> list = map.computeIfAbsent(selectorId, k -> 
Lists.newArrayList());
+            if (!list.contains(upstream)) {
+                list.add(upstream);
+            }
+        }
+    }
+
+    private void removeFromMap(final Map<String, List<DivideUpstream>> map, 
final String selectorId, final DivideUpstream upstream) {
+        synchronized (lock) {
+            List<DivideUpstream> list = map.get(selectorId);
+            if (CollectionUtils.isNotEmpty(list)) {
+                list.remove(upstream);
+            }
+        }
+    }
+
+    /**
+     * Remove all upstream via selectorData.
+     *
+     * @param selectorData selectorData
+     */
+    public void triggerRemoveAll(final SelectorData selectorData) {
+        triggerRemoveAll(selectorData.getId());
+    }
+
+    /**
+     * Remove all upstream via selectorId.
+     *
+     * @param selectorId selectorId
+     */
+    public void triggerRemoveAll(final String selectorId) {
+        synchronized (lock) {
+            healthyUpstream.remove(selectorId);
+            unhealthyUpstream.remove(selectorId);
+        }
+
+        SelectorData selectorData = selectorCache.get(selectorId);
+        log.info("[Health Check] Selector [{}] all upstream as removed.", 
selectorData.getName());
+
+        selectorCache.remove(selectorId);
+    }
+
+    /**
+     * Print healthy upstream.
+     */
+    public void printHealthyUpstream() {
+        healthyUpstream.forEach((k, v) -> {
+            if (v != null) {
+                SelectorData selectorData = selectorCache.get(k);
+                List<String> list = 
v.stream().map(DivideUpstream::getUpstreamUrl).collect(Collectors.toList());
+                log.info("[Health Check] Selector [{}] currently healthy 
upstream: {}",
+                        selectorData.getName(), 
GsonUtils.getInstance().toJson(list));
+            }
+        });
+    }
+
+    /**
+     * Print unhealthy upstream.
+     */
+    public void printUnhealthyUpstream() {
+        unhealthyUpstream.forEach((k, v) -> {
+            if (v != null) {
+                SelectorData selectorData = selectorCache.get(k);
+                List<String> list = 
v.stream().map(DivideUpstream::getUpstreamUrl).collect(Collectors.toList());
+                log.info("[Health Check] Selector [{}] currently unhealthy 
upstream: {}",
+                        selectorData.getName(), 
GsonUtils.getInstance().toJson(list));
+            }
+        });
+    }
+
+    /**
+     * Get healthy upstream map.
+     *
+     * @return healthy map.
+     */
+    public Map<String, List<DivideUpstream>> getHealthyUpstream() {
+        return healthyUpstream;
+    }
+
+    /**
+     * Get unhealthy upstream map.
+     *
+     * @return unhealthy map.
+     */
+    public Map<String, List<DivideUpstream>> getUnhealthyUpstream() {
+        return unhealthyUpstream;
+    }
+}
diff --git 
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/DivideUpstream.java
 
b/shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/UpstreamWithSelectorId.java
similarity index 51%
copy from 
shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/DivideUpstream.java
copy to 
shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/UpstreamWithSelectorId.java
index 203b297..7bfeaa4 100644
--- 
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/DivideUpstream.java
+++ 
b/shenyu-common/src/main/java/org/apache/shenyu/common/healthcheck/UpstreamWithSelectorId.java
@@ -15,58 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.shenyu.common.dto.convert;
+package org.apache.shenyu.common.healthcheck;
 
-import lombok.Builder;
+import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.ToString;
+import org.apache.shenyu.common.dto.convert.DivideUpstream;
 
-import java.io.Serializable;
-
-/**
- * this is divide upstream.
- */
 @Data
-@ToString
-@Builder
-public class DivideUpstream implements Serializable {
-
-    private static final long serialVersionUID = 6252280511262542360L;
-
-    /**
-     * host.
-     */
-    private String upstreamHost;
-
-    /**
-     * this is http protocol.
-     */
-    private String protocol;
-
-    /**
-     * url.
-     */
-    private String upstreamUrl;
-
-    /**
-     * weight.
-     */
-    private int weight;
-
-    /**
-     * false close/ true open.
-     */
-    @Builder.Default
-    private boolean status = true;
-
-    /**
-     * starup time.
-     */
-    private long timestamp;
+@AllArgsConstructor
+public class UpstreamWithSelectorId {
 
-    /**
-     * warmup.
-     */
-    private int warmup;
+    private String selectorId;
 
+    private DivideUpstream divideUpstream;
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-divide/src/main/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManager.java
 
b/shenyu-plugin/shenyu-plugin-divide/src/main/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManager.java
index 8c38c3d..cdb2dbc 100644
--- 
a/shenyu-plugin/shenyu-plugin-divide/src/main/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManager.java
+++ 
b/shenyu-plugin/shenyu-plugin-divide/src/main/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManager.java
@@ -24,13 +24,15 @@ import 
org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
 import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.dto.convert.DivideUpstream;
 import org.apache.shenyu.common.dto.convert.rule.impl.DivideRuleHandle;
+import org.apache.shenyu.common.healthcheck.HealthCheckTask;
+import org.apache.shenyu.common.utils.CollectionUtils;
 import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.common.utils.UpstreamCheckUtils;
 import org.apache.shenyu.plugin.base.cache.RuleHandleCache;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -43,17 +45,58 @@ public final class UpstreamCacheManager extends 
RuleHandleCache<String, DivideRu
 
     private static final Map<String, List<DivideUpstream>> UPSTREAM_MAP = 
Maps.newConcurrentMap();
 
-    private static final Map<String, List<DivideUpstream>> UPSTREAM_MAP_TEMP = 
Maps.newConcurrentMap();
+    private HealthCheckTask task;
+
+    // health check parameters
+    private Boolean checkEnable;
+
+    private int checkTimeout;
+
+    private int checkInterval;
+
+    private int healthyThreshold;
+
+    private int unhealthyThreshold;
+
+    // healthy upstream print parameters
+    private Boolean printEnable;
+
+    private Integer printInterval;
 
-    /**
-     * suggest shenyu.upstream.scheduledTime set 1 SECONDS.
-     */
     private UpstreamCacheManager() {
-        boolean check = 
Boolean.parseBoolean(System.getProperty("shenyu.upstream.check", "false"));
-        if (check) {
-            new ScheduledThreadPoolExecutor(1, 
ShenyuThreadFactory.create("scheduled-upstream-task", false))
-                    .scheduleWithFixedDelay(this::scheduled,
-                            30, 
Integer.parseInt(System.getProperty("shenyu.upstream.scheduledTime", "30")), 
TimeUnit.SECONDS);
+        initHealthCheck();
+    }
+
+    private void initHealthCheck() {
+        checkEnable = 
Boolean.parseBoolean(System.getProperty("shenyu.upstream.check.enable", 
"false"));
+        checkTimeout = 
Integer.parseInt(System.getProperty("shenyu.upstream.check.timeout", "3000"));
+        healthyThreshold = 
Integer.parseInt(System.getProperty("shenyu.upstream.check.healthy-threshold", 
"1"));
+        unhealthyThreshold = 
Integer.parseInt(System.getProperty("shenyu.upstream.check.unhealthy-threshold",
 "1"));
+        checkInterval = 
Integer.parseInt(System.getProperty("shenyu.upstream.check.interval", "5000"));
+
+        createTask();
+        printEnable = 
Boolean.parseBoolean(System.getProperty("shenyu.upstream.check.print.enable", 
"true"));
+        printInterval = 
Integer.parseInt(System.getProperty("shenyu.upstream.check.print.interval", 
"60000"));
+        scheduleHealthCheck();
+    }
+
+    private void createTask() {
+        task = new HealthCheckTask(checkInterval);
+        task.setCheckTimeout(checkTimeout);
+        task.setHealthyThreshold(healthyThreshold);
+        task.setUnhealthyThreshold(unhealthyThreshold);
+    }
+
+    private void scheduleHealthCheck() {
+        if (checkEnable) {
+            task.schedule();
+
+            // executor for log print
+            if (printEnable) {
+                ThreadFactory printFactory = 
ShenyuThreadFactory.create("upstream-health-print", true);
+                new ScheduledThreadPoolExecutor(1, printFactory)
+                        .scheduleWithFixedDelay(task::printHealthyUpstream, 
printInterval, printInterval, TimeUnit.MILLISECONDS);
+            }
         }
     }
 
@@ -73,7 +116,7 @@ public final class UpstreamCacheManager extends 
RuleHandleCache<String, DivideRu
      * @return the list
      */
     public List<DivideUpstream> findUpstreamListBySelectorId(final String 
selectorId) {
-        return UPSTREAM_MAP_TEMP.get(selectorId);
+        return task.getHealthyUpstream().get(selectorId);
     }
 
     /**
@@ -82,7 +125,8 @@ public final class UpstreamCacheManager extends 
RuleHandleCache<String, DivideRu
      * @param key the key
      */
     public void removeByKey(final String key) {
-        UPSTREAM_MAP_TEMP.remove(key);
+        UPSTREAM_MAP.remove(key);
+        task.triggerRemoveAll(key);
     }
 
     /**
@@ -92,44 +136,28 @@ public final class UpstreamCacheManager extends 
RuleHandleCache<String, DivideRu
      */
     public void submit(final SelectorData selectorData) {
         final List<DivideUpstream> upstreamList = 
GsonUtils.getInstance().fromList(selectorData.getHandle(), 
DivideUpstream.class);
-        if (null != upstreamList && upstreamList.size() > 0) {
-            UPSTREAM_MAP.put(selectorData.getId(), upstreamList);
-            UPSTREAM_MAP_TEMP.put(selectorData.getId(), upstreamList);
-        } else {
-            UPSTREAM_MAP.remove(selectorData.getId());
-            UPSTREAM_MAP_TEMP.remove(selectorData.getId());
-        }
-    }
+        if (CollectionUtils.isNotEmpty(upstreamList)) {
+            List<DivideUpstream> existUpstream = 
UPSTREAM_MAP.computeIfAbsent(selectorData.getId(), k -> Lists.newArrayList());
 
-    private void scheduled() {
-        if (UPSTREAM_MAP.size() > 0) {
-            UPSTREAM_MAP.forEach((k, v) -> {
-                List<DivideUpstream> result = check(v);
-                if (result.size() > 0) {
-                    UPSTREAM_MAP_TEMP.put(k, result);
-                } else {
-                    UPSTREAM_MAP_TEMP.remove(k);
+            // check upstream delete
+            for (DivideUpstream upstream : existUpstream) {
+                if (!upstreamList.contains(upstream)) {
+                    task.triggerRemoveOne(selectorData, upstream);
                 }
-            });
-        }
-    }
+            }
 
-    private List<DivideUpstream> check(final List<DivideUpstream> 
upstreamList) {
-        List<DivideUpstream> resultList = 
Lists.newArrayListWithCapacity(upstreamList.size());
-        for (DivideUpstream divideUpstream : upstreamList) {
-            final boolean pass = 
UpstreamCheckUtils.checkUrl(divideUpstream.getUpstreamUrl());
-            if (pass) {
-                if (!divideUpstream.isStatus()) {
-                    divideUpstream.setTimestamp(System.currentTimeMillis());
-                    divideUpstream.setStatus(true);
-                    log.info("UpstreamCacheManager detect success the url: {}, 
host: {} ", divideUpstream.getUpstreamUrl(), divideUpstream.getUpstreamHost());
+            // check upstream add
+            for (DivideUpstream upstream : upstreamList) {
+                if (!existUpstream.contains(upstream)) {
+                    task.triggerAddOne(selectorData, upstream);
                 }
-                resultList.add(divideUpstream);
-            } else {
-                divideUpstream.setStatus(false);
-                log.error("check the url={} is fail ", 
divideUpstream.getUpstreamUrl());
             }
+
+            // replace upstream
+            UPSTREAM_MAP.put(selectorData.getId(), upstreamList);
+        } else {
+            UPSTREAM_MAP.remove(selectorData.getId());
+            task.triggerRemoveAll(selectorData);
         }
-        return resultList;
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/DividePluginTest.java
 
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/DividePluginTest.java
index 6c7a262..ed2c20c 100644
--- 
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/DividePluginTest.java
+++ 
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/DividePluginTest.java
@@ -26,14 +26,17 @@ import 
org.apache.shenyu.common.dto.convert.rule.impl.DivideRuleHandle;
 import org.apache.shenyu.common.enums.PluginEnum;
 import org.apache.shenyu.common.enums.RpcTypeEnum;
 import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.common.utils.UpstreamCheckUtils;
 import org.apache.shenyu.plugin.api.ShenyuPluginChain;
 import org.apache.shenyu.plugin.api.context.ShenyuContext;
 import org.apache.shenyu.plugin.base.utils.CacheKeyUtils;
 import org.apache.shenyu.plugin.divide.cache.UpstreamCacheManager;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.MockedStatic;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
 import org.springframework.mock.web.server.MockServerWebExchange;
@@ -46,7 +49,10 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
 
 /**
@@ -69,6 +75,8 @@ public final class DividePluginTest {
 
     private List<DivideUpstream> divideUpstreamList;
 
+    private MockedStatic<UpstreamCheckUtils> mockCheckUtils;
+
     @Before
     public void setup() {
         this.ruleData = mock(RuleData.class);
@@ -86,6 +94,15 @@ public final class DividePluginTest {
                 .remoteAddress(new InetSocketAddress(8090))
                 .build());
         this.dividePlugin = new DividePlugin();
+
+        // mock static
+        mockCheckUtils = mockStatic(UpstreamCheckUtils.class);
+        mockCheckUtils.when(() -> UpstreamCheckUtils.checkUrl(anyString(), 
anyInt())).thenReturn(true);
+    }
+
+    @After
+    public void tearDown() {
+        mockCheckUtils.close();
     }
 
     /**
@@ -139,7 +156,6 @@ public final class DividePluginTest {
      * Init mock info.
      */
     private void initMockInfo() {
-
         ShenyuContext context = mock(ShenyuContext.class);
         context.setRpcType(RpcTypeEnum.HTTP.getName());
         DivideRuleHandle handle = (DivideRuleHandle) 
RuleHandleFactory.ruleHandle(PluginEnum.DIVIDE.getName(), "");
diff --git 
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManagerTest.java
 
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManagerTest.java
index 5c5dcbd..494409b 100644
--- 
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManagerTest.java
+++ 
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/cache/UpstreamCacheManagerTest.java
@@ -19,18 +19,25 @@ package org.apache.shenyu.plugin.divide.cache;
 
 import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.dto.convert.DivideUpstream;
+import org.apache.shenyu.common.utils.CollectionUtils;
 import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.common.utils.UpstreamCheckUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.MockedStatic;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
 
 /**
@@ -41,6 +48,10 @@ public final class UpstreamCacheManagerTest {
 
     private List<DivideUpstream> loadBalances;
 
+    private SelectorData selectorData;
+
+    private MockedStatic<UpstreamCheckUtils> mockCheckUtils;
+
     @Before
     public void setUp() {
         this.loadBalances = Stream.of(3, 4, 5)
@@ -48,10 +59,18 @@ public final class UpstreamCacheManagerTest {
                         .upstreamUrl("divide-upstream-" + weight)
                         .build())
                 .collect(Collectors.toList());
-        SelectorData selectorData = mock(SelectorData.class);
+        selectorData = mock(SelectorData.class);
         when(selectorData.getId()).thenReturn("mock");
         
when(selectorData.getHandle()).thenReturn(GsonUtils.getGson().toJson(loadBalances));
-        UpstreamCacheManager.getInstance().submit(selectorData);
+
+        // mock static
+        mockCheckUtils = mockStatic(UpstreamCheckUtils.class);
+        mockCheckUtils.when(() -> UpstreamCheckUtils.checkUrl(anyString(), 
anyInt())).thenReturn(true);
+    }
+
+    @After
+    public void tearDown() {
+        mockCheckUtils.close();
     }
 
     /**
@@ -59,8 +78,9 @@ public final class UpstreamCacheManagerTest {
      */
     @Test
     public void findUpstreamListTest() {
+        UpstreamCacheManager.getInstance().submit(selectorData);
         List<DivideUpstream> result = 
UpstreamCacheManager.getInstance().findUpstreamListBySelectorId("mock");
-        Assert.assertEquals(GsonUtils.getGson().toJson(loadBalances), 
GsonUtils.getGson().toJson(result));
+        Assert.assertEquals(loadBalances, result);
     }
 
     /**
@@ -68,9 +88,10 @@ public final class UpstreamCacheManagerTest {
      */
     @Test
     public void removeByKeyTest() {
+        UpstreamCacheManager.getInstance().submit(selectorData);
         UpstreamCacheManager.getInstance().removeByKey("mock");
         List<DivideUpstream> result = 
UpstreamCacheManager.getInstance().findUpstreamListBySelectorId("mock");
-        Assert.assertNull(result);
+        Assert.assertTrue(CollectionUtils.isEmpty(result));
     }
 
     /**
@@ -88,6 +109,6 @@ public final class UpstreamCacheManagerTest {
         
when(selectorData.getHandle()).thenReturn(GsonUtils.getGson().toJson(upstreams));
         UpstreamCacheManager.getInstance().submit(selectorData);
         List<DivideUpstream> result = 
UpstreamCacheManager.getInstance().findUpstreamListBySelectorId("submit");
-        Assert.assertEquals(GsonUtils.getGson().toJson(upstreams), 
GsonUtils.getGson().toJson(result));
+        Assert.assertEquals(upstreams.get(0), result.get(0));
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/handler/DividePluginDataHandlerTest.java
 
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/handler/DividePluginDataHandlerTest.java
index 7cf78a1..4be58b6 100644
--- 
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/handler/DividePluginDataHandlerTest.java
+++ 
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/handler/DividePluginDataHandlerTest.java
@@ -21,16 +21,22 @@ import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.dto.convert.DivideUpstream;
 import org.apache.shenyu.common.enums.PluginEnum;
 import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.common.utils.UpstreamCheckUtils;
 import org.apache.shenyu.plugin.divide.cache.UpstreamCacheManager;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.MockedStatic;
 
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
 
 /**
@@ -44,6 +50,8 @@ public final class DividePluginDataHandlerTest {
 
     private DividePluginDataHandler dividePluginDataHandler;
 
+    private MockedStatic<UpstreamCheckUtils> mockCheckUtils;
+
     @Before
     public void setUp() {
         this.dividePluginDataHandler = new DividePluginDataHandler();
@@ -55,6 +63,15 @@ public final class DividePluginDataHandlerTest {
         this.selectorData = mock(SelectorData.class);
         when(selectorData.getId()).thenReturn("handler");
         
when(selectorData.getHandle()).thenReturn(GsonUtils.getGson().toJson(divideUpstreamList));
+
+        // mock static
+        mockCheckUtils = mockStatic(UpstreamCheckUtils.class);
+        mockCheckUtils.when(() -> UpstreamCheckUtils.checkUrl(anyString(), 
anyInt())).thenReturn(true);
+    }
+
+    @After
+    public void tearDown() {
+        mockCheckUtils.close();
     }
 
     /**
@@ -64,7 +81,7 @@ public final class DividePluginDataHandlerTest {
     public void handlerSelectorTest() {
         dividePluginDataHandler.handlerSelector(selectorData);
         List<DivideUpstream> result = 
UpstreamCacheManager.getInstance().findUpstreamListBySelectorId("handler");
-        Assert.assertEquals(GsonUtils.getGson().toJson(divideUpstreamList), 
GsonUtils.getGson().toJson(result));
+        
Assert.assertEquals(GsonUtils.getInstance().fromList(selectorData.getHandle(), 
DivideUpstream.class).get(0), result.get(0));
     }
 
     /**
diff --git 
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/websocket/WebSocketPluginTest.java
 
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/websocket/WebSocketPluginTest.java
index 8d4b344..9c9abb4 100644
--- 
a/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/websocket/WebSocketPluginTest.java
+++ 
b/shenyu-plugin/shenyu-plugin-divide/src/test/java/org/apache/shenyu/plugin/divide/websocket/WebSocketPluginTest.java
@@ -26,12 +26,15 @@ import 
org.apache.shenyu.common.dto.convert.rule.impl.DivideRuleHandle;
 import org.apache.shenyu.common.enums.PluginEnum;
 import org.apache.shenyu.common.enums.RpcTypeEnum;
 import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.common.utils.UpstreamCheckUtils;
 import org.apache.shenyu.plugin.api.ShenyuPluginChain;
 import org.apache.shenyu.plugin.api.context.ShenyuContext;
 import org.apache.shenyu.plugin.divide.cache.UpstreamCacheManager;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.MockedStatic;
 import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
 import org.springframework.mock.web.server.MockServerWebExchange;
 import org.springframework.web.reactive.socket.client.WebSocketClient;
@@ -46,7 +49,10 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
 import static org.springframework.http.HttpHeaders.CONNECTION;
 import static org.springframework.http.HttpHeaders.UPGRADE;
@@ -70,6 +76,8 @@ public class WebSocketPluginTest {
 
     private WebSocketService webSocketService;
 
+    private MockedStatic<UpstreamCheckUtils> mockCheckUtils;
+
     @Before
     public void setup() {
         this.ruleData = mock(RuleData.class);
@@ -89,6 +97,15 @@ public class WebSocketPluginTest {
         WebSocketClient webSocketClient = mock(WebSocketClient.class);
         this.webSocketService = mock(WebSocketService.class);
         this.webSocketPlugin = new WebSocketPlugin(webSocketClient, 
webSocketService);
+
+        // mock static
+        mockCheckUtils = mockStatic(UpstreamCheckUtils.class);
+        mockCheckUtils.when(() -> UpstreamCheckUtils.checkUrl(anyString(), 
anyInt())).thenReturn(true);
+    }
+
+    @After
+    public void tearDown() {
+        mockCheckUtils.close();
     }
 
     /**

Reply via email to