This is an automated email from the ASF dual-hosted git repository.

zhangzicheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a69b0b204 [ISSUE #5311] less concurrency (#5587)
3a69b0b204 is described below

commit 3a69b0b204cef59da93571cee64068a9db6cf1ba
Author: loongs-zhang <zhangzich...@apache.org>
AuthorDate: Sat Jul 6 11:18:08 2024 +0800

    [ISSUE #5311] less concurrency (#5587)
    
    * [ISSUE #5311] less concurrency
    
    * fix grpc CI
    
    * try fix grpc e2e CI
    
    * rollback
    
    ---------
    
    Co-authored-by: moremind <hefen...@apache.org>
---
 .../admin/service/impl/UpstreamCheckService.java   | 65 +++++++++++++++++++---
 .../shenyu/admin/transfer/DiscoveryTransfer.java   | 14 +++++
 .../admin/service/UpstreamCheckServiceTest.java    | 22 +++++++-
 .../handler/GrpcDiscoveryUpstreamDataHandler.java  |  5 +-
 4 files changed, 94 insertions(+), 12 deletions(-)

diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
index fdef9ef6e6..305f18fa02 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
@@ -33,13 +33,17 @@ import org.apache.shenyu.admin.model.entity.SelectorDO;
 import org.apache.shenyu.admin.model.event.selector.SelectorCreatedEvent;
 import org.apache.shenyu.admin.model.event.selector.SelectorUpdatedEvent;
 import org.apache.shenyu.admin.model.query.SelectorConditionQuery;
+import org.apache.shenyu.admin.service.DiscoveryUpstreamService;
 import org.apache.shenyu.admin.service.converter.SelectorHandleConverterFactor;
 import org.apache.shenyu.admin.transfer.ConditionTransfer;
+import org.apache.shenyu.admin.transfer.DiscoveryTransfer;
 import org.apache.shenyu.admin.utils.CommonUpstreamUtils;
 import org.apache.shenyu.admin.utils.SelectorUtil;
 import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
 import org.apache.shenyu.common.constant.Constants;
 import org.apache.shenyu.common.dto.ConditionData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
 import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.dto.convert.selector.CommonUpstream;
 import org.apache.shenyu.common.dto.convert.selector.DivideUpstream;
@@ -59,6 +63,7 @@ import org.springframework.stereotype.Component;
 import javax.annotation.PreDestroy;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -111,6 +116,8 @@ public class UpstreamCheckService {
 
     private final SelectorHandleConverterFactor converterFactor;
 
+    private final DiscoveryUpstreamService discoveryUpstreamService;
+
     private ScheduledThreadPoolExecutor executor;
 
     private ScheduledFuture<?> scheduledFuture;
@@ -134,12 +141,14 @@ public class UpstreamCheckService {
                                 final PluginMapper pluginMapper,
                                 final SelectorConditionMapper 
selectorConditionMapper,
                                 final ShenyuRegisterCenterConfig 
shenyuRegisterCenterConfig,
-                                final SelectorHandleConverterFactor 
converterFactor) {
+                                final SelectorHandleConverterFactor 
converterFactor,
+                                final DiscoveryUpstreamService 
discoveryUpstreamService) {
         this.selectorMapper = selectorMapper;
         this.eventPublisher = eventPublisher;
         this.pluginMapper = pluginMapper;
         this.selectorConditionMapper = selectorConditionMapper;
         this.converterFactor = converterFactor;
+        this.discoveryUpstreamService = discoveryUpstreamService;
         Properties props = shenyuRegisterCenterConfig.getProps();
         this.checked = 
Boolean.parseBoolean(props.getProperty(Constants.IS_CHECKED, 
Constants.DEFAULT_CHECK_VALUE));
         this.scheduledThreads = 
Integer.parseInt(props.getProperty(Constants.ZOMBIE_CHECK_THREADS, 
Constants.ZOMBIE_CHECK_THREADS_VALUE));
@@ -199,6 +208,15 @@ public class UpstreamCheckService {
             return;
         }
 
+        Optional.ofNullable(submitJust(selectorId, commonUpstream))
+                .ifPresent(upstreams -> executor.execute(() -> 
updateHandler(selectorId, upstreams, upstreams)));
+    }
+
+    private List<CommonUpstream> submitJust(final String selectorId, final 
CommonUpstream commonUpstream) {
+        if (!REGISTER_TYPE_HTTP.equalsIgnoreCase(registerType) || !checked) {
+            return null;
+        }
+
         List<CommonUpstream> upstreams = 
MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> new 
CopyOnWriteArrayList<>());
         if (commonUpstream.isStatus()) {
             Optional<CommonUpstream> exists = upstreams.stream().filter(item 
-> StringUtils.isNotBlank(item.getUpstreamUrl())
@@ -213,7 +231,7 @@ public class UpstreamCheckService {
             upstreams.removeIf(item -> item.equals(commonUpstream));
             PENDING_SYNC.add(NumberUtils.INTEGER_ZERO);
         }
-        executor.execute(() -> updateHandler(selectorId, upstreams, 
upstreams));
+        return upstreams;
     }
 
     /**
@@ -297,7 +315,8 @@ public class UpstreamCheckService {
             commonUpstream.setStatus(true);
             LOG.info("UpstreamCacheManager check zombie upstream success the 
url: {}, host: {} ", commonUpstream.getUpstreamUrl(), 
commonUpstream.getUpstreamHost());
             List<CommonUpstream> old = 
ListUtils.unmodifiableList(UPSTREAM_MAP.getOrDefault(selectorId, 
Collections.emptyList()));
-            this.submit(selectorId, commonUpstream);
+            // fix https://github.com/apache/shenyu/issues/5311
+            this.submitJust(selectorId, commonUpstream);
             updateHandler(selectorId, old, UPSTREAM_MAP.get(selectorId));
         } else {
             LOG.error("check zombie upstream the url={} is fail", 
commonUpstream.getUpstreamUrl());
@@ -369,18 +388,38 @@ public class UpstreamCheckService {
         }
 
         PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
-        String handler = 
converterFactor.newInstance(pluginDO.getName()).handler(selectorDO.getHandle(), 
aliveList);
+        if (Objects.isNull(pluginDO)) {
+            return;
+        }
+        String pluginName = pluginDO.getName();
+        String handler = 
converterFactor.newInstance(pluginName).handler(selectorDO.getHandle(), 
aliveList);
         selectorDO.setHandle(handler);
         selectorMapper.updateSelective(selectorDO);
 
         List<ConditionData> conditionDataList = 
ConditionTransfer.INSTANCE.mapToSelectorDOS(
                 selectorConditionMapper.selectByQuery(new 
SelectorConditionQuery(selectorDO.getId())));
-        SelectorData selectorData = SelectorDO.transFrom(selectorDO, 
pluginDO.getName(), conditionDataList);
+        SelectorData selectorData = SelectorDO.transFrom(selectorDO, 
pluginName, conditionDataList);
         selectorData.setHandle(handler);
 
         // publish change event.
         eventPublisher.publishEvent(new 
DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, 
Collections.singletonList(selectorData)));
 
+        // publish discovery change event.
+        List<DiscoveryUpstreamData> discoveryUpstreamDataList = 
discoveryUpstreamService.findBySelectorId(selectorId);
+        discoveryUpstreamDataList.removeIf(u -> {
+            for (CommonUpstream alive : aliveList) {
+                if (alive.getUpstreamUrl().equals(u.getUrl())) {
+                    return false;
+                }
+            }
+            return true;
+        });
+        DiscoverySyncData discoverySyncData = new DiscoverySyncData();
+        discoverySyncData.setUpstreamDataList(discoveryUpstreamDataList);
+        discoverySyncData.setPluginName(pluginName);
+        discoverySyncData.setSelectorId(selectorId);
+        discoverySyncData.setSelectorName(selectorDO.getName());
+        eventPublisher.publishEvent(new 
DataChangedEvent(ConfigGroupEnum.DISCOVER_UPSTREAM, DataEventTypeEnum.UPDATE, 
Collections.singletonList(discoverySyncData)));
     }
 
     /**
@@ -396,12 +435,19 @@ public class UpstreamCheckService {
         final List<SelectorDO> selectorDOList = 
selectorMapper.findByPluginIds(new ArrayList<>(pluginMap.keySet()));
         long currentTimeMillis = System.currentTimeMillis();
         Optional.ofNullable(selectorDOList).orElseGet(ArrayList::new).stream()
-                .filter(selectorDO -> Objects.nonNull(selectorDO) && 
StringUtils.isNotEmpty(selectorDO.getHandle()))
+                .filter(Objects::nonNull)
                 .forEach(selectorDO -> {
                     String name = pluginMap.get(selectorDO.getPluginId());
-                    List<CommonUpstream> commonUpstreams = 
converterFactor.newInstance(name).convertUpstream(selectorDO.getHandle())
-                            .stream().filter(upstream -> upstream.isStatus() 
|| upstream.getTimestamp() > currentTimeMillis - 
TimeUnit.SECONDS.toMillis(zombieRemovalTimes))
-                            .collect(Collectors.toList());
+                    List<CommonUpstream> commonUpstreams = new LinkedList<>();
+                    
discoveryUpstreamService.findBySelectorId(selectorDO.getId()).stream()
+                            
.map(DiscoveryTransfer.INSTANCE::mapToCommonUpstream)
+                            .forEach(commonUpstreams::add);
+                    String handle = selectorDO.getHandle();
+                    if (StringUtils.isNotEmpty(handle)) {
+                        
commonUpstreams.addAll(converterFactor.newInstance(name).convertUpstream(handle)
+                                .stream().filter(upstream -> 
upstream.isStatus() || upstream.getTimestamp() > currentTimeMillis - 
TimeUnit.SECONDS.toMillis(zombieRemovalTimes))
+                                .collect(Collectors.toList()));
+                    }
                     if (CollectionUtils.isNotEmpty(commonUpstreams)) {
                         UPSTREAM_MAP.put(selectorDO.getId(), commonUpstreams);
                         PENDING_SYNC.add(NumberUtils.INTEGER_ZERO);
@@ -439,6 +485,7 @@ public class UpstreamCheckService {
 
     /**
      * get the zombie removal time value.
+     *
      * @return zombie removal time value
      */
     public static int getZombieRemovalTimes() {
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java
index 4a40cb0df3..bd60b69518 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java
@@ -33,6 +33,7 @@ import org.apache.shenyu.admin.model.vo.DiscoveryUpstreamVO;
 import org.apache.shenyu.admin.model.vo.DiscoveryVO;
 import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
 import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.convert.selector.CommonUpstream;
 import org.apache.shenyu.common.utils.GsonUtils;
 
 import java.util.Optional;
@@ -65,6 +66,19 @@ public enum DiscoveryTransfer {
             .dateUpdated(data.getDateUpdated())
             .dateCreated(data.getDateCreated()).build()).orElse(null);
     }
+    
+    /**
+     * mapToCommonUpstream.
+     *
+     * @param discoveryUpstreamData discoveryUpstreamData
+     * @return CommonUpstream
+     */
+    public CommonUpstream mapToCommonUpstream(DiscoveryUpstreamData 
discoveryUpstreamData) {
+        return Optional.ofNullable(discoveryUpstreamData).map(data -> {
+            String url = data.getUrl();
+            return new CommonUpstream(data.getProtocol(), url.split(":")[0], 
url, false, data.getDateCreated().getTime());
+        }).orElse(null);
+    }
 
     /**
      * mapToVo.
diff --git 
a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java
 
b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java
index fcd8a64df1..01bb6f04af 100644
--- 
a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java
+++ 
b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java
@@ -29,6 +29,7 @@ import 
org.apache.shenyu.admin.service.converter.SelectorHandleConverterFactor;
 import org.apache.shenyu.admin.service.impl.UpstreamCheckService;
 import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
 import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
 import org.apache.shenyu.common.dto.convert.selector.DivideUpstream;
 import org.apache.shenyu.common.dto.convert.selector.ZombieUpstream;
 import org.apache.shenyu.common.enums.PluginEnum;
@@ -50,6 +51,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.test.util.ReflectionTestUtils;
 
+import java.sql.Timestamp;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -103,6 +105,9 @@ public final class UpstreamCheckServiceTest {
     private SelectorConditionMapper selectorConditionMapper;
 
     private SelectorHandleConverterFactor converterFactor;
+    
+    @Mock
+    private DiscoveryUpstreamService discoveryUpstreamService;
 
     private final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig = new 
ShenyuRegisterCenterConfig();
 
@@ -133,7 +138,8 @@ public final class UpstreamCheckServiceTest {
         Map<String, SelectorHandleConverter> maps = new HashMap<>();
         maps.put(PluginEnum.DIVIDE.getName(), new 
DivideSelectorHandleConverter());
         converterFactor = new SelectorHandleConverterFactor(maps);
-        upstreamCheckService = new UpstreamCheckService(selectorMapper, 
eventPublisher, pluginMapper, selectorConditionMapper, 
shenyuRegisterCenterConfig, converterFactor);
+        upstreamCheckService = new UpstreamCheckService(selectorMapper, 
eventPublisher, pluginMapper, selectorConditionMapper,
+                shenyuRegisterCenterConfig, converterFactor, 
discoveryUpstreamService);
     }
 
     @Test
@@ -242,11 +248,22 @@ public final class UpstreamCheckServiceTest {
                 .name(MOCK_SELECTOR_NAME_OTHER)
                 
.handle("[{\"upstreamHost\":\"localhost\",\"protocol\":\"http://\",\"localhost\":\"divide-upstream-60\",\"weight\":60}]";)
                 .build();
+        DiscoveryUpstreamData discoveryUpstreamData = 
DiscoveryUpstreamData.builder()
+                .dateCreated(new Timestamp(System.currentTimeMillis()))
+                .protocol("http")
+                .url("127.0.0.1:8080")
+                .props("{}")
+                .discoveryHandlerId("1")
+                .status(0)
+                .build();
         
when(pluginMapper.selectByNames(anyList())).thenReturn(Lists.newArrayList(pluginDO));
         
when(selectorMapper.findByPluginIds(anyList())).thenReturn(Lists.newArrayList(selectorDOWithUrlError,
 selectorDOWithUrlReachable));
+        
when(discoveryUpstreamService.findBySelectorId(anyString())).thenReturn(Lists.newArrayList(discoveryUpstreamData));
         upstreamCheckService.fetchUpstreamData();
         assertTrue(upstreamMap.containsKey(MOCK_SELECTOR_NAME));
+        assertEquals(2, upstreamMap.get(MOCK_SELECTOR_NAME).size());
         assertTrue(upstreamMap.containsKey(MOCK_SELECTOR_NAME_OTHER));
+        assertEquals(2, upstreamMap.get(MOCK_SELECTOR_NAME_OTHER).size());
     }
 
     @Test
@@ -254,7 +271,8 @@ public final class UpstreamCheckServiceTest {
         Properties properties = new Properties();
         properties.setProperty(Constants.IS_CHECKED, "true");
         shenyuRegisterCenterConfig.setProps(properties);
-        upstreamCheckService = new UpstreamCheckService(selectorMapper, 
eventPublisher, pluginMapper, selectorConditionMapper, 
shenyuRegisterCenterConfig, converterFactor);
+        upstreamCheckService = new UpstreamCheckService(selectorMapper, 
eventPublisher, pluginMapper, selectorConditionMapper,
+                shenyuRegisterCenterConfig, converterFactor, 
discoveryUpstreamService);
         upstreamCheckService.close();
     }
 
diff --git 
a/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java
 
b/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java
index 109fa86239..f9e7645d1a 100644
--- 
a/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java
+++ 
b/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java
@@ -23,6 +23,7 @@ import 
org.apache.shenyu.common.dto.convert.selector.GrpcUpstream;
 import org.apache.shenyu.common.enums.PluginEnum;
 import org.apache.shenyu.plugin.base.handler.DiscoveryUpstreamDataHandler;
 import org.apache.shenyu.plugin.grpc.cache.ApplicationConfigCache;
+import org.apache.shenyu.plugin.grpc.cache.GrpcClientCache;
 import org.springframework.util.ObjectUtils;
 
 import java.sql.Timestamp;
@@ -42,7 +43,9 @@ public class GrpcDiscoveryUpstreamDataHandler implements 
DiscoveryUpstreamDataHa
         if (Objects.isNull(discoverySyncData) || 
Objects.isNull(discoverySyncData.getSelectorId())) {
             return;
         }
-        
ApplicationConfigCache.getInstance().handlerUpstream(discoverySyncData.getSelectorId(),
 convertUpstreamList(discoverySyncData.getUpstreamDataList()));
+        final String selectorId = discoverySyncData.getSelectorId();
+        ApplicationConfigCache.getInstance().handlerUpstream(selectorId, 
convertUpstreamList(discoverySyncData.getUpstreamDataList()));
+        GrpcClientCache.initGrpcClient(selectorId);
     }
 
     private List<GrpcUpstream> convertUpstreamList(final 
List<DiscoveryUpstreamData> upstreamList) {

Reply via email to