This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new bbbdf58 refactor: the parent of upstream should point to its original
src (#3287)
bbbdf58 is described below
commit bbbdf58d555ee89144b6923e751deabf7e0bbf15
Author: 罗泽轩 <[email protected]>
AuthorDate: Fri Jan 15 00:43:39 2021 -0600
refactor: the parent of upstream should point to its original src (#3287)
Signed-off-by: spacewander <[email protected]>
---
apisix/core/table.lua | 45 ++++++---
apisix/http/service.lua | 1 +
apisix/plugin.lua | 7 +-
apisix/plugins/example-plugin.lua | 2 +-
apisix/plugins/traffic-split.lua | 4 +-
apisix/router.lua | 1 +
apisix/stream/plugins/mqtt-proxy.lua | 2 +-
apisix/upstream.lua | 39 +++-----
t/node/healthcheck.t | 120 +++++++++++++++++++++++-
t/node/healthcheck2.t | 175 ++++++++++++++++++++++++++++++++++-
10 files changed, 342 insertions(+), 54 deletions(-)
diff --git a/apisix/core/table.lua b/apisix/core/table.lua
index e314b39..d0bc1b2 100644
--- a/apisix/core/table.lua
+++ b/apisix/core/table.lua
@@ -96,25 +96,44 @@ function _M.setmt__gc(t, mt)
end
-local function deepcopy(orig)
- local orig_type = type(orig)
- if orig_type ~= 'table' then
- return orig
- end
+local deepcopy
+do
+ local function _deepcopy(orig, copied)
+ -- prevent infinite loop when a field refers its parent
+ copied[orig] = true
+ -- If the array-like table contains nil in the middle,
+ -- the len might be smaller than the expected.
+ -- But it doesn't affect the correctness.
+ local len = #orig
+ local copy = new_tab(len, nkeys(orig) - len)
+ for orig_key, orig_value in pairs(orig) do
+ if type(orig_value) == "table" and not copied[orig_value] then
+ copy[orig_key] = _deepcopy(orig_value, copied)
+ else
+ copy[orig_key] = orig_value
+ end
+ end
- -- If the array-like table contains nil in the middle,
- -- the len might be smaller than the expected.
- -- But it doesn't affect the correctness.
- local len = #orig
- local copy = new_tab(len, nkeys(orig) - len)
- for orig_key, orig_value in pairs(orig) do
- copy[orig_key] = deepcopy(orig_value)
+ return copy
end
- return copy
+
+ local copied_recorder = {}
+
+ function deepcopy(orig)
+ local orig_type = type(orig)
+ if orig_type ~= 'table' then
+ return orig
+ end
+
+ local res = _deepcopy(orig, copied_recorder)
+ _M.clear(copied_recorder)
+ return res
+ end
end
_M.deepcopy = deepcopy
+
local ngx_null = ngx.null
local function merge(origin, extend)
for k,v in pairs(extend) do
diff --git a/apisix/http/service.lua b/apisix/http/service.lua
index a23924d..9279dc3 100644
--- a/apisix/http/service.lua
+++ b/apisix/http/service.lua
@@ -79,6 +79,7 @@ local function filter(service)
service.value.upstream.nodes = new_nodes
end
+ service.value.upstream.parent = service
core.log.info("filter service: ", core.json.delay_encode(service))
end
diff --git a/apisix/plugin.lua b/apisix/plugin.lua
index 9928cb5..5445cfc 100644
--- a/apisix/plugin.lua
+++ b/apisix/plugin.lua
@@ -401,11 +401,8 @@ local function merge_service_route(service_conf,
route_conf)
local route_upstream = route_conf.value.upstream
if route_upstream then
new_conf.value.upstream = route_upstream
-
- if route_upstream.checks then
- route_upstream.parent = route_conf
- end
-
+ -- when route's upstream override service's upstream,
+ -- the upstream.parent still point to the route
new_conf.value.upstream_id = nil
new_conf.has_domain = route_conf.has_domain
end
diff --git a/apisix/plugins/example-plugin.lua
b/apisix/plugins/example-plugin.lua
index e270a5c..83722ff 100644
--- a/apisix/plugins/example-plugin.lua
+++ b/apisix/plugins/example-plugin.lua
@@ -99,7 +99,7 @@ function _M.access(conf, ctx)
local matched_route = ctx.matched_route
upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
- ctx.conf_version, up_conf, matched_route)
+ ctx.conf_version, up_conf)
return
end
diff --git a/apisix/plugins/traffic-split.lua b/apisix/plugins/traffic-split.lua
index d0bdec7..a94f6dd 100644
--- a/apisix/plugins/traffic-split.lua
+++ b/apisix/plugins/traffic-split.lua
@@ -240,14 +240,16 @@ local function set_upstream(upstream_info, ctx)
local ok, err = upstream.check_schema(up_conf)
if not ok then
+ core.log.error("failed to validate generated upstream: ", err)
return 500, err
end
local matched_route = ctx.matched_route
+ up_conf.parent = matched_route
local upstream_key = up_conf.type .. "#route_" ..
matched_route.value.id .. "_" ..upstream_info.vid
core.log.info("upstream_key: ", upstream_key)
- upstream.set(ctx, upstream_key, ctx.conf_version, up_conf, matched_route)
+ upstream.set(ctx, upstream_key, ctx.conf_version, up_conf)
return
end
diff --git a/apisix/router.lua b/apisix/router.lua
index 0ae2481..0f824e5 100644
--- a/apisix/router.lua
+++ b/apisix/router.lua
@@ -64,6 +64,7 @@ local function filter(route)
route.value.upstream.nodes = new_nodes
end
+ route.value.upstream.parent = route
core.log.info("filter route: ", core.json.delay_encode(route))
end
diff --git a/apisix/stream/plugins/mqtt-proxy.lua
b/apisix/stream/plugins/mqtt-proxy.lua
index b533430..f7cfd88 100644
--- a/apisix/stream/plugins/mqtt-proxy.lua
+++ b/apisix/stream/plugins/mqtt-proxy.lua
@@ -173,7 +173,7 @@ function _M.preread(conf, ctx)
local matched_route = ctx.matched_route
upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
- ctx.conf_version, up_conf, matched_route)
+ ctx.conf_version, up_conf)
return
end
diff --git a/apisix/upstream.lua b/apisix/upstream.lua
index 096f81e..856f44d 100644
--- a/apisix/upstream.lua
+++ b/apisix/upstream.lua
@@ -33,7 +33,7 @@ local lrucache_checker = core.lrucache.new({
local _M = {}
-local function set_directly(ctx, key, ver, conf, parent)
+local function set_directly(ctx, key, ver, conf)
if not ctx then
error("missing argument ctx", 2)
end
@@ -46,24 +46,22 @@ local function set_directly(ctx, key, ver, conf, parent)
if not conf then
error("missing argument conf", 2)
end
- if not parent then
- error("missing argument parent", 2)
- end
ctx.upstream_conf = conf
ctx.upstream_version = ver
ctx.upstream_key = key
- ctx.upstream_healthcheck_parent = parent
+ ctx.upstream_healthcheck_parent = conf.parent
return
end
_M.set = set_directly
-local function create_checker(upstream, healthcheck_parent)
+local function create_checker(upstream)
if healthcheck == nil then
healthcheck = require("resty.healthcheck")
end
+ local healthcheck_parent = upstream.parent
local checker, err = healthcheck.new({
name = "upstream#" .. healthcheck_parent.key,
shm_name = "upstream-healthcheck",
@@ -85,27 +83,18 @@ local function create_checker(upstream, healthcheck_parent)
end
end
- if upstream.parent then
- core.table.insert(upstream.parent.clean_handlers, function ()
- core.log.info("try to release checker: ", tostring(checker))
- checker:clear()
- checker:stop()
- end)
-
- else
- core.table.insert(healthcheck_parent.clean_handlers, function ()
- core.log.info("try to release checker: ", tostring(checker))
- checker:clear()
- checker:stop()
- end)
- end
+ core.table.insert(healthcheck_parent.clean_handlers, function ()
+ core.log.info("try to release checker: ", tostring(checker))
+ checker:clear()
+ checker:stop()
+ end)
core.log.info("create new checker: ", tostring(checker))
return checker
end
-local function fetch_healthchecker(upstream, healthcheck_parent, version)
+local function fetch_healthchecker(upstream, version)
if not upstream.checks then
return
end
@@ -115,8 +104,7 @@ local function fetch_healthchecker(upstream,
healthcheck_parent, version)
end
local checker = lrucache_checker(upstream, version,
- create_checker, upstream,
- healthcheck_parent)
+ create_checker, upstream)
return checker
end
@@ -150,7 +138,7 @@ function _M.set_by_route(route, api_ctx)
end
set_directly(api_ctx, up_conf.type .. "#upstream_" .. tostring(up_conf),
- api_ctx.conf_version, up_conf, route)
+ api_ctx.conf_version, up_conf)
local nodes_count = up_conf.nodes and #up_conf.nodes or 0
if nodes_count == 0 then
@@ -158,7 +146,7 @@ function _M.set_by_route(route, api_ctx)
end
if nodes_count > 1 then
- local checker = fetch_healthchecker(up_conf, route,
api_ctx.upstream_version)
+ local checker = fetch_healthchecker(up_conf, api_ctx.upstream_version)
api_ctx.up_checker = checker
end
@@ -219,6 +207,7 @@ function _M.init_worker()
upstream.value.nodes = new_nodes
end
+ upstream.value.parent = upstream
core.log.info("filter upstream: ",
core.json.delay_encode(upstream))
end,
})
diff --git a/t/node/healthcheck.t b/t/node/healthcheck.t
index 1056a53..d9bacb8 100644
--- a/t/node/healthcheck.t
+++ b/t/node/healthcheck.t
@@ -218,10 +218,10 @@ GET /t
--- response_body
[{"count":12,"port":"1980"}]
--- grep_error_log eval
-qr/unhealthy .* for '.*'/
+qr/\([^)]+\) unhealthy .* for '.*'/
--- grep_error_log_out
-unhealthy TCP increment (1/2) for 'foo.com(127.0.0.1:1970)'
-unhealthy TCP increment (2/2) for 'foo.com(127.0.0.1:1970)'
+(upstream#/apisix/routes/1) unhealthy TCP increment (1/2) for
'foo.com(127.0.0.1:1970)'
+(upstream#/apisix/routes/1) unhealthy TCP increment (2/2) for
'foo.com(127.0.0.1:1970)'
--- timeout: 10
@@ -795,3 +795,117 @@ GET /t
qr/expected 65536 to be smaller than 65535/
--- error_code chomp
400
+
+
+
+=== TEST 18: set route + upstream (two upstream node: one healthy + one
unhealthy)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/upstreams/1',
+ ngx.HTTP_PUT,
+ [[{
+ "type": "roundrobin",
+ "nodes": {
+ "127.0.0.1:1980": 1,
+ "127.0.0.1:1970": 1
+ },
+ "checks": {
+ "active": {
+ "http_path": "/status",
+ "host": "foo.com",
+ "healthy": {
+ "interval": 1,
+ "successes": 1
+ },
+ "unhealthy": {
+ "interval": 1,
+ "http_failures": 2
+ }
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "uri": "/server_port",
+ "upstream_id": 1
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+--- grep_error_log eval
+qr/^.*?\[error\](?!.*process exiting).*/
+--- grep_error_log_out
+
+
+
+=== TEST 19: hit routes, ensure the checker is bound to the upstream
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require "resty.http"
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port
+ .. "/server_port"
+
+ do
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri, {method = "GET",
keepalive = false})
+ end
+
+ ngx.sleep(2.5)
+
+ local ports_count = {}
+ for i = 1, 12 do
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri, {method = "GET",
keepalive = false})
+ if not res then
+ ngx.say(err)
+ return
+ end
+
+ ports_count[res.body] = (ports_count[res.body] or 0) + 1
+ end
+
+ local ports_arr = {}
+ for port, count in pairs(ports_count) do
+ table.insert(ports_arr, {port = port, count = count})
+ end
+
+ local function cmd(a, b)
+ return a.port > b.port
+ end
+ table.sort(ports_arr, cmd)
+
+ ngx.say(require("toolkit.json").encode(ports_arr))
+ ngx.exit(200)
+ }
+ }
+--- request
+GET /t
+--- response_body
+[{"count":12,"port":"1980"}]
+--- grep_error_log eval
+qr/\([^)]+\) unhealthy .* for '.*'/
+--- grep_error_log_out
+(upstream#/apisix/upstreams/1) unhealthy TCP increment (1/2) for
'foo.com(127.0.0.1:1970)'
+(upstream#/apisix/upstreams/1) unhealthy TCP increment (2/2) for
'foo.com(127.0.0.1:1970)'
+--- timeout: 10
diff --git a/t/node/healthcheck2.t b/t/node/healthcheck2.t
index 1b25cc8..1c802e2 100644
--- a/t/node/healthcheck2.t
+++ b/t/node/healthcheck2.t
@@ -19,8 +19,10 @@ use t::APISIX 'no_plan';
master_on();
repeat_each(1);
+log_level('info');
no_root_location();
no_shuffle();
+worker_connections(256);
add_block_preprocessor(sub {
my ($block) = @_;
@@ -37,11 +39,7 @@ _EOC_
}
if (!$block->request) {
- $block->set_value("request", "GET /hello");
- }
-
- if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
- $block->set_value("no_error_log", "[error]");
+ $block->set_value("request", "GET /t");
}
});
@@ -66,4 +64,171 @@ upstreams:
#END
--- error_log
value should match only one schema, but matches both schemas 1 and 2
+--- request
+GET /hello
--- error_code: 502
+
+
+
+=== TEST 2: route + service
+--- apisix_yaml
+services:
+ - id: 1
+ upstream:
+ type: roundrobin
+ nodes:
+ "127.0.0.1:1980": 1
+ "127.0.0.1:1970": 1
+ checks:
+ active:
+ http_path: /status
+ host: foo.com
+ healthy:
+ interval: 1
+ successes: 1
+ unhealthy:
+ interval: 1
+ http_failures: 2
+routes:
+ -
+ service_id: 1
+ uri: /server_port
+#END
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require "resty.http"
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port
+ .. "/server_port"
+
+ do
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri, {method = "GET",
keepalive = false})
+ end
+
+ ngx.sleep(2.5)
+
+ local ports_count = {}
+ for i = 1, 12 do
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri, {method = "GET",
keepalive = false})
+ if not res then
+ ngx.say(err)
+ return
+ end
+
+ ports_count[res.body] = (ports_count[res.body] or 0) + 1
+ end
+
+ local ports_arr = {}
+ for port, count in pairs(ports_count) do
+ table.insert(ports_arr, {port = port, count = count})
+ end
+
+ local function cmd(a, b)
+ return a.port > b.port
+ end
+ table.sort(ports_arr, cmd)
+
+ ngx.say(require("toolkit.json").encode(ports_arr))
+ ngx.exit(200)
+ }
+ }
+--- response_body
+[{"count":12,"port":"1980"}]
+--- grep_error_log eval
+qr/\([^)]+\) unhealthy .* for '.*'/
+--- grep_error_log_out
+(upstream#/services/1) unhealthy TCP increment (1/2) for
'foo.com(127.0.0.1:1970)'
+(upstream#/services/1) unhealthy TCP increment (2/2) for
'foo.com(127.0.0.1:1970)'
+--- timeout: 10
+
+
+
+=== TEST 3: route override service
+--- apisix_yaml
+services:
+ - id: 1
+ upstream:
+ type: roundrobin
+ nodes:
+ "127.0.0.2:1980": 1
+ "127.0.0.2:1970": 1
+ checks:
+ active:
+ http_path: /status
+ host: foo.com
+ healthy:
+ interval: 1
+ successes: 1
+ unhealthy:
+ interval: 1
+ http_failures: 2
+routes:
+ -
+ service_id: 1
+ uri: /server_port
+ upstream:
+ type: roundrobin
+ nodes:
+ "127.0.0.1:1980": 1
+ "127.0.0.1:1970": 1
+ checks:
+ active:
+ http_path: /status
+ host: foo.com
+ healthy:
+ interval: 1
+ successes: 1
+ unhealthy:
+ interval: 1
+ http_failures: 2
+#END
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require "resty.http"
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port
+ .. "/server_port"
+
+ do
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri, {method = "GET",
keepalive = false})
+ end
+
+ ngx.sleep(2.5)
+
+ local ports_count = {}
+ for i = 1, 12 do
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri, {method = "GET",
keepalive = false})
+ if not res then
+ ngx.say(err)
+ return
+ end
+
+ ports_count[res.body] = (ports_count[res.body] or 0) + 1
+ end
+
+ local ports_arr = {}
+ for port, count in pairs(ports_count) do
+ table.insert(ports_arr, {port = port, count = count})
+ end
+
+ local function cmd(a, b)
+ return a.port > b.port
+ end
+ table.sort(ports_arr, cmd)
+
+ ngx.say(require("toolkit.json").encode(ports_arr))
+ ngx.exit(200)
+ }
+ }
+--- response_body
+[{"count":12,"port":"1980"}]
+--- grep_error_log eval
+qr/\([^)]+\) unhealthy .* for '.*'/
+--- grep_error_log_out
+(upstream#/routes/arr_1) unhealthy TCP increment (1/2) for
'foo.com(127.0.0.1:1970)'
+(upstream#/routes/arr_1) unhealthy TCP increment (2/2) for
'foo.com(127.0.0.1:1970)'
+--- timeout: 10