This is an automated email from the ASF dual-hosted git repository. zhangzicheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push: new 3a69b0b204 [ISSUE #5311] less concurrency (#5587) 3a69b0b204 is described below commit 3a69b0b204cef59da93571cee64068a9db6cf1ba Author: loongs-zhang <zhangzich...@apache.org> AuthorDate: Sat Jul 6 11:18:08 2024 +0800 [ISSUE #5311] less concurrency (#5587) * [ISSUE #5311] less concurrency * fix grpc CI * try fix grpc e2e CI * rollback --------- Co-authored-by: moremind <hefen...@apache.org> --- .../admin/service/impl/UpstreamCheckService.java | 65 +++++++++++++++++++--- .../shenyu/admin/transfer/DiscoveryTransfer.java | 14 +++++ .../admin/service/UpstreamCheckServiceTest.java | 22 +++++++- .../handler/GrpcDiscoveryUpstreamDataHandler.java | 5 +- 4 files changed, 94 insertions(+), 12 deletions(-) diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java index fdef9ef6e6..305f18fa02 100644 --- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java +++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java @@ -33,13 +33,17 @@ import org.apache.shenyu.admin.model.entity.SelectorDO; import org.apache.shenyu.admin.model.event.selector.SelectorCreatedEvent; import org.apache.shenyu.admin.model.event.selector.SelectorUpdatedEvent; import org.apache.shenyu.admin.model.query.SelectorConditionQuery; +import org.apache.shenyu.admin.service.DiscoveryUpstreamService; import org.apache.shenyu.admin.service.converter.SelectorHandleConverterFactor; import org.apache.shenyu.admin.transfer.ConditionTransfer; +import org.apache.shenyu.admin.transfer.DiscoveryTransfer; import org.apache.shenyu.admin.utils.CommonUpstreamUtils; import org.apache.shenyu.admin.utils.SelectorUtil; import org.apache.shenyu.common.concurrent.ShenyuThreadFactory; import org.apache.shenyu.common.constant.Constants; import org.apache.shenyu.common.dto.ConditionData; +import org.apache.shenyu.common.dto.DiscoverySyncData; +import org.apache.shenyu.common.dto.DiscoveryUpstreamData; import org.apache.shenyu.common.dto.SelectorData; import org.apache.shenyu.common.dto.convert.selector.CommonUpstream; import org.apache.shenyu.common.dto.convert.selector.DivideUpstream; @@ -59,6 +63,7 @@ import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -111,6 +116,8 @@ public class UpstreamCheckService { private final SelectorHandleConverterFactor converterFactor; + private final DiscoveryUpstreamService discoveryUpstreamService; + private ScheduledThreadPoolExecutor executor; private ScheduledFuture<?> scheduledFuture; @@ -134,12 +141,14 @@ public class UpstreamCheckService { final PluginMapper pluginMapper, final SelectorConditionMapper selectorConditionMapper, final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig, - final SelectorHandleConverterFactor converterFactor) { + final SelectorHandleConverterFactor converterFactor, + final DiscoveryUpstreamService discoveryUpstreamService) { this.selectorMapper = selectorMapper; this.eventPublisher = eventPublisher; this.pluginMapper = pluginMapper; this.selectorConditionMapper = selectorConditionMapper; this.converterFactor = converterFactor; + this.discoveryUpstreamService = discoveryUpstreamService; Properties props = shenyuRegisterCenterConfig.getProps(); this.checked = Boolean.parseBoolean(props.getProperty(Constants.IS_CHECKED, Constants.DEFAULT_CHECK_VALUE)); this.scheduledThreads = Integer.parseInt(props.getProperty(Constants.ZOMBIE_CHECK_THREADS, Constants.ZOMBIE_CHECK_THREADS_VALUE)); @@ -199,6 +208,15 @@ public class UpstreamCheckService { return; } + Optional.ofNullable(submitJust(selectorId, commonUpstream)) + .ifPresent(upstreams -> executor.execute(() -> updateHandler(selectorId, upstreams, upstreams))); + } + + private List<CommonUpstream> submitJust(final String selectorId, final CommonUpstream commonUpstream) { + if (!REGISTER_TYPE_HTTP.equalsIgnoreCase(registerType) || !checked) { + return null; + } + List<CommonUpstream> upstreams = MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> new CopyOnWriteArrayList<>()); if (commonUpstream.isStatus()) { Optional<CommonUpstream> exists = upstreams.stream().filter(item -> StringUtils.isNotBlank(item.getUpstreamUrl()) @@ -213,7 +231,7 @@ public class UpstreamCheckService { upstreams.removeIf(item -> item.equals(commonUpstream)); PENDING_SYNC.add(NumberUtils.INTEGER_ZERO); } - executor.execute(() -> updateHandler(selectorId, upstreams, upstreams)); + return upstreams; } /** @@ -297,7 +315,8 @@ public class UpstreamCheckService { commonUpstream.setStatus(true); LOG.info("UpstreamCacheManager check zombie upstream success the url: {}, host: {} ", commonUpstream.getUpstreamUrl(), commonUpstream.getUpstreamHost()); List<CommonUpstream> old = ListUtils.unmodifiableList(UPSTREAM_MAP.getOrDefault(selectorId, Collections.emptyList())); - this.submit(selectorId, commonUpstream); + // fix https://github.com/apache/shenyu/issues/5311 + this.submitJust(selectorId, commonUpstream); updateHandler(selectorId, old, UPSTREAM_MAP.get(selectorId)); } else { LOG.error("check zombie upstream the url={} is fail", commonUpstream.getUpstreamUrl()); @@ -369,18 +388,38 @@ public class UpstreamCheckService { } PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId()); - String handler = converterFactor.newInstance(pluginDO.getName()).handler(selectorDO.getHandle(), aliveList); + if (Objects.isNull(pluginDO)) { + return; + } + String pluginName = pluginDO.getName(); + String handler = converterFactor.newInstance(pluginName).handler(selectorDO.getHandle(), aliveList); selectorDO.setHandle(handler); selectorMapper.updateSelective(selectorDO); List<ConditionData> conditionDataList = ConditionTransfer.INSTANCE.mapToSelectorDOS( selectorConditionMapper.selectByQuery(new SelectorConditionQuery(selectorDO.getId()))); - SelectorData selectorData = SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList); + SelectorData selectorData = SelectorDO.transFrom(selectorDO, pluginName, conditionDataList); selectorData.setHandle(handler); // publish change event. eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData))); + // publish discovery change event. + List<DiscoveryUpstreamData> discoveryUpstreamDataList = discoveryUpstreamService.findBySelectorId(selectorId); + discoveryUpstreamDataList.removeIf(u -> { + for (CommonUpstream alive : aliveList) { + if (alive.getUpstreamUrl().equals(u.getUrl())) { + return false; + } + } + return true; + }); + DiscoverySyncData discoverySyncData = new DiscoverySyncData(); + discoverySyncData.setUpstreamDataList(discoveryUpstreamDataList); + discoverySyncData.setPluginName(pluginName); + discoverySyncData.setSelectorId(selectorId); + discoverySyncData.setSelectorName(selectorDO.getName()); + eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.DISCOVER_UPSTREAM, DataEventTypeEnum.UPDATE, Collections.singletonList(discoverySyncData))); } /** @@ -396,12 +435,19 @@ public class UpstreamCheckService { final List<SelectorDO> selectorDOList = selectorMapper.findByPluginIds(new ArrayList<>(pluginMap.keySet())); long currentTimeMillis = System.currentTimeMillis(); Optional.ofNullable(selectorDOList).orElseGet(ArrayList::new).stream() - .filter(selectorDO -> Objects.nonNull(selectorDO) && StringUtils.isNotEmpty(selectorDO.getHandle())) + .filter(Objects::nonNull) .forEach(selectorDO -> { String name = pluginMap.get(selectorDO.getPluginId()); - List<CommonUpstream> commonUpstreams = converterFactor.newInstance(name).convertUpstream(selectorDO.getHandle()) - .stream().filter(upstream -> upstream.isStatus() || upstream.getTimestamp() > currentTimeMillis - TimeUnit.SECONDS.toMillis(zombieRemovalTimes)) - .collect(Collectors.toList()); + List<CommonUpstream> commonUpstreams = new LinkedList<>(); + discoveryUpstreamService.findBySelectorId(selectorDO.getId()).stream() + .map(DiscoveryTransfer.INSTANCE::mapToCommonUpstream) + .forEach(commonUpstreams::add); + String handle = selectorDO.getHandle(); + if (StringUtils.isNotEmpty(handle)) { + commonUpstreams.addAll(converterFactor.newInstance(name).convertUpstream(handle) + .stream().filter(upstream -> upstream.isStatus() || upstream.getTimestamp() > currentTimeMillis - TimeUnit.SECONDS.toMillis(zombieRemovalTimes)) + .collect(Collectors.toList())); + } if (CollectionUtils.isNotEmpty(commonUpstreams)) { UPSTREAM_MAP.put(selectorDO.getId(), commonUpstreams); PENDING_SYNC.add(NumberUtils.INTEGER_ZERO); @@ -439,6 +485,7 @@ public class UpstreamCheckService { /** * get the zombie removal time value. + * * @return zombie removal time value */ public static int getZombieRemovalTimes() { diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java index 4a40cb0df3..bd60b69518 100644 --- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java +++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java @@ -33,6 +33,7 @@ import org.apache.shenyu.admin.model.vo.DiscoveryUpstreamVO; import org.apache.shenyu.admin.model.vo.DiscoveryVO; import org.apache.shenyu.common.dto.DiscoveryUpstreamData; import org.apache.shenyu.common.dto.ProxySelectorData; +import org.apache.shenyu.common.dto.convert.selector.CommonUpstream; import org.apache.shenyu.common.utils.GsonUtils; import java.util.Optional; @@ -65,6 +66,19 @@ public enum DiscoveryTransfer { .dateUpdated(data.getDateUpdated()) .dateCreated(data.getDateCreated()).build()).orElse(null); } + + /** + * mapToCommonUpstream. + * + * @param discoveryUpstreamData discoveryUpstreamData + * @return CommonUpstream + */ + public CommonUpstream mapToCommonUpstream(DiscoveryUpstreamData discoveryUpstreamData) { + return Optional.ofNullable(discoveryUpstreamData).map(data -> { + String url = data.getUrl(); + return new CommonUpstream(data.getProtocol(), url.split(":")[0], url, false, data.getDateCreated().getTime()); + }).orElse(null); + } /** * mapToVo. diff --git a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java index fcd8a64df1..01bb6f04af 100644 --- a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java +++ b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java @@ -29,6 +29,7 @@ import org.apache.shenyu.admin.service.converter.SelectorHandleConverterFactor; import org.apache.shenyu.admin.service.impl.UpstreamCheckService; import org.apache.shenyu.common.concurrent.ShenyuThreadFactory; import org.apache.shenyu.common.constant.Constants; +import org.apache.shenyu.common.dto.DiscoveryUpstreamData; import org.apache.shenyu.common.dto.convert.selector.DivideUpstream; import org.apache.shenyu.common.dto.convert.selector.ZombieUpstream; import org.apache.shenyu.common.enums.PluginEnum; @@ -50,6 +51,7 @@ import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationEventPublisher; import org.springframework.test.util.ReflectionTestUtils; +import java.sql.Timestamp; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -103,6 +105,9 @@ public final class UpstreamCheckServiceTest { private SelectorConditionMapper selectorConditionMapper; private SelectorHandleConverterFactor converterFactor; + + @Mock + private DiscoveryUpstreamService discoveryUpstreamService; private final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig = new ShenyuRegisterCenterConfig(); @@ -133,7 +138,8 @@ public final class UpstreamCheckServiceTest { Map<String, SelectorHandleConverter> maps = new HashMap<>(); maps.put(PluginEnum.DIVIDE.getName(), new DivideSelectorHandleConverter()); converterFactor = new SelectorHandleConverterFactor(maps); - upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, shenyuRegisterCenterConfig, converterFactor); + upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, + shenyuRegisterCenterConfig, converterFactor, discoveryUpstreamService); } @Test @@ -242,11 +248,22 @@ public final class UpstreamCheckServiceTest { .name(MOCK_SELECTOR_NAME_OTHER) .handle("[{\"upstreamHost\":\"localhost\",\"protocol\":\"http://\",\"localhost\":\"divide-upstream-60\",\"weight\":60}]") .build(); + DiscoveryUpstreamData discoveryUpstreamData = DiscoveryUpstreamData.builder() + .dateCreated(new Timestamp(System.currentTimeMillis())) + .protocol("http") + .url("127.0.0.1:8080") + .props("{}") + .discoveryHandlerId("1") + .status(0) + .build(); when(pluginMapper.selectByNames(anyList())).thenReturn(Lists.newArrayList(pluginDO)); when(selectorMapper.findByPluginIds(anyList())).thenReturn(Lists.newArrayList(selectorDOWithUrlError, selectorDOWithUrlReachable)); + when(discoveryUpstreamService.findBySelectorId(anyString())).thenReturn(Lists.newArrayList(discoveryUpstreamData)); upstreamCheckService.fetchUpstreamData(); assertTrue(upstreamMap.containsKey(MOCK_SELECTOR_NAME)); + assertEquals(2, upstreamMap.get(MOCK_SELECTOR_NAME).size()); assertTrue(upstreamMap.containsKey(MOCK_SELECTOR_NAME_OTHER)); + assertEquals(2, upstreamMap.get(MOCK_SELECTOR_NAME_OTHER).size()); } @Test @@ -254,7 +271,8 @@ public final class UpstreamCheckServiceTest { Properties properties = new Properties(); properties.setProperty(Constants.IS_CHECKED, "true"); shenyuRegisterCenterConfig.setProps(properties); - upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, shenyuRegisterCenterConfig, converterFactor); + upstreamCheckService = new UpstreamCheckService(selectorMapper, eventPublisher, pluginMapper, selectorConditionMapper, + shenyuRegisterCenterConfig, converterFactor, discoveryUpstreamService); upstreamCheckService.close(); } diff --git a/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java b/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java index 109fa86239..f9e7645d1a 100644 --- a/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java +++ b/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java @@ -23,6 +23,7 @@ import org.apache.shenyu.common.dto.convert.selector.GrpcUpstream; import org.apache.shenyu.common.enums.PluginEnum; import org.apache.shenyu.plugin.base.handler.DiscoveryUpstreamDataHandler; import org.apache.shenyu.plugin.grpc.cache.ApplicationConfigCache; +import org.apache.shenyu.plugin.grpc.cache.GrpcClientCache; import org.springframework.util.ObjectUtils; import java.sql.Timestamp; @@ -42,7 +43,9 @@ public class GrpcDiscoveryUpstreamDataHandler implements DiscoveryUpstreamDataHa if (Objects.isNull(discoverySyncData) || Objects.isNull(discoverySyncData.getSelectorId())) { return; } - ApplicationConfigCache.getInstance().handlerUpstream(discoverySyncData.getSelectorId(), convertUpstreamList(discoverySyncData.getUpstreamDataList())); + final String selectorId = discoverySyncData.getSelectorId(); + ApplicationConfigCache.getInstance().handlerUpstream(selectorId, convertUpstreamList(discoverySyncData.getUpstreamDataList())); + GrpcClientCache.initGrpcClient(selectorId); } private List<GrpcUpstream> convertUpstreamList(final List<DiscoveryUpstreamData> upstreamList) {