This is an automated email from the ASF dual-hosted git repository. liuhongyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push: new 909704f3e4 [type:bugfix]Add more retry strategies (#5969) 909704f3e4 is described below commit 909704f3e4bafdd5b48f81540f27633b9233cb10 Author: JerryDtj <31332025+jerry...@users.noreply.github.com> AuthorDate: Wed Apr 2 10:24:24 2025 +0800 [type:bugfix]Add more retry strategies (#5969) * 添加重试策略 * http请求重试策略重构 * http请求重试策略重构,添加Customer,retryBackOff,fixed三种策略 * http请求重试策略重构,添加Customer,retryBackOff,fixed三种策略 * http请求重试策略重构,添加Customer,retryBackOff,fixed三种策略 * 修复格式 * 中文转英文 * add license header * add license header * add license header * add license header * 修复代码格式化问题 * fix testFixedRetryStrategyExecute bug --------- Co-authored-by: aias00 <liuhon...@apache.org> --- .../apache/shenyu/common/constant/Constants.java | 6 +- .../common/enums/HttpRetryBackoffSpecEnum.java | 68 ++++++++++ .../httpclient/AbstractHttpClientPlugin.java | 119 +++++------------- .../plugin/httpclient/CustomRetryStrategy.java | 52 ++++++++ .../plugin/httpclient/DefaultRetryStrategy.java | 137 +++++++++++++++++++++ .../ExponentialRetryBackoffStrategy.java | 72 +++++++++++ .../plugin/httpclient/FixedRetryStrategy.java | 61 +++++++++ .../shenyu/plugin/httpclient/RetryStrategy.java | 42 +++++++ .../plugin/httpclient/RetryStrategyTest.java | 105 ++++++++++++++++ 9 files changed, 571 insertions(+), 91 deletions(-) diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java index 0e8392d8c6..62bfd1c399 100644 --- a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java +++ b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java @@ -965,7 +965,11 @@ public interface Constants { * The constant Content-Encoding. */ String CONTENT_ENCODING = "Content-Encoding"; - + /** + * Http request retry policy. + */ + String HTTP_RETRY_BACK_OFF_SPEC = "default"; + /** * The constant Content. 
*/ diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/HttpRetryBackoffSpecEnum.java b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/HttpRetryBackoffSpecEnum.java new file mode 100644 index 0000000000..f3583503e9 --- /dev/null +++ b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/HttpRetryBackoffSpecEnum.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shenyu.common.enums; + +import java.util.Arrays; + +/** + * The http scheme enum. + */ +public enum HttpRetryBackoffSpecEnum { + + /** + * Default retry. + */ + DEFAULT_BACKOFF("default"), + + /** + * Fixed retry. + */ + FIXED_BACKOFF("fixed"), + + /** + * Exponential retry. + */ + EXPONENTIAL_BACKOFF("exponential"), + + /** + * Custom retry. 
+ */ + CUSTOM_BACKOFF("custom"); + + private final String name; + + + HttpRetryBackoffSpecEnum(final String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public static HttpRetryBackoffSpecEnum acquireByName(final String name) { + return Arrays.stream(HttpRetryBackoffSpecEnum.values()) + .filter(e -> e.getName().equals(name)).findFirst() + .orElse(HttpRetryBackoffSpecEnum.DEFAULT_BACKOFF); + } + + public static String getDefault() { + return DEFAULT_BACKOFF.getName(); + } + +} diff --git a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/AbstractHttpClientPlugin.java b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/AbstractHttpClientPlugin.java index a81ba42d92..845585407d 100644 --- a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/AbstractHttpClientPlugin.java +++ b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/AbstractHttpClientPlugin.java @@ -17,28 +17,31 @@ package org.apache.shenyu.plugin.httpclient; -import com.google.common.collect.Sets; -import io.netty.channel.ConnectTimeoutException; -import io.netty.handler.timeout.ReadTimeoutException; +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.shenyu.common.constant.Constants; import org.apache.shenyu.common.enums.HeaderUniqueStrategyEnum; +import org.apache.shenyu.common.enums.HttpRetryBackoffSpecEnum; import org.apache.shenyu.common.enums.RetryEnum; import org.apache.shenyu.common.enums.UniqueHeaderEnum; import 
org.apache.shenyu.common.exception.ShenyuException; import org.apache.shenyu.common.utils.LogUtils; -import org.apache.shenyu.loadbalancer.cache.UpstreamCacheManager; -import org.apache.shenyu.loadbalancer.entity.Upstream; -import org.apache.shenyu.loadbalancer.factory.LoadBalancerFactory; import org.apache.shenyu.plugin.api.ShenyuPlugin; import org.apache.shenyu.plugin.api.ShenyuPluginChain; import org.apache.shenyu.plugin.api.context.ShenyuContext; import org.apache.shenyu.plugin.api.result.ShenyuResultEnum; import org.apache.shenyu.plugin.api.result.ShenyuResultWrap; -import org.apache.shenyu.plugin.api.utils.RequestUrlUtils; import org.apache.shenyu.plugin.api.utils.WebFluxResultUtils; -import org.apache.shenyu.plugin.httpclient.exception.ShenyuTimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.io.buffer.DataBuffer; @@ -48,21 +51,7 @@ import org.springframework.web.server.ResponseStatusException; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; -import reactor.util.retry.RetryBackoffSpec; -import java.net.URI; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeoutException; -import java.util.function.Function; -import java.util.stream.Collectors; /** * The type abstract http client plugin. 
@@ -88,80 +77,30 @@ public abstract class AbstractHttpClientPlugin<R> implements ShenyuPlugin { final Mono<R> response = doRequest(exchange, exchange.getRequest().getMethod().name(), uri, exchange.getRequest().getBody()) .timeout(duration, Mono.error(() -> new TimeoutException("Response took longer than timeout: " + duration))) .doOnError(e -> LOG.error(e.getMessage(), e)); - if (RetryEnum.CURRENT.getName().equals(retryStrategy)) { - //old version of DividePlugin and SpringCloudPlugin will run on this - RetryBackoffSpec retryBackoffSpec = Retry.backoff(retryTimes, Duration.ofMillis(20L)) - .maxBackoff(Duration.ofSeconds(20L)) - .transientErrors(true) - .jitter(0.5d) - .filter(t -> t instanceof TimeoutException || t instanceof ConnectTimeoutException - || t instanceof ReadTimeoutException || t instanceof IllegalStateException) - .onRetryExhaustedThrow((retryBackoffSpecErr, retrySignal) -> { - throw new ShenyuTimeoutException("Request timeout, the maximum number of retry times has been exceeded"); - }); - return response.retryWhen(retryBackoffSpec) - .onErrorMap(ShenyuTimeoutException.class, th -> new ResponseStatusException(HttpStatus.REQUEST_TIMEOUT, th.getMessage(), th)) - .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)) - .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange)); + RetryStrategy<R> strategy; + //Is it better to go with the configuration file here? 
+ String retryStrategyType = (String) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY_BACK_OFF_SPEC)).orElse(HttpRetryBackoffSpecEnum.getDefault()); + switch (retryStrategyType) { + case "exponential": + strategy = new ExponentialRetryBackoffStrategy<>(this); + break; + case "fixed": + strategy = new FixedRetryStrategy<>(this); + break; + case "custom": + strategy = new CustomRetryStrategy<>(this); + break; + default: + strategy = new DefaultRetryStrategy<>(this); } - final Set<URI> exclude = Sets.newHashSet(uri); - return resend(response, exchange, duration, exclude, retryTimes) + Mono<R> retriedResponse = strategy.execute(response, exchange, duration, retryTimes); + return retriedResponse .onErrorMap(ShenyuException.class, th -> new ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg(), th)) - .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)) + .onErrorMap(java.util.concurrent.TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)) .flatMap((Function<Object, Mono<? extends Void>>) o -> chain.execute(exchange)); } - private Mono<R> resend(final Mono<R> clientResponse, - final ServerWebExchange exchange, - final Duration duration, - final Set<URI> exclude, - final int retryTimes) { - Mono<R> result = clientResponse; - for (int i = 0; i < retryTimes; i++) { - result = resend(result, exchange, duration, exclude); - } - return result; - } - - private Mono<R> resend(final Mono<R> response, - final ServerWebExchange exchange, - final Duration duration, - final Set<URI> exclude) { - // does it necessary to add backoff interval time ? 
- return response.onErrorResume(th -> { - final String selectorId = exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID); - final String loadBalance = exchange.getAttribute(Constants.LOAD_BALANCE); - //always query the latest available list - final List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId) - .stream().filter(data -> { - final String trimUri = data.getUrl().trim(); - for (URI needToExclude : exclude) { - // exclude already called - if ((needToExclude.getHost() + ":" + needToExclude.getPort()).equals(trimUri)) { - return false; - } - } - return true; - }).collect(Collectors.toList()); - if (CollectionUtils.isEmpty(upstreamList)) { - // no need to retry anymore - return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg())); - } - final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress(); - final Upstream upstream = LoadBalancerFactory.selector(upstreamList, loadBalance, ip); - if (Objects.isNull(upstream)) { - // no need to retry anymore - return Mono.error(new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg())); - } - final URI newUri = RequestUrlUtils.buildRequestUri(exchange, upstream.buildDomain()); - // in order not to affect the next retry call, newUri needs to be excluded - exclude.add(newUri); - return doRequest(exchange, exchange.getRequest().getMethod().name(), newUri, exchange.getRequest().getBody()) - .timeout(duration, Mono.error(() -> new TimeoutException("Response took longer than timeout: " + duration))) - .doOnError(e -> LOG.error(e.getMessage(), e)); - }); - } /** * Process the Web request. 
diff --git a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/CustomRetryStrategy.java b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/CustomRetryStrategy.java new file mode 100644 index 0000000000..ffa782bc28 --- /dev/null +++ b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/CustomRetryStrategy.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shenyu.plugin.httpclient; + +import java.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Mono; + +/** + * Custom retry policy. + * Also please achieve your own + *@Date 2025/3/23 14:27 + */ +public class CustomRetryStrategy<R> implements RetryStrategy<R> { + private static final Logger LOG = LoggerFactory.getLogger(CustomRetryStrategy.class); + + private final AbstractHttpClientPlugin<R> httpClientPlugin; + + public CustomRetryStrategy(final AbstractHttpClientPlugin<R> httpClientPlugin) { + this.httpClientPlugin = httpClientPlugin; + } + + /** + * Custom retry policy. 
+ * + * @param response The Mono object of the response + * @param exchange Current Server Exchange Object + * @param duration Timeout + * @param retryTimes Number of retries + * @return Response Mono object after retry processing + */ + public Mono<R> execute(final Mono<R> response, final ServerWebExchange exchange, final Duration duration, final int retryTimes) { + return null; + } +} diff --git a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/DefaultRetryStrategy.java b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/DefaultRetryStrategy.java new file mode 100644 index 0000000000..0e1bab5692 --- /dev/null +++ b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/DefaultRetryStrategy.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.shenyu.plugin.httpclient; + +import java.net.URI; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import org.apache.shenyu.common.constant.Constants; +import org.apache.shenyu.common.enums.RetryEnum; +import org.apache.shenyu.common.exception.ShenyuException; +import org.apache.shenyu.loadbalancer.cache.UpstreamCacheManager; +import org.apache.shenyu.loadbalancer.entity.Upstream; +import org.apache.shenyu.loadbalancer.factory.LoadBalancerFactory; +import org.apache.shenyu.plugin.api.utils.RequestUrlUtils; +import org.apache.shenyu.plugin.httpclient.exception.ShenyuTimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.web.server.ResponseStatusException; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; +import reactor.util.retry.RetryBackoffSpec; + + +/** + * Default Retry Policy Class + * Keep the original default request retry test without any changes. 
+ * @param <R> Request Response Type + * @Date 2025/3/23 08:36 + */ +public class DefaultRetryStrategy<R> implements RetryStrategy<R> { + private static final Logger LOG = LoggerFactory.getLogger(DefaultRetryStrategy.class); + + private final AbstractHttpClientPlugin<R> httpClientPlugin; + + public DefaultRetryStrategy(final AbstractHttpClientPlugin<R> httpClientPlugin) { + this.httpClientPlugin = httpClientPlugin; + } + + @Override + public Mono<R> execute(final Mono<R> clientResponse, final ServerWebExchange exchange, final Duration duration, final int retryTimes) { + final String retryStrategy = (String) Optional.ofNullable(exchange.getAttribute(Constants.RETRY_STRATEGY)).orElseGet(() -> "current"); + if (RetryEnum.CURRENT.getName().equals(retryStrategy)) { + //old version of DividePlugin and SpringCloudPlugin will run on this + RetryBackoffSpec retryBackoffSpec = Retry.backoff(retryTimes, Duration.ofMillis(20L)) + .maxBackoff(Duration.ofSeconds(20L)) + .transientErrors(true) + .jitter(0.5d) + .filter(t -> t instanceof java.util.concurrent.TimeoutException || t instanceof io.netty.channel.ConnectTimeoutException + || t instanceof io.netty.handler.timeout.ReadTimeoutException || t instanceof IllegalStateException) + .onRetryExhaustedThrow((retryBackoffSpecErr, retrySignal) -> { + throw new ShenyuTimeoutException("Request timeout, the maximum number of retry times has been exceeded"); + }); + return clientResponse.retryWhen(retryBackoffSpec) + .onErrorMap(ShenyuTimeoutException.class, th -> new ResponseStatusException(HttpStatus.REQUEST_TIMEOUT, th.getMessage(), th)) + .onErrorMap(java.util.concurrent.TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)); + } + final Set<URI> exclude = new HashSet<>(Collections.singletonList(Objects.requireNonNull(exchange.getAttribute(Constants.HTTP_URI)))); + return resend(clientResponse, exchange, duration, exclude, retryTimes) + .onErrorMap(ShenyuException.class, th -> new 
ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, + "CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER", th)) + .onErrorMap(java.util.concurrent.TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)); + } + + private Mono<R> resend(final Mono<R> clientResponse, + final ServerWebExchange exchange, + final Duration duration, + final Set<URI> exclude, + final int retryTimes) { + Mono<R> result = clientResponse; + for (int i = 0; i < retryTimes; i++) { + result = resend(result, exchange, duration, exclude); + } + return result; + } + + private Mono<R> resend(final Mono<R> response, + final ServerWebExchange exchange, + final Duration duration, + final Set<URI> exclude) { + // does it necessary to add backoff interval time ? + return response.onErrorResume(th -> { + final String selectorId = exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID); + final String loadBalance = exchange.getAttribute(Constants.LOAD_BALANCE); + //always query the latest available list + final List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId) + .stream().filter(data -> { + final String trimUri = data.getUrl().trim(); + for (URI needToExclude : exclude) { + if ((needToExclude.getHost() + ":" + needToExclude.getPort()).equals(trimUri)) { + return false; + } + } + return true; + }).collect(Collectors.toList()); + if (upstreamList.isEmpty()) { + // no need to retry anymore + return Mono.error(new ShenyuException("CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER")); + } + final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress(); + final Upstream upstream = LoadBalancerFactory.selector(upstreamList, loadBalance, ip); + if (Objects.isNull(upstream)) { + // no need to retry anymore + return Mono.error(new ShenyuException("CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER")); + } + final URI newUri = RequestUrlUtils.buildRequestUri(exchange, 
upstream.buildDomain()); + // in order not to affect the next retry call, newUri needs to be excluded + exclude.add(newUri); + return httpClientPlugin.doRequest(exchange, exchange.getRequest().getMethod().name(), newUri, exchange.getRequest().getBody()) + .timeout(duration, Mono.error(() -> new TimeoutException("Response took longer than timeout: " + duration))) + .doOnError(e -> LOG.error(e.getMessage(), e)); + }); + } +} diff --git a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/ExponentialRetryBackoffStrategy.java b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/ExponentialRetryBackoffStrategy.java new file mode 100644 index 0000000000..3199ea30e9 --- /dev/null +++ b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/ExponentialRetryBackoffStrategy.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.shenyu.plugin.httpclient; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; +import reactor.util.retry.RetryBackoffSpec; + +import java.time.Duration; + +/** + * Exponential Retry Backoff Strategy. + * + * @Date 2025/3/23 14:20 + */ +public class ExponentialRetryBackoffStrategy<R> implements RetryStrategy<R> { + private static final Logger LOG = LoggerFactory.getLogger(ExponentialRetryBackoffStrategy.class); + + private final AbstractHttpClientPlugin<R> httpClientPlugin; + + public ExponentialRetryBackoffStrategy(final AbstractHttpClientPlugin<R> httpClientPlugin) { + this.httpClientPlugin = httpClientPlugin; + } + + /** + * Execute retry policy. + * + * @param response The Mono object of the response + * @param exchange Current Server Exchange Object + * @param duration Timeout + * @param retryTimes Number of retries + * @return Response Mono object after retry processing + */ + public Mono<R> execute(final Mono<R> response, final ServerWebExchange exchange, final Duration duration, final int retryTimes) { + RetryBackoffSpec retrySpec = initDefaultBackoff(retryTimes); + return response.retryWhen(retrySpec) + .timeout(duration, Mono.error(() -> new java.util.concurrent.TimeoutException("Response took longer than timeout: " + duration))) + .doOnError(e -> LOG.error(e.getMessage(), e)); + } + + private RetryBackoffSpec initDefaultBackoff(final int retryTimes) { + return Retry.backoff(retryTimes, Duration.ofMillis(500)) + .maxBackoff(Duration.ofSeconds(5)) + // 只对瞬时错误进行重试 + .transientErrors(true) + // 添加 50% 的随机抖动到每次重试的延迟时间 + .jitter(0.5d) + .filter(t -> t instanceof IllegalStateException) + // 当达到最大重试次数后抛出一个指定的异常 + .onRetryExhaustedThrow((retryBackoffSpecErr, retrySignal) -> { + throw new IllegalStateException("重试超限"); + }); + } +} diff --git 
a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/FixedRetryStrategy.java b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/FixedRetryStrategy.java new file mode 100644 index 0000000000..8ddf6aeb4b --- /dev/null +++ b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/FixedRetryStrategy.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shenyu.plugin.httpclient; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.time.Duration; + +/** + * Fixed Retry Policy Class. + * + * @Date 2025/3/23 10:04 + */ +public class FixedRetryStrategy<R> implements RetryStrategy<R> { + private static final Logger LOG = LoggerFactory.getLogger(FixedRetryStrategy.class); + + private final AbstractHttpClientPlugin<R> httpClientPlugin; + + public FixedRetryStrategy(final AbstractHttpClientPlugin<R> httpClientPlugin) { + this.httpClientPlugin = httpClientPlugin; + } + + /** + * Execute retry policy. 
+ * + * @param response The Mono object of the response + * @param exchange Current Server Exchange Object + * @param duration TIMEOUT + * @param retryTimes Number of retries + * @return Response Mono object after retry processing + */ + public Mono<R> execute(final Mono<R> response, final ServerWebExchange exchange, final Duration duration, final int retryTimes) { + Retry retrySpec = initFixedBackoff(retryTimes); + return response.retryWhen(retrySpec) + .timeout(duration, Mono.error(() -> new java.util.concurrent.TimeoutException("Response took longer than timeout: " + duration))) + .doOnError(e -> LOG.error(e.getMessage(), e)); + } + + private Retry initFixedBackoff(final int retryTimes) { + return Retry.fixedDelay(retryTimes, Duration.ofSeconds(2)); + } +} diff --git a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/RetryStrategy.java b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/RetryStrategy.java new file mode 100644 index 0000000000..ad5747d5af --- /dev/null +++ b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/RetryStrategy.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shenyu.plugin.httpclient; + + +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Mono; + +import java.time.Duration; + +/** + * Retry Policy Interface. + * @param <R> Request Response Type + *@Date 2025/3/23 08:27 + */ +public interface RetryStrategy<R> { + /** + * Execute retry policy. + * + * @param clientResponse Original Request Response + * @param exchange Server Exchange Object + * @param duration Timeout + * @param retryTimes Number of retries + * @return Response Mono object after retry processing + */ + Mono<R> execute(Mono<R> clientResponse, ServerWebExchange exchange, Duration duration, int retryTimes); +} diff --git a/shenyu-plugin/shenyu-plugin-httpclient/src/test/java/org/apache/shenyu/plugin/httpclient/RetryStrategyTest.java b/shenyu-plugin/shenyu-plugin-httpclient/src/test/java/org/apache/shenyu/plugin/httpclient/RetryStrategyTest.java new file mode 100644 index 0000000000..9429d2a55b --- /dev/null +++ b/shenyu-plugin/shenyu-plugin-httpclient/src/test/java/org/apache/shenyu/plugin/httpclient/RetryStrategyTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shenyu.plugin.httpclient; + +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.Test; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.time.Duration; + +import static org.mockito.Mockito.mock; + +/** + * retry strategy test. + * + * @Date 2025/3/16 22:46 + */ +public class RetryStrategyTest { + + @Test + void testDefaultRetryBackoffExecute() { + // Create a simulated AbstractHttpClientPlugin + AbstractHttpClientPlugin<String> httpClientPlugin = mock(AbstractHttpClientPlugin.class); + ExponentialRetryBackoffStrategy<String> strategy = new ExponentialRetryBackoffStrategy<>(httpClientPlugin); + + // Create a simulated ServerWebExchange + ServerWebExchange exchange = mock(ServerWebExchange.class); + Duration duration = Duration.ofSeconds(5); + int retryTimes = 3; + + // Create a mock response Mono that throws an exception + Mono<String> response = Mono.error(new RuntimeException("Test error")); + + // Execute retry policy + Mono<String> result = strategy.execute(response, exchange, duration, retryTimes); + + // Use StepVerifier to verify results + StepVerifier.create(result) + .expectError(RuntimeException.class) + .verify(); + } + + @Test + void testDefaultRetryStrategyExecute() { + //Create a simulated AbstractHttpClientPlugin + AbstractHttpClientPlugin<String> httpClientPlugin = mock(AbstractHttpClientPlugin.class); + DefaultRetryStrategy<String> strategy = new DefaultRetryStrategy<>(httpClientPlugin); + + // Create a simulated ServerWebExchange + ServerWebExchange exchange = mock(ServerWebExchange.class); + Duration duration = Duration.ofSeconds(5); + int retryTimes = 3; + + // Create a mock response Mono that throws an exception + Mono<String> response = Mono.error(new RuntimeException("Test error")); + + // 
Execute retry policy + Mono<String> result = strategy.execute(response, exchange, duration, retryTimes); + + // Use StepVerifier to verify results + StepVerifier.create(result) + .expectError(RuntimeException.class) + .verify(); + } + + @Test + void testFixedRetryStrategyExecute() { + // Create a simulated AbstractHttpClientPlugin + AbstractHttpClientPlugin<String> httpClientPlugin = mock(AbstractHttpClientPlugin.class); + FixedRetryStrategy<String> strategy = new FixedRetryStrategy<>(httpClientPlugin); + + // Create a simulated ServerWebExchange + ServerWebExchange exchange = mock(ServerWebExchange.class); + Duration duration = Duration.ofSeconds(5); + int retryTimes = 3; + + // Create a mock response Mono that throws an exception + Mono<String> response = Mono.error(new RuntimeException("Test error")); + + // Execute retry policy + Mono<String> result = strategy.execute(response, exchange, duration, retryTimes); + + // Use StepVerifier to verify results + StepVerifier.create(result) + .expectError(TimeoutException.class) + .verify(); + } +}