This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 886fe08  [ISSUE #2963] NettyHttpClientPlugin retry when fail (#2964)
886fe08 is described below

commit 886fe08f302d31c0959b14eb4f0019f6df883d34
Author: dragon-zhang <[email protected]>
AuthorDate: Wed Mar 2 17:41:13 2022 +0800

    [ISSUE #2963] NettyHttpClientPlugin retry when fail (#2964)
---
 .../shenyu/plugin/httpclient/NettyHttpClientPlugin.java   | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/NettyHttpClientPlugin.java b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/NettyHttpClientPlugin.java
index 81c223a..475fba9 100644
--- a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/NettyHttpClientPlugin.java
+++ b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/NettyHttpClientPlugin.java
@@ -17,8 +17,10 @@
 
 package org.apache.shenyu.plugin.httpclient;
 
+import io.netty.channel.ConnectTimeoutException;
 import io.netty.handler.codec.http.DefaultHttpHeaders;
 import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.timeout.ReadTimeoutException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shenyu.common.constant.Constants;
 import org.apache.shenyu.common.enums.PluginEnum;
@@ -42,6 +44,8 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.netty.http.client.HttpClient;
 import reactor.netty.http.client.HttpClientResponse;
+import reactor.retry.Backoff;
+import reactor.retry.Retry;
 
 import java.net.URI;
 import java.time.Duration;
@@ -93,8 +97,8 @@ public class NettyHttpClientPlugin implements ShenyuPlugin {
         }
         LOG.info("you request, The resulting urlPath is: {}", 
uri.toASCIIString());
         Flux<HttpClientResponse> responseFlux = 
this.httpClient.headers(headers -> headers.add(httpHeaders))
-                .request(method).uri(uri.toASCIIString()).send((req, 
nettyOutbound) ->
-                        nettyOutbound.send(request.getBody().map(dataBuffer -> 
((NettyDataBuffer) dataBuffer) .getNativeBuffer())))
+                .request(method).uri(uri.toASCIIString())
+                .send((req, nettyOutbound) -> 
nettyOutbound.send(request.getBody().map(dataBuffer -> ((NettyDataBuffer) 
dataBuffer).getNativeBuffer())))
                 .responseConnection((res, connection) -> {
                     
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
                     
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_CONN_ATTR, connection);
@@ -118,10 +122,13 @@ public class NettyHttpClientPlugin implements ShenyuPlugin {
 
                     return Mono.just(res);
                 });
+        final int retryTimes = (int) 
Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
         long timeout = (long) 
Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
         Duration duration = Duration.ofMillis(timeout);
-        responseFlux = responseFlux.timeout(duration,
-                Mono.error(new TimeoutException("Response took longer than 
timeout: " + duration)))
+        responseFlux = responseFlux.timeout(duration, Mono.error(new 
TimeoutException("Response took longer than timeout: " + duration)))
+                .retryWhen(Retry.anyOf(TimeoutException.class, 
ConnectTimeoutException.class, ReadTimeoutException.class, 
IllegalStateException.class)
+                        .retryMax(retryTimes)
+                        .backoff(Backoff.exponential(Duration.ofMillis(200), 
Duration.ofSeconds(20), 2, true)))
                 .onErrorMap(TimeoutException.class, th -> new 
ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
         return responseFlux.then(chain.execute(exchange));
     }

Reply via email to