This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 11b2422 [ISSUE #2627] [type:new feature] DividePlugin support
failover retry (#3007)
11b2422 is described below
commit 11b24225d01ea00dd7c6b53ef77cbd5f847af0b0
Author: dragon-zhang <[email protected]>
AuthorDate: Wed Mar 9 11:00:05 2022 +0800
[ISSUE #2627] [type:new feature] DividePlugin support failover retry (#3007)
* [ISSUE #2627] [type:new feature] DividePlugin support failover retry
strategy
* remove todo and add doc
---
script/2.4.2-upgrade-2.4.3-mysql.sql | 9 ++-
script/2.4.2-upgrade-2.4.3-pg.sql | 9 ++-
.../src/main/resources/sql-script/h2/schema.sql | 6 +-
.../src/main/resources/sql-script/mysql/schema.sql | 5 ++
.../src/main/resources/sql-script/pg/schema.sql | 5 ++
.../apache/shenyu/common/constant/Constants.java | 15 ++++
.../dto/convert/rule/impl/DivideRuleHandle.java | 33 +++++++-
.../org/apache/shenyu/common/enums/RetryEnum.java | 80 +++++++++++++++++++
.../shenyu/plugin/api/result/ShenyuResultEnum.java | 5 ++
.../apache/shenyu/plugin/divide/DividePlugin.java | 4 +
shenyu-plugin/shenyu-plugin-httpclient/pom.xml | 6 ++
.../httpclient/AbstractHttpClientPlugin.java | 92 +++++++++++++++++++---
.../plugin/httpclient/NettyHttpClientPlugin.java | 5 +-
.../shenyu/plugin/httpclient/WebClientPlugin.java | 7 +-
14 files changed, 261 insertions(+), 20 deletions(-)
diff --git a/script/2.4.2-upgrade-2.4.3-mysql.sql
b/script/2.4.2-upgrade-2.4.3-mysql.sql
index e14d645..7fb5961 100644
--- a/script/2.4.2-upgrade-2.4.3-mysql.sql
+++ b/script/2.4.2-upgrade-2.4.3-mysql.sql
@@ -38,4 +38,11 @@ INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`,
`dict_name`, `dict_value`,
-- remove monitor plugin
DELETE FROM plugin WHERE `id` = '7';
-DELETE FROM plugin_handle WHERE `plugin_id` = '7';
\ No newline at end of file
+DELETE FROM plugin_handle WHERE `plugin_id` = '7';
+
+-- insert plugin_handle data for divide
+INSERT IGNORE INTO plugin_handle (`plugin_id`, `field`, `label`, `data_type`,
`type`, `sort`, `ext_obj`) VALUES ('5', 'retryStrategy', 'retryStrategy', '3',
'2', '0',
'{"required":"0","defaultValue":"current","placeholder":"retryStrategy","rule":""}');
+
+-- insert dict for divide plugin
+INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`, `dict_name`,
`dict_value`, `desc`, `sort`, `enabled`) VALUES ('retryStrategy',
'RETRY_STRATEGY', 'current', 'current', 'current', '0', '1');
+INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`, `dict_name`,
`dict_value`, `desc`, `sort`, `enabled`) VALUES ('retryStrategy',
'RETRY_STRATEGY', 'failover', 'failover', 'failover', '1', '1');
\ No newline at end of file
diff --git a/script/2.4.2-upgrade-2.4.3-pg.sql
b/script/2.4.2-upgrade-2.4.3-pg.sql
index 4d94e6a..0846558 100644
--- a/script/2.4.2-upgrade-2.4.3-pg.sql
+++ b/script/2.4.2-upgrade-2.4.3-pg.sql
@@ -38,4 +38,11 @@ INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`,
`dict_name`, `dict_value`,
-- remove monitor plugin
DELETE FROM plugin WHERE `id` = '7';
-DELETE FROM plugin_handle WHERE `plugin_id` = '7';
\ No newline at end of file
+DELETE FROM plugin_handle WHERE `plugin_id` = '7';
+
+-- insert plugin_handle data for divide
+INSERT IGNORE INTO plugin_handle (`plugin_id`, `field`, `label`, `data_type`,
`type`, `sort`, `ext_obj`) VALUES ('5', 'retryStrategy', 'retryStrategy', '3',
'2', '0',
'{"required":"0","defaultValue":"current","placeholder":"retryStrategy","rule":""}');
+
+-- insert dict for divide plugin
+INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`, `dict_name`,
`dict_value`, `desc`, `sort`, `enabled`) VALUES ('retryStrategy',
'RETRY_STRATEGY', 'current', 'current', 'current', '0', '1');
+INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`, `dict_name`,
`dict_value`, `desc`, `sort`, `enabled`) VALUES ('retryStrategy',
'RETRY_STRATEGY', 'failover', 'failover', 'failover', '1', '1');
\ No newline at end of file
diff --git a/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
b/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
index 629ea36..40fe16f 100644
--- a/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
+++ b/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
@@ -326,6 +326,10 @@ INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`,
`dict_name`, `dict_value`,
INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`, `dict_name`,
`dict_value`, `desc`, `sort`, `enabled`) VALUES ('threadpool', 'THREADPOOL',
'cached', 'cached', '', '0', '1');
INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`, `dict_name`,
`dict_value`, `desc`, `sort`, `enabled`) VALUES ('threadpool', 'THREADPOOL',
'limited', 'limited', '', '1', '1');
+/* insert dict for divide plugin */
+INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`, `dict_name`,
`dict_value`, `desc`, `sort`, `enabled`) VALUES ('retryStrategy',
'RETRY_STRATEGY', 'current', 'current', 'current', '0', '1');
+INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`, `dict_name`,
`dict_value`, `desc`, `sort`, `enabled`) VALUES ('retryStrategy',
'RETRY_STRATEGY', 'failover', 'failover', 'failover', '1', '1');
+
/* insert dict for init resource,permission table */
INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`, `dict_name`,
`dict_value`, `desc`, `sort`, `enabled`) VALUES ('table', 'INIT_FLAG',
'status', 'false', 'table(resource,permission) init status', '0', '1');;
@@ -437,7 +441,7 @@ INSERT IGNORE INTO plugin_handle
(`plugin_id`,`field`,`label`,`data_type`,`type`
INSERT IGNORE INTO plugin_handle
(`plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('5',
'multiRuleHandle', 'multiRuleHandle', 3, 3, 1, null);;
INSERT IGNORE INTO plugin_handle
(`plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('5',
'headerMaxSize', 'headerMaxSize', 1, 2, 3,
'{"defaultValue":"10240","rule":""}');;
INSERT IGNORE INTO plugin_handle
(`plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('5',
'requestMaxSize', 'requestMaxSize', 1, 2, 4,
'{"defaultValue":"102400","rule":""}');;
-
+INSERT IGNORE INTO plugin_handle
(`plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('5',
'retryStrategy', 'retryStrategy', 3, 2, 0,
'{"required":"0","defaultValue":"current","placeholder":"retryStrategy","rule":""}');
/*insert plugin_handle data for tars*/
INSERT IGNORE INTO plugin_handle
(`plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('13',
'upstreamHost', 'host', 2, 1, 0, null);;
diff --git a/shenyu-admin/src/main/resources/sql-script/mysql/schema.sql
b/shenyu-admin/src/main/resources/sql-script/mysql/schema.sql
index 52d8d09..7459fab 100644
--- a/shenyu-admin/src/main/resources/sql-script/mysql/schema.sql
+++ b/shenyu-admin/src/main/resources/sql-script/mysql/schema.sql
@@ -339,6 +339,10 @@ INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`,
`dict_name`, `dict_value`,
INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`, `dict_name`,
`dict_value`, `desc`, `sort`, `enabled`) VALUES ('threadpool', 'THREADPOOL',
'cached', 'cached', '', '0', '1');
INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`, `dict_name`,
`dict_value`, `desc`, `sort`, `enabled`) VALUES ('threadpool', 'THREADPOOL',
'limited', 'limited', '', '1', '1');
+/* insert dict for divide plugin */
+INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`, `dict_name`,
`dict_value`, `desc`, `sort`, `enabled`) VALUES ('retryStrategy',
'RETRY_STRATEGY', 'current', 'current', 'current', '0', '1');
+INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`, `dict_name`,
`dict_value`, `desc`, `sort`, `enabled`) VALUES ('retryStrategy',
'RETRY_STRATEGY', 'failover', 'failover', 'failover', '1', '1');
+
/* insert dict for init resource,permission table */
INSERT IGNORE INTO shenyu_dict (`type`, `dict_code`, `dict_name`,
`dict_value`, `desc`, `sort`, `enabled`) VALUES ('table', 'INIT_FLAG',
'status', 'false', 'table(resource,permission) init status', '0', '1');;
@@ -450,6 +454,7 @@ INSERT IGNORE INTO plugin_handle
(`plugin_id`,`field`,`label`,`data_type`,`type`
INSERT IGNORE INTO plugin_handle
(`plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('5',
'multiRuleHandle', 'multiRuleHandle', 3, 3, 1, null);;
INSERT IGNORE INTO plugin_handle
(`plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('5',
'headerMaxSize', 'headerMaxSize', 1, 2, 3,
'{"defaultValue":"10240","rule":""}');;
INSERT IGNORE INTO plugin_handle
(`plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('5',
'requestMaxSize', 'requestMaxSize', 1, 2, 4,
'{"defaultValue":"102400","rule":""}');;
+INSERT IGNORE INTO plugin_handle
(`plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('5',
'retryStrategy', 'retryStrategy', 3, 2, 0,
'{"required":"0","defaultValue":"current","placeholder":"retryStrategy","rule":""}');
/*insert plugin_handle data for tars*/
INSERT IGNORE INTO plugin_handle
(`plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('13',
'upstreamHost', 'host', 2, 1, 0, null);;
diff --git a/shenyu-admin/src/main/resources/sql-script/pg/schema.sql
b/shenyu-admin/src/main/resources/sql-script/pg/schema.sql
index c347936..f2a6bfb 100644
--- a/shenyu-admin/src/main/resources/sql-script/pg/schema.sql
+++ b/shenyu-admin/src/main/resources/sql-script/pg/schema.sql
@@ -561,6 +561,7 @@ ELSE
PERFORM public.dblink_exec('init_conn', 'INSERT INTO "plugin_handle" (
plugin_id , field , label , data_type , type , sort , ext_obj ) VALUES (''' ||
'5' || ''', ''' || 'multiRuleHandle' || ''', ''' || 'multiRuleHandle' || ''',
3, 3, 1, null);');
PERFORM public.dblink_exec('init_conn', 'INSERT INTO "plugin_handle" (
plugin_id , field , label , data_type , type , sort , ext_obj ) VALUES (''' ||
'5' || ''', ''' || 'headerMaxSize' || ''', ''' || 'headerMaxSize' || ''', 1, 2,
3, ''' || '{"defaultValue":"10240","rule":""}' || ''');');
PERFORM public.dblink_exec('init_conn', 'INSERT INTO "plugin_handle" (
plugin_id , field , label , data_type , type , sort , ext_obj ) VALUES (''' ||
'5' || ''', ''' || 'requestMaxSize' || ''', ''' || 'requestMaxSize' || ''', 1,
2, 4, ''' || '{"defaultValue":"102400","rule":""}' || ''');');
+ PERFORM public.dblink_exec('init_conn', 'INSERT INTO "plugin_handle" (
plugin_id , field , label , data_type , type , sort , ext_obj ) VALUES (''' ||
'5' || ''', ''' || 'retryStrategy' || ''', ''' || 'retryStrategy' || ''', 3, 2,
0, ''' ||
'{"required":"0","defaultValue":"current","placeholder":"retryStrategy","rule":""}'
|| ''');');
/*insert "plugin_handle" data for tars*/
PERFORM public.dblink_exec('init_conn', 'INSERT INTO "plugin_handle" (
plugin_id , field , label , data_type , type , sort , ext_obj ) VALUES (''' ||
'13' || ''', ''' || 'upstreamHost' || ''', ''' || 'host' || ''', 2, 1, 0,
null);');
@@ -1068,6 +1069,10 @@ ELSE
PERFORM public.dblink_exec('init_conn', 'INSERT INTO shenyu_dict ( type
, dict_code , dict_name , dict_value , "desc" , sort , enabled ) VALUES
(''' || 'threadpool' || ''', ''' || 'THREADPOOL' || ''', ''' || 'cached' ||
''', ''' || 'cached' || ''', ''' || '' || ''', ''' || '0' || ''', ''' || '1' ||
''');');
PERFORM public.dblink_exec('init_conn', 'INSERT INTO shenyu_dict ( type
, dict_code , dict_name , dict_value , "desc" , sort , enabled ) VALUES
(''' || 'threadpool' || ''', ''' || 'THREADPOOL' || ''', ''' || 'limited' ||
''', ''' || 'limited' || ''', ''' || '' || ''', ''' || '1' || ''', ''' || '1'
|| ''');');
+ /*insert dict for divide plugin*/
+ PERFORM public.dblink_exec('init_conn', 'INSERT INTO shenyu_dict ( type
, dict_code , dict_name , dict_value , "desc" , sort , enabled ) VALUES
(''' || 'retryStrategy' || ''', ''' || 'RETRY_STRATEGY' || ''', ''' ||
'current' || ''', ''' || 'current' || ''', ''' || 'current' || ''', ''' || '0'
|| ''', ''' || '1' || ''');');
+ PERFORM public.dblink_exec('init_conn', 'INSERT INTO shenyu_dict ( type
, dict_code , dict_name , dict_value , "desc" , sort , enabled ) VALUES
(''' || 'retryStrategy' || ''', ''' || 'RETRY_STRATEGY' || ''', ''' ||
'failover' || ''', ''' || 'failover' || ''', ''' || 'failover' || ''', ''' ||
'1' || ''', ''' || '1' || ''');');
+
/* insert dict for init resource,permission table */
PERFORM public.dblink_exec('init_conn', 'INSERT INTO shenyu_dict ( type
, dict_code , dict_name , dict_value , "desc" , sort , enabled ) VALUES
(''' || 'table'|| ''', ''' || 'INIT_FLAG' || ''', ''' || 'status' || ''',''' ||
'false' ||''',''' || 'table(resource,permission) init status' ||''',''' || '0'
|| ''',''' || '1' || ''');');
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
index a560d9e..4625aab 100644
---
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
@@ -103,6 +103,21 @@ public interface Constants {
String HTTP_RETRY = "httpRetry";
/**
+ * The constant RETRY_STRATEGY.
+ */
+ String RETRY_STRATEGY = "retryStrategy";
+
+ /**
+ * The constant LOAD_BALANCE.
+ */
+ String LOAD_BALANCE = "loadBalance";
+
+ /**
+ * divide online selector id.
+ */
+ String DIVIDE_SELECTOR_ID = "divideSelectorId";
+
+ /**
* Original response Content-Type attribute name.
*/
String ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR =
"original_response_content_type";
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/rule/impl/DivideRuleHandle.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/rule/impl/DivideRuleHandle.java
index 5884522..09b03f2 100644
---
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/rule/impl/DivideRuleHandle.java
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/rule/impl/DivideRuleHandle.java
@@ -20,6 +20,7 @@ package org.apache.shenyu.common.dto.convert.rule.impl;
import org.apache.shenyu.common.constant.Constants;
import org.apache.shenyu.common.dto.convert.rule.RuleHandle;
import org.apache.shenyu.common.enums.LoadBalanceEnum;
+import org.apache.shenyu.common.enums.RetryEnum;
import java.util.Objects;
@@ -35,6 +36,12 @@ public class DivideRuleHandle implements RuleHandle {
private String loadBalance = LoadBalanceEnum.RANDOM.getName();
/**
+ * retryStrategy.
+ * {@linkplain RetryEnum}
+ */
+ private String retryStrategy = RetryEnum.CURRENT.getName();
+
+ /**
* http retry.
*/
private int retry = 3;
@@ -73,6 +80,24 @@ public class DivideRuleHandle implements RuleHandle {
}
/**
+ * get retryStrategy.
+ *
+ * @return retryStrategy
+ */
+ public String getRetryStrategy() {
+ return retryStrategy;
+ }
+
+ /**
+ * set retryStrategy.
+ *
+ * @param retryStrategy retryStrategy
+ */
+ public void setRetryStrategy(final String retryStrategy) {
+ this.retryStrategy = retryStrategy;
+ }
+
+ /**
* get retry.
*
* @return retry
@@ -154,12 +179,13 @@ public class DivideRuleHandle implements RuleHandle {
}
DivideRuleHandle that = (DivideRuleHandle) o;
return retry == that.retry && timeout == that.timeout && headerMaxSize
== that.headerMaxSize
- && requestMaxSize == that.requestMaxSize &&
Objects.equals(loadBalance, that.loadBalance);
+ && requestMaxSize == that.requestMaxSize &&
Objects.equals(loadBalance, that.loadBalance)
+ && Objects.equals(retryStrategy, that.retryStrategy);
}
@Override
public int hashCode() {
- return Objects.hash(loadBalance, retry, timeout, headerMaxSize,
requestMaxSize);
+ return Objects.hash(loadBalance, retryStrategy, retry, timeout,
headerMaxSize, requestMaxSize);
}
@Override
@@ -168,6 +194,9 @@ public class DivideRuleHandle implements RuleHandle {
+ "loadBalance='"
+ loadBalance
+ '\''
+ + "retryStrategy='"
+ + retryStrategy
+ + '\''
+ ", retry="
+ retry
+ ", timeout="
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/RetryEnum.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/RetryEnum.java
new file mode 100644
index 0000000..3c1e41d
--- /dev/null
+++ b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/RetryEnum.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.common.enums;
+
+/**
+ * retry enum.
+ */
+public enum RetryEnum {
+
+ /**
+ * Retry the previously failed call.
+ */
+ CURRENT(1, "current", true),
+
+ /**
+ * Retry other servers when failed.
+ */
+ FAILOVER(2, "failover", true);
+
+ private final int code;
+
+ private final String name;
+
+ private final boolean support;
+
+ /**
+ * all args constructor.
+ *
+ * @param code code
+ * @param name name
+ * @param support support
+ */
+ RetryEnum(final int code, final String name, final boolean support) {
+ this.code = code;
+ this.name = name;
+ this.support = support;
+ }
+
+ /**
+ * get code.
+ *
+ * @return code
+ */
+ public int getCode() {
+ return code;
+ }
+
+ /**
+ * get name.
+ *
+ * @return name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * get support.
+ *
+ * @return support
+ */
+ public boolean isSupport() {
+ return support;
+ }
+}
diff --git
a/shenyu-plugin/shenyu-plugin-api/src/main/java/org/apache/shenyu/plugin/api/result/ShenyuResultEnum.java
b/shenyu-plugin/shenyu-plugin-api/src/main/java/org/apache/shenyu/plugin/api/result/ShenyuResultEnum.java
index fdf543e..7694c2e 100644
---
a/shenyu-plugin/shenyu-plugin-api/src/main/java/org/apache/shenyu/plugin/api/result/ShenyuResultEnum.java
+++
b/shenyu-plugin/shenyu-plugin-api/src/main/java/org/apache/shenyu/plugin/api/result/ShenyuResultEnum.java
@@ -209,6 +209,11 @@ public enum ShenyuResultEnum {
INVALID_XML_DATA(-120, "the xml data is invalid."),
/**
+ * cannot find healthy upstream url after failover.
+ */
+ CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER(-121, "Can not find
healthy upstream url after failover!"),
+
+ /**
* Request Header Fields Too Large.
*/
REQUEST_HEADER_TOO_LARGE(431, "Request Header Fields Too Large"),
diff --git
a/shenyu-plugin/shenyu-plugin-divide/src/main/java/org/apache/shenyu/plugin/divide/DividePlugin.java
b/shenyu-plugin/shenyu-plugin-divide/src/main/java/org/apache/shenyu/plugin/divide/DividePlugin.java
index 4e28af6..a4afd8b 100644
---
a/shenyu-plugin/shenyu-plugin-divide/src/main/java/org/apache/shenyu/plugin/divide/DividePlugin.java
+++
b/shenyu-plugin/shenyu-plugin-divide/src/main/java/org/apache/shenyu/plugin/divide/DividePlugin.java
@@ -91,6 +91,10 @@ public class DividePlugin extends AbstractShenyuPlugin {
// set the http timeout
exchange.getAttributes().put(Constants.HTTP_TIME_OUT,
ruleHandle.getTimeout());
exchange.getAttributes().put(Constants.HTTP_RETRY,
ruleHandle.getRetry());
+ // set retry strategy stuff
+ exchange.getAttributes().put(Constants.RETRY_STRATEGY,
ruleHandle.getRetryStrategy());
+ exchange.getAttributes().put(Constants.LOAD_BALANCE,
ruleHandle.getLoadBalance());
+ exchange.getAttributes().put(Constants.DIVIDE_SELECTOR_ID,
selector.getId());
return chain.execute(exchange);
}
diff --git a/shenyu-plugin/shenyu-plugin-httpclient/pom.xml
b/shenyu-plugin/shenyu-plugin-httpclient/pom.xml
index 6cb346e..4940225 100644
--- a/shenyu-plugin/shenyu-plugin-httpclient/pom.xml
+++ b/shenyu-plugin/shenyu-plugin-httpclient/pom.xml
@@ -33,6 +33,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-loadbalancer</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
</dependency>
diff --git
a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/AbstractHttpClientPlugin.java
b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/AbstractHttpClientPlugin.java
index 1b6f5ce..c2e1b75 100644
---
a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/AbstractHttpClientPlugin.java
+++
b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/AbstractHttpClientPlugin.java
@@ -17,14 +17,22 @@
package org.apache.shenyu.plugin.httpclient;
+import com.google.common.collect.Sets;
import io.netty.channel.ConnectTimeoutException;
import io.netty.handler.timeout.ReadTimeoutException;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.enums.RetryEnum;
+import org.apache.shenyu.common.exception.ShenyuException;
+import org.apache.shenyu.loadbalancer.cache.UpstreamCacheManager;
+import org.apache.shenyu.loadbalancer.entity.Upstream;
+import org.apache.shenyu.loadbalancer.factory.LoadBalancerFactory;
import org.apache.shenyu.plugin.api.ShenyuPlugin;
import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import org.apache.shenyu.plugin.api.context.ShenyuContext;
import org.apache.shenyu.plugin.api.result.ShenyuResultEnum;
import org.apache.shenyu.plugin.api.result.ShenyuResultWrap;
+import org.apache.shenyu.plugin.api.utils.RequestUrlUtils;
import org.apache.shenyu.plugin.api.utils.WebFluxResultUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,15 +48,18 @@ import reactor.retry.Retry;
import java.net.URI;
import java.time.Duration;
+import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* The type abstract http client plugin.
*/
-public abstract class AbstractHttpClientPlugin implements ShenyuPlugin {
+public abstract class AbstractHttpClientPlugin<R> implements ShenyuPlugin {
protected static final Logger LOG =
LoggerFactory.getLogger(AbstractHttpClientPlugin.class);
@@ -64,16 +75,77 @@ public abstract class AbstractHttpClientPlugin implements
ShenyuPlugin {
final long timeout = (long)
Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
final Duration duration = Duration.ofMillis(timeout);
final int retryTimes = (int)
Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
- LOG.info("The request urlPath is {}, retryTimes is {}",
uri.toASCIIString(), retryTimes);
+ final String retryStrategy = (String)
Optional.ofNullable(exchange.getAttribute(Constants.RETRY_STRATEGY)).orElseGet(RetryEnum.CURRENT::getName);
+ LOG.info("The request urlPath is {}, retryTimes is {}, retryStrategy
is {}", uri.toASCIIString(), retryTimes, retryStrategy);
final HttpHeaders httpHeaders = buildHttpHeaders(exchange);
- final Mono<?> response = doRequest(exchange,
exchange.getRequest().getMethodValue(), uri, httpHeaders,
exchange.getRequest().getBody())
+ final Mono<R> response = doRequest(exchange,
exchange.getRequest().getMethodValue(), uri, httpHeaders,
exchange.getRequest().getBody())
.timeout(duration, Mono.error(new TimeoutException("Response
took longer than timeout: " + duration)))
- .retryWhen(Retry.anyOf(TimeoutException.class,
ConnectTimeoutException.class, ReadTimeoutException.class,
IllegalStateException.class)
- .retryMax(retryTimes)
- .backoff(Backoff.exponential(Duration.ofMillis(200),
Duration.ofSeconds(20), 2, true)))
- .doOnError(e -> LOG.error(e.getMessage(), e))
- .onErrorMap(TimeoutException.class, th -> new
ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
- return response.flatMap((Function<Object, Mono<? extends Void>>) o ->
chain.execute(exchange));
+ .doOnError(e -> LOG.error(e.getMessage(), e));
+ if (RetryEnum.CURRENT.getName().equals(retryStrategy)) {
+ // older versions of DividePlugin and SpringCloudPlugin take this
branch
+ return response.retryWhen(Retry.anyOf(TimeoutException.class,
ConnectTimeoutException.class, ReadTimeoutException.class,
IllegalStateException.class)
+ .retryMax(retryTimes)
+ .backoff(Backoff.exponential(Duration.ofMillis(200),
Duration.ofSeconds(20), 2, true)))
+ .onErrorMap(TimeoutException.class, th -> new
ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))
+ .flatMap((Function<Object, Mono<? extends Void>>) o ->
chain.execute(exchange));
+ }
+ final Set<URI> exclude = Sets.newHashSet(uri);
+ return resend(response, exchange, duration, httpHeaders, exclude,
retryTimes)
+ .onErrorMap(TimeoutException.class, th -> new
ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th))
+ .flatMap((Function<Object, Mono<? extends Void>>) o ->
chain.execute(exchange));
+ }
+
+ private Mono<R> resend(final Mono<R> clientResponse,
+ final ServerWebExchange exchange,
+ final Duration duration,
+ final HttpHeaders httpHeaders,
+ final Set<URI> exclude,
+ final int retryTimes) {
+ Mono<R> result = clientResponse;
+ for (int i = 0; i < retryTimes; i++) {
+ result = resend(result, exchange, duration, httpHeaders, exclude);
+ }
+ return result;
+ }
+
+ private Mono<R> resend(final Mono<R> response,
+ final ServerWebExchange exchange,
+ final Duration duration,
+ final HttpHeaders httpHeaders,
+ final Set<URI> exclude) {
+ // is it necessary to add a backoff interval between failover attempts?
+ return response.onErrorResume(th -> {
+ final String selectorId =
exchange.getAttribute(Constants.DIVIDE_SELECTOR_ID);
+ final String loadBalance =
exchange.getAttribute(Constants.LOAD_BALANCE);
+ //always query the latest available list
+ final List<Upstream> upstreamList =
UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId)
+ .stream().filter(data -> {
+ final String trimUri = data.getUrl().trim();
+ for (URI needToExclude : exclude) {
+ // exclude already called
+ if ((needToExclude.getHost() + ":" +
needToExclude.getPort()).equals(trimUri)) {
+ return false;
+ }
+ }
+ return true;
+ }).collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(upstreamList)) {
+ // no need to retry anymore
+ return Mono.error(new
ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
+ }
+ final String ip =
Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
+ final Upstream upstream =
LoadBalancerFactory.selector(upstreamList, loadBalance, ip);
+ if (Objects.isNull(upstream)) {
+ // no need to retry anymore
+ return Mono.error(new
ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
+ }
+ final URI newUri = RequestUrlUtils.buildRequestUri(exchange,
upstream.buildDomain());
+ // in order not to affect the next retry call, newUri needs to be
excluded
+ exclude.add(newUri);
+ return doRequest(exchange, exchange.getRequest().getMethodValue(),
newUri, httpHeaders, exchange.getRequest().getBody())
+ .timeout(duration, Mono.error(new
TimeoutException("Response took longer than timeout: " + duration)))
+ .doOnError(e -> LOG.error(e.getMessage(), e));
+ });
}
/**
@@ -94,7 +166,7 @@ public abstract class AbstractHttpClientPlugin implements
ShenyuPlugin {
* @param body the request body
* @return {@code Mono<Void>} to indicate when request processing is
complete
*/
- protected abstract Mono<?> doRequest(ServerWebExchange exchange, String
httpMethod,
+ protected abstract Mono<R> doRequest(ServerWebExchange exchange, String
httpMethod,
URI uri, HttpHeaders httpHeaders,
Flux<DataBuffer> body);
}
diff --git
a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/NettyHttpClientPlugin.java
b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/NettyHttpClientPlugin.java
index 90932e1..69c5f31 100644
---
a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/NettyHttpClientPlugin.java
+++
b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/NettyHttpClientPlugin.java
@@ -31,6 +31,7 @@ import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
+import reactor.netty.http.client.HttpClientResponse;
import java.net.URI;
import java.util.List;
@@ -40,7 +41,7 @@ import java.util.stream.Stream;
/**
* The type Netty http client plugin.
*/
-public class NettyHttpClientPlugin extends AbstractHttpClientPlugin {
+public class NettyHttpClientPlugin extends
AbstractHttpClientPlugin<HttpClientResponse> {
private final HttpClient httpClient;
@@ -68,7 +69,7 @@ public class NettyHttpClientPlugin extends
AbstractHttpClientPlugin {
}
@Override
- protected Mono<?> doRequest(final ServerWebExchange exchange, final String
httpMethod, final URI uri,
+ protected Mono<HttpClientResponse> doRequest(final ServerWebExchange
exchange, final String httpMethod, final URI uri,
final HttpHeaders httpHeaders, final
Flux<DataBuffer> body) {
return Mono.from(httpClient.headers(headers ->
httpHeaders.forEach(headers::add))
.request(HttpMethod.valueOf(httpMethod)).uri(uri.toASCIIString())
diff --git
a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/WebClientPlugin.java
b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/WebClientPlugin.java
index 6c4f2d8..6b797c9 100644
---
a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/WebClientPlugin.java
+++
b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/WebClientPlugin.java
@@ -25,6 +25,7 @@ import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
@@ -38,7 +39,7 @@ import java.util.stream.Stream;
/**
* The type Web client plugin.
*/
-public class WebClientPlugin extends AbstractHttpClientPlugin {
+public class WebClientPlugin extends AbstractHttpClientPlugin<ClientResponse> {
private final WebClient webClient;
@@ -67,8 +68,8 @@ public class WebClientPlugin extends AbstractHttpClientPlugin
{
}
@Override
- protected Mono<?> doRequest(final ServerWebExchange exchange, final String
httpMethod, final URI uri,
- final HttpHeaders httpHeaders, final
Flux<DataBuffer> body) {
+ protected Mono<ClientResponse> doRequest(final ServerWebExchange exchange,
final String httpMethod, final URI uri,
+ final HttpHeaders httpHeaders,
final Flux<DataBuffer> body) {
return webClient.method(HttpMethod.valueOf(httpMethod)).uri(uri)
.headers(headers -> headers.addAll(httpHeaders))
.body(BodyInserters.fromDataBuffers(body))