This is an automated email from the ASF dual-hosted git repository.

zhangzicheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 997e935fd [ISSUE #4356] optimize UpstreamCheckService (#4366)
997e935fd is described below

commit 997e935fd8283d4d4feb5de7712d616b1e50a37f
Author: iwangjie <[email protected]>
AuthorDate: Fri Feb 10 23:30:36 2023 +0800

    [ISSUE #4356] optimize UpstreamCheckService (#4366)
    
    * fix issue_4536 review commit
    
    * checkstyle pass
    
    * shenyu-4356 [Task] optimize UpstreamCheckService
    
    * Reserve the judgment and remove parallelStream.
    
    * implementation clear and simple
    
    * delete UpstreamWithSelectorId
    
    ---------
    
    Co-authored-by: dragon-zhang <[email protected]>
---
 .../admin/service/impl/UpstreamCheckService.java   | 92 ++++++++++++++++------
 shenyu-admin/src/main/resources/application.yml    |  1 +
 .../src/main/resources/application.yml             |  1 +
 .../apache/shenyu/common/config/ShenyuConfig.java  | 22 +++++-
 .../apache/shenyu/common/constant/Constants.java   | 10 +++
 .../dto/convert/selector/ZombieUpstream.java       |  2 -
 .../shenyu/common/config/ShenyuConfigTest.java     |  3 +-
 .../loadbalancer/cache/UpstreamCacheManager.java   |  4 +
 .../loadbalancer/cache/UpstreamCheckTask.java      | 24 +++++-
 9 files changed, 129 insertions(+), 30 deletions(-)

diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
index c25f630d0..4cb1a81c2 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
@@ -65,9 +65,11 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -97,6 +99,8 @@ public class UpstreamCheckService {
 
     private final boolean checked;
 
+    private final Integer scheduledThreads;
+
     private final SelectorMapper selectorMapper;
 
     private final ApplicationEventPublisher eventPublisher;
@@ -111,6 +115,10 @@ public class UpstreamCheckService {
 
     private ScheduledFuture<?> scheduledFuture;
 
+    private ScheduledThreadPoolExecutor invokeExecutor;
+
+    private final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
     /**
      * Instantiates a new Upstream check service.
      *
@@ -134,6 +142,7 @@ public class UpstreamCheckService {
         this.converterFactor = converterFactor;
         Properties props = shenyuRegisterCenterConfig.getProps();
         this.checked = 
Boolean.parseBoolean(props.getProperty(Constants.IS_CHECKED, 
Constants.DEFAULT_CHECK_VALUE));
+        this.scheduledThreads = 
Integer.parseInt(props.getProperty(Constants.ZOMBIE_CHECK_THREADS, 
Constants.ZOMBIE_CHECK_THREADS_VALUE));
         this.zombieCheckTimes = 
Integer.parseInt(props.getProperty(Constants.ZOMBIE_CHECK_TIMES, 
Constants.ZOMBIE_CHECK_TIMES_VALUE));
         this.scheduledTime = 
Integer.parseInt(props.getProperty(Constants.SCHEDULED_TIME, 
Constants.SCHEDULED_TIME_VALUE));
         this.registerType = shenyuRegisterCenterConfig.getRegisterType();
@@ -151,6 +160,9 @@ public class UpstreamCheckService {
             this.fetchUpstreamData();
             executor = new ScheduledThreadPoolExecutor(1, 
ShenyuThreadFactory.create("scheduled-upstream-task", false));
             scheduledFuture = executor.scheduleWithFixedDelay(this::scheduled, 
10, scheduledTime, TimeUnit.SECONDS);
+
+            ThreadFactory requestFactory = 
ShenyuThreadFactory.create("upstream-health-check-request", true);
+            invokeExecutor = new 
ScheduledThreadPoolExecutor(this.scheduledThreads, requestFactory);
         }
     }
 
@@ -203,7 +215,7 @@ public class UpstreamCheckService {
         executor.execute(() -> updateHandler(selectorId, upstreams, 
upstreams));
         return true;
     }
-    
+
     /**
      * If the health check passes, the service will be added to
      * the normal service list; if the health check fails, the service
@@ -213,7 +225,7 @@ public class UpstreamCheckService {
      * that do not register with the gateway by listening to
      * {@link org.springframework.context.event.ContextRefreshedEvent},
      * which will cause some problems,
-     * check https://github.com/apache/shenyu/issues/3484 for more details.
+     * check <a href="https://github.com/apache/shenyu/issues/3484">...</a> for more details.
      *
      * @param selectorId     the selector id
      * @param commonUpstream the common upstream
@@ -244,18 +256,37 @@ public class UpstreamCheckService {
 
     private void scheduled() {
         try {
-            if (!ZOMBIE_SET.isEmpty()) {
-                ZOMBIE_SET.parallelStream().forEach(this::checkZombie);
-            }
-            if (!UPSTREAM_MAP.isEmpty()) {
-                UPSTREAM_MAP.forEach(this::check);
-            }
+            doCheck();
+            waitFinish();
         } catch (Exception e) {
             LOG.error("upstream scheduled check error -------- ", e);
         }
     }
 
+    private void doCheck() {
+        // check zombie
+        if (!ZOMBIE_SET.isEmpty()) {
+            ZOMBIE_SET.forEach(this::checkZombie);
+        }
+        // check up
+        if (!UPSTREAM_MAP.isEmpty()) {
+            UPSTREAM_MAP.forEach(this::check);
+        }
+    }
+
+    private void waitFinish() {
+        // wait all check success
+        CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0])).join();
+        // clear, for next time
+        futures.clear();
+    }
+
     private void checkZombie(final ZombieUpstream zombieUpstream) {
+        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> 
checkZombie0(zombieUpstream), invokeExecutor);
+        futures.add(future);
+    }
+
+    private void checkZombie0(final ZombieUpstream zombieUpstream) {
         ZOMBIE_SET.remove(zombieUpstream);
         String selectorId = zombieUpstream.getSelectorId();
         CommonUpstream commonUpstream = zombieUpstream.getCommonUpstream();
@@ -277,23 +308,36 @@ public class UpstreamCheckService {
     }
 
     private void check(final String selectorId, final List<CommonUpstream> 
upstreamList) {
-        List<CommonUpstream> successList = 
Lists.newArrayListWithCapacity(upstreamList.size());
+        final List<CompletableFuture<CommonUpstream>> checkFutures = new 
ArrayList<>(upstreamList.size());
         for (CommonUpstream commonUpstream : upstreamList) {
-            final boolean pass = 
UpstreamCheckUtils.checkUrl(commonUpstream.getUpstreamUrl());
-            if (pass) {
-                if (!commonUpstream.isStatus()) {
-                    commonUpstream.setTimestamp(System.currentTimeMillis());
-                    commonUpstream.setStatus(true);
-                    LOG.info("UpstreamCacheManager check success the url: {}, 
host: {} ", commonUpstream.getUpstreamUrl(), commonUpstream.getUpstreamHost());
+            checkFutures.add(CompletableFuture.supplyAsync(() -> {
+                final boolean pass = 
UpstreamCheckUtils.checkUrl(commonUpstream.getUpstreamUrl());
+                if (pass) {
+                    if (!commonUpstream.isStatus()) {
+                        
commonUpstream.setTimestamp(System.currentTimeMillis());
+                        commonUpstream.setStatus(true);
+                        LOG.info("UpstreamCacheManager check success the url: 
{}, host: {} ", commonUpstream.getUpstreamUrl(), 
commonUpstream.getUpstreamHost());
+                    }
+                    return commonUpstream;
+                } else {
+                    commonUpstream.setStatus(false);
+                    ZOMBIE_SET.add(ZombieUpstream.transform(commonUpstream, 
zombieCheckTimes, selectorId));
+                    LOG.error("check the url={} is fail ", 
commonUpstream.getUpstreamUrl());
                 }
-                successList.add(commonUpstream);
-            } else {
-                commonUpstream.setStatus(false);
-                ZOMBIE_SET.add(ZombieUpstream.transform(commonUpstream, 
zombieCheckTimes, selectorId));
-                LOG.error("check the url={} is fail ", 
commonUpstream.getUpstreamUrl());
-            }
+                return null;
+            }).exceptionally(ex -> {
+                LOG.error("An exception occurred during the check of url {}: 
{}", commonUpstream.getUpstreamUrl(), ex);
+                return null;
+            }));
         }
-        updateHandler(selectorId, upstreamList, successList);
+
+        this.futures.add(CompletableFuture.runAsync(() -> {
+            List<CommonUpstream> successList = checkFutures.stream()
+                    .map(CompletableFuture::join)
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList());
+            updateHandler(selectorId, upstreamList, successList);
+        }));
     }
 
     private void updateHandler(final String selectorId, final 
List<CommonUpstream> upstreamList, final List<CommonUpstream> successList) {
@@ -362,7 +406,7 @@ public class UpstreamCheckService {
                     }
                 });
     }
-    
+
     /**
      * listen {@link SelectorCreatedEvent} add data permission.
      *
@@ -376,7 +420,7 @@ public class UpstreamCheckService {
             replace(event.getSelector().getId(), 
CommonUpstreamUtils.convertCommonUpstreamList(existDivideUpstreams));
         }
     }
-    
+
     /**
      * listen {@link SelectorCreatedEvent} add data permission.
      *
diff --git a/shenyu-admin/src/main/resources/application.yml 
b/shenyu-admin/src/main/resources/application.yml
index b04145134..d2388e005 100755
--- a/shenyu-admin/src/main/resources/application.yml
+++ b/shenyu-admin/src/main/resources/application.yml
@@ -42,6 +42,7 @@ shenyu:
       sessionTimeout: 5000
       connectionTimeout: 2000
       checked: true
+      zombieCheckThreads: 10
       zombieCheckTimes: 5
       scheduledTime: 10
       nacosNameSpace: ShenyuRegisterCenter
diff --git a/shenyu-bootstrap/src/main/resources/application.yml 
b/shenyu-bootstrap/src/main/resources/application.yml
index 030c40151..f90973d6f 100644
--- a/shenyu-bootstrap/src/main/resources/application.yml
+++ b/shenyu-bootstrap/src/main/resources/application.yml
@@ -238,6 +238,7 @@ shenyu:
     threads: 16
   upstreamCheck:
     enabled: false
+    poolSize: 10
     timeout: 3000
     healthyThreshold: 1
     unhealthyThreshold: 1
diff --git 
a/shenyu-common/src/main/java/org/apache/shenyu/common/config/ShenyuConfig.java 
b/shenyu-common/src/main/java/org/apache/shenyu/common/config/ShenyuConfig.java
index f0d3631da..3f1bd8b68 100644
--- 
a/shenyu-common/src/main/java/org/apache/shenyu/common/config/ShenyuConfig.java
+++ 
b/shenyu-common/src/main/java/org/apache/shenyu/common/config/ShenyuConfig.java
@@ -794,6 +794,8 @@ public class ShenyuConfig {
     public static class UpstreamCheck {
     
         private boolean enabled;
+
+        private Integer poolSize = 10;
         
         private Integer timeout = 3000;
         
@@ -824,7 +826,25 @@ public class ShenyuConfig {
         public void setEnabled(final boolean enabled) {
             this.enabled = enabled;
         }
-    
+
+        /**
+         * get checkThreadPoolSize.
+         *
+         * @return checkThreadPoolSize
+         */
+        public Integer getPoolSize() {
+            return poolSize;
+        }
+
+        /**
+         * set checkThreadPoolSize.
+         *
+         * @param poolSize checkThreadPoolSize
+         */
+        public void setPoolSize(final Integer poolSize) {
+            this.poolSize = poolSize;
+        }
+
         /**
          * Gets timeout.
          *
diff --git 
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java 
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
index 94c487ff9..68c933077 100644
--- 
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
+++ 
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
@@ -437,6 +437,16 @@ public interface Constants {
      */
     String DEFAULT_CHECK_VALUE = "false";
 
+    /**
+     * zombie check threads.
+     */
+    String ZOMBIE_CHECK_THREADS = "zombieCheckThreads";
+
+    /**
+     * default zombie check threads value.
+     */
+    String ZOMBIE_CHECK_THREADS_VALUE = "10";
+
     /**
      * zombie check times.
      */
diff --git 
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/ZombieUpstream.java
 
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/ZombieUpstream.java
index dfba6bdfd..4ef84c518 100644
--- 
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/ZombieUpstream.java
+++ 
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/ZombieUpstream.java
@@ -130,7 +130,6 @@ public class ZombieUpstream {
         }
         ZombieUpstream that = (ZombieUpstream) o;
         return new EqualsBuilder()
-                .append(zombieCheckTimes, that.zombieCheckTimes)
                 .append(commonUpstream, that.commonUpstream)
                 .append(selectorId, that.selectorId)
                 .isEquals();
@@ -140,7 +139,6 @@ public class ZombieUpstream {
     public int hashCode() {
         return new HashCodeBuilder(17, 37)
                 .append(commonUpstream)
-                .append(zombieCheckTimes)
                 .append(selectorId)
                 .toHashCode();
     }
diff --git 
a/shenyu-common/src/test/java/org/apache/shenyu/common/config/ShenyuConfigTest.java
 
b/shenyu-common/src/test/java/org/apache/shenyu/common/config/ShenyuConfigTest.java
index cce15adc4..9ff90c729 100644
--- 
a/shenyu-common/src/test/java/org/apache/shenyu/common/config/ShenyuConfigTest.java
+++ 
b/shenyu-common/src/test/java/org/apache/shenyu/common/config/ShenyuConfigTest.java
@@ -80,6 +80,7 @@ public class ShenyuConfigTest {
     public void testUpstreamCheck() {
         ShenyuConfig.UpstreamCheck upstreamCheck = config.getUpstreamCheck();
         upstreamCheck.setEnabled(false);
+        upstreamCheck.setPoolSize(10);
         upstreamCheck.setHealthyThreshold(4);
         upstreamCheck.setTimeout(10);
         upstreamCheck.setInterval(5);
@@ -87,7 +88,7 @@ public class ShenyuConfigTest {
         upstreamCheck.setPrintEnabled(false);
         upstreamCheck.setPrintInterval(5);
 
-        notEmptyElements(upstreamCheck.getEnabled(), 
upstreamCheck.getHealthyThreshold(), upstreamCheck.getTimeout(),
+        notEmptyElements(upstreamCheck.getEnabled(), 
upstreamCheck.getPoolSize(), upstreamCheck.getHealthyThreshold(), 
upstreamCheck.getTimeout(),
                 upstreamCheck.getInterval(), 
upstreamCheck.getUnhealthyThreshold(), upstreamCheck.getPrintInterval(), 
upstreamCheck.getPrintEnabled());
     }
 
diff --git 
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
 
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
index 899ae24db..630b9f155 100644
--- 
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
+++ 
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
@@ -51,6 +51,8 @@ public final class UpstreamCacheManager {
      */
     private Boolean checkEnable;
 
+    private int poolSize;
+
     private int checkTimeout;
 
     private int checkInterval;
@@ -74,6 +76,7 @@ public final class UpstreamCacheManager {
         ShenyuConfig shenyuConfig = 
Optional.ofNullable(Singleton.INST.get(ShenyuConfig.class)).orElse(new 
ShenyuConfig());
         UpstreamCheck upstreamCheck = shenyuConfig.getUpstreamCheck();
         checkEnable = upstreamCheck.getEnabled();
+        poolSize = upstreamCheck.getPoolSize();
         checkTimeout = upstreamCheck.getTimeout();
         healthyThreshold = upstreamCheck.getHealthyThreshold();
         unhealthyThreshold = upstreamCheck.getUnhealthyThreshold();
@@ -86,6 +89,7 @@ public final class UpstreamCacheManager {
 
     private void createTask() {
         task = new UpstreamCheckTask(checkInterval);
+        task.setPoolSize(poolSize);
         task.setCheckTimeout(checkTimeout);
         task.setHealthyThreshold(healthyThreshold);
         task.setUnhealthyThreshold(unhealthyThreshold);
diff --git 
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
 
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
index 46b9ddc76..e604a22ef 100644
--- 
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
+++ 
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
@@ -64,6 +64,8 @@ public final class UpstreamCheckTask implements Runnable {
 
     private ExecutorService executor;
 
+    private int poolSize;
+
     private int checkTimeout = 3000;
 
     private int healthyThreshold = 1;
@@ -99,7 +101,7 @@ public final class UpstreamCheckTask implements Runnable {
 
         // executor for async request, avoid request block health check thread
         ThreadFactory requestFactory = 
ShenyuThreadFactory.create("upstream-health-check-request", true);
-        executor = new ScheduledThreadPoolExecutor(10, requestFactory);
+        executor = new ScheduledThreadPoolExecutor(poolSize, requestFactory);
     }
     
     /**
@@ -110,7 +112,25 @@ public final class UpstreamCheckTask implements Runnable {
     public void setCheckTimeout(final int checkTimeout) {
         this.checkTimeout = checkTimeout;
     }
-    
+
+    /**
+     * get checkThreadPoolSize.
+     *
+     * @return checkThreadPoolSize
+     */
+    public int getPoolSize() {
+        return poolSize;
+    }
+
+    /**
+     * set checkThreadPoolSize.
+     *
+     * @param poolSize checkThreadPoolSize
+     */
+    public void setPoolSize(final int poolSize) {
+        this.poolSize = poolSize;
+    }
+
     /**
      * Set healthy threshold.
      *

Reply via email to