dragon-zhang commented on a change in pull request #2939:
URL: https://github.com/apache/incubator-shenyu/pull/2939#discussion_r815868253
##########
File path:
shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/WebClientPlugin.java
##########
@@ -100,12 +121,86 @@ public boolean skip(final ServerWebExchange exchange) {
return skipExceptHttpLike(exchange);
}
- private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec
requestBodySpec,
- final ServerWebExchange exchange,
- final long timeout,
- final int retryTimes,
- final ShenyuPluginChain chain) {
- return requestBodySpec.headers(httpHeaders -> {
+ private Mono<ClientResponse> resend(final Mono<ClientResponse>
clientResponse,
+ final HttpMethod method,
+ final Set<URI> exclude,
+ final ServerWebExchange exchange,
+ final long timeout,
+ final ShenyuContext shenyuContext,
+ final int retryTimes) {
+ Mono<ClientResponse> result = clientResponse;
+ for (int i = 0; i < retryTimes; i++) {
+ result = resend(result, method, exclude, exchange, timeout,
shenyuContext);
+ }
+ return result;
+ }
+
+ private Mono<ClientResponse> resend(final Mono<ClientResponse>
clientResponse,
+ final HttpMethod method,
+ final Set<URI> exclude,
+ final ServerWebExchange exchange,
+ final long timeout,
+ final ShenyuContext shenyuContext) {
+ //todo How to add backoff ?
+ return clientResponse.onErrorResume(e -> {
+ final String selectorId =
exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID);
+ final List<Upstream> upstreamList =
UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId)
+ .stream().filter(data -> {
+ final String trimUri = data.getUrl().trim();
+ for (URI uri : exclude) {
+ // exclude already called
+ if ((uri.getHost() + ":" +
uri.getPort()).equals(trimUri)) {
+ return true;
+ }
+ }
+ return false;
+ }).collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(upstreamList)) {
+ LOG.error("upstream configuration error: {}",
GsonUtils.getInstance().toJson(upstreamList));
+ return Mono.error(new
ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL.getMsg()));
+ }
+ final String loadBalance = (String)
Optional.ofNullable(exchange.getAttribute(Constants.LOAD_BALANCE)).orElseGet(LoadBalanceEnum.RANDOM::getName);
+ String ip =
Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
+ Upstream upstream = LoadBalancerFactory.selector(upstreamList,
loadBalance, ip);
+ if (Objects.isNull(upstream)) {
+ LOG.error("has no upstream");
+ return Mono.error(new
ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL.getMsg()));
+ }
+ final URI newUri = buildUri(upstream, exchange, shenyuContext);
+ // in order not to affect the next retry call, this uri needs to
be excluded
+ exclude.add(newUri);
+ return handleRequest(method, newUri, exchange, timeout);
+ });
+ }
+
+ private URI buildUri(final Upstream upstream,
Review comment:
> can refactor common method, because in URI plugin?
Yes, some of in `URIPlugin`, I'll try for it.
##########
File path:
shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/WebClientPlugin.java
##########
@@ -100,12 +121,86 @@ public boolean skip(final ServerWebExchange exchange) {
return skipExceptHttpLike(exchange);
}
- private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec
requestBodySpec,
- final ServerWebExchange exchange,
- final long timeout,
- final int retryTimes,
- final ShenyuPluginChain chain) {
- return requestBodySpec.headers(httpHeaders -> {
+ private Mono<ClientResponse> resend(final Mono<ClientResponse>
clientResponse,
+ final HttpMethod method,
+ final Set<URI> exclude,
+ final ServerWebExchange exchange,
+ final long timeout,
+ final ShenyuContext shenyuContext,
+ final int retryTimes) {
+ Mono<ClientResponse> result = clientResponse;
+ for (int i = 0; i < retryTimes; i++) {
+ result = resend(result, method, exclude, exchange, timeout,
shenyuContext);
+ }
+ return result;
+ }
+
+ private Mono<ClientResponse> resend(final Mono<ClientResponse>
clientResponse,
+ final HttpMethod method,
+ final Set<URI> exclude,
+ final ServerWebExchange exchange,
+ final long timeout,
+ final ShenyuContext shenyuContext) {
+ //todo How to add backoff ?
+ return clientResponse.onErrorResume(e -> {
+ final String selectorId =
exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID);
+ final List<Upstream> upstreamList =
UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId)
+ .stream().filter(data -> {
+ final String trimUri = data.getUrl().trim();
+ for (URI uri : exclude) {
+ // exclude already called
+ if ((uri.getHost() + ":" +
uri.getPort()).equals(trimUri)) {
+ return true;
+ }
+ }
+ return false;
+ }).collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(upstreamList)) {
+ LOG.error("upstream configuration error: {}",
GsonUtils.getInstance().toJson(upstreamList));
+ return Mono.error(new
ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL.getMsg()));
+ }
+ final String loadBalance = (String)
Optional.ofNullable(exchange.getAttribute(Constants.LOAD_BALANCE)).orElseGet(LoadBalanceEnum.RANDOM::getName);
+ String ip =
Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
+ Upstream upstream = LoadBalancerFactory.selector(upstreamList,
loadBalance, ip);
+ if (Objects.isNull(upstream)) {
+ LOG.error("has no upstream");
+ return Mono.error(new
ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL.getMsg()));
+ }
+ final URI newUri = buildUri(upstream, exchange, shenyuContext);
+ // in order not to affect the next retry call, this uri needs to
be excluded
+ exclude.add(newUri);
+ return handleRequest(method, newUri, exchange, timeout);
+ });
+ }
+
+ private URI buildUri(final Upstream upstream,
Review comment:
> can refactor common method, because in URI plugin?
Yes, some in `URIPlugin`, I'll try for it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]