This is an automated email from the ASF dual-hosted git repository.
monkeydluffy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 2cd71cea0 fix(consul): support to fetch only health endpoint (#9204)
2cd71cea0 is described below
commit 2cd71cea020b64864f295d22bf841ee127840f3b
Author: fabriceli <[email protected]>
AuthorDate: Thu May 4 22:55:46 2023 +0800
fix(consul): support to fetch only health endpoint (#9204)
---
apisix/discovery/consul/init.lua | 369 +++++++++++++++++++++++++++++++--------
t/discovery/consul.t | 75 ++++++++
t/discovery/consul_dump.t | 52 ++++++
3 files changed, 423 insertions(+), 73 deletions(-)
diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua
index 686ab4120..ae1e4c64c 100644
--- a/apisix/discovery/consul/init.lua
+++ b/apisix/discovery/consul/init.lua
@@ -32,6 +32,14 @@ local ngx_timer_every = ngx.timer.every
local log = core.log
local json_delay_encode = core.json.delay_encode
local ngx_worker_id = ngx.worker.id
+local thread_spawn = ngx.thread.spawn
+local thread_wait = ngx.thread.wait
+local thread_kill = ngx.thread.kill
+local math_random = math.random
+local pcall = pcall
+local null = ngx.null
+local type = type
+local next = next
local all_services = core.table.new(0, 5)
local default_service
@@ -44,9 +52,15 @@ local events_list
local consul_services
local default_skip_services = {"consul"}
+local default_random_range = 5
+local default_catalog_error_index = -1
+local default_health_error_index = -2
+local watch_type_catalog = 1
+local watch_type_health = 2
+local max_retry_time = 256
local _M = {
- version = 0.2,
+ version = 0.3,
}
@@ -128,7 +142,7 @@ local function read_dump_services()
local now_time = ngx.time()
log.info("dump file last_update: ", entity.last_update, ", dump_params.expire: ",
dump_params.expire, ", now_time: ", now_time)
- if dump_params.expire ~= 0 and (entity.last_update + dump_params.expire)
< now_time then
+ if dump_params.expire ~= 0 and (entity.last_update + dump_params.expire) <
now_time then
log.warn("dump file: ", dump_params.path, " had expired, ignored it")
return
end
@@ -145,7 +159,7 @@ local function write_dump_services()
expire = dump_params.expire, -- later need handle it
}
local data = core.json.encode(entity)
- local succ, err = util.write_file(dump_params.path, data)
+ local succ, err = util.write_file(dump_params.path, data)
if not succ then
log.error("write dump into file got error: ", err)
end
@@ -165,8 +179,9 @@ local function show_dump_file()
return 200, data
end
+
local function get_retry_delay(retry_delay)
- if not retry_delay then
+ if not retry_delay or retry_delay >= max_retry_time then
retry_delay = 1
else
retry_delay = retry_delay * 4
@@ -176,80 +191,307 @@ local function get_retry_delay(retry_delay)
end
+local function get_opts(consul_server, is_catalog)
+ local opts = {
+ host = consul_server.host,
+ port = consul_server.port,
+ connect_timeout = consul_server.connect_timeout,
+ read_timeout = consul_server.read_timeout,
+ }
+ if not consul_server.keepalive then
+ return opts
+ end
+
+ if is_catalog then
+ opts.default_args = {
+ wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0
+ index = consul_server.catalog_index,
+ }
+ else
+ opts.default_args = {
+ wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0
+ index = consul_server.health_index,
+ }
+ end
+
+ return opts
+end
+
+
+local function watch_catalog(consul_server)
+ local client = resty_consul:new(get_opts(consul_server, true))
+
+ ::RETRY::
+ local watch_result, watch_err =
client:get(consul_server.consul_watch_catalog_url)
+ local watch_error_info = (watch_err ~= nil and watch_err)
+ or ((watch_result ~= nil and watch_result.status
~= 200)
+ and watch_result.status)
+ if watch_error_info then
+ log.error("connect consul: ", consul_server.consul_server_url,
+ " by sub url: ", consul_server.consul_watch_catalog_url,
+ ", got watch result: ", json_delay_encode(watch_result),
+ ", with error: ", watch_error_info)
+
+ return watch_type_catalog, default_catalog_error_index
+ end
+
+ if consul_server.catalog_index > 0
+ and consul_server.catalog_index ==
tonumber(watch_result.headers['X-Consul-Index']) then
+ local random_delay = math_random(default_random_range)
+ log.info("watch catalog has no change, re-watch consul after ",
random_delay, " seconds")
+ core_sleep(random_delay)
+ goto RETRY
+ end
+
+ return watch_type_catalog, watch_result.headers['X-Consul-Index']
+end
+
+
+local function watch_health(consul_server)
+ local client = resty_consul:new(get_opts(consul_server, false))
+
+ ::RETRY::
+ local watch_result, watch_err =
client:get(consul_server.consul_watch_health_url)
+ local watch_error_info = (watch_err ~= nil and watch_err)
+ or ((watch_result ~= nil and watch_result.status ~= 200)
+ and watch_result.status)
+ if watch_error_info then
+ log.error("connect consul: ", consul_server.consul_server_url,
+ " by sub url: ", consul_server.consul_watch_health_url,
+ ", got watch result: ", json_delay_encode(watch_result),
+ ", with error: ", watch_error_info)
+
+ return watch_type_health, default_health_error_index
+ end
+
+ if consul_server.health_index > 0
+ and consul_server.health_index ==
tonumber(watch_result.headers['X-Consul-Index']) then
+ local random_delay = math_random(default_random_range)
+ log.info("watch health has no change, re-watch consul after ",
random_delay, " seconds")
+ core_sleep(random_delay)
+ goto RETRY
+ end
+
+ return watch_type_health, watch_result.headers['X-Consul-Index']
+end
+
+
+local function check_keepalive(consul_server, retry_delay)
+ if consul_server.keepalive then
+ local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
+ if not ok then
+ log.error("create ngx_timer_at got error: ", err)
+ return
+ end
+ end
+end
+
+
+local function update_index(consul_server, catalog_index, health_index)
+ local c_index = 0
+ local h_index = 0
+ if catalog_index ~= nil then
+ c_index = tonumber(catalog_index)
+ end
+
+ if health_index ~= nil then
+ h_index = tonumber(health_index)
+ end
+
+ if c_index > 0 then
+ consul_server.catalog_index = c_index
+ end
+
+ if h_index > 0 then
+ consul_server.health_index = h_index
+ end
+end
+
+
+local function is_not_empty(value)
+ if value == nil or value == null
+ or (type(value) == "table" and not next(value))
+ or (type(value) == "string" and value == "")
+ then
+ return false
+ end
+
+ return true
+end
+
+
+local function watch_result_is_valid(watch_type, index, catalog_index,
health_index)
+ if index <= 0 then
+ return false
+ end
+
+ if watch_type == watch_type_catalog then
+ if index == catalog_index then
+ return false
+ end
+ else
+ if index == health_index then
+ return false
+ end
+ end
+
+ return true
+end
+
+
function _M.connect(premature, consul_server, retry_delay)
if premature then
return
end
+ local catalog_thread, spawn_catalog_err = thread_spawn(watch_catalog,
consul_server)
+ if not catalog_thread then
+ local random_delay = math_random(default_random_range)
+ log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
+ ", retry connecting consul after ", random_delay, " seconds")
+ core_sleep(random_delay)
+
+ check_keepalive(consul_server, retry_delay)
+ return
+ end
+
+ local health_thread, err = thread_spawn(watch_health, consul_server)
+ if not health_thread then
+ thread_kill(catalog_thread)
+ local random_delay = math_random(default_random_range)
+ log.error("failed to spawn thread watch health: ", err, ", retry connecting consul after ",
+ random_delay, " seconds")
+ core_sleep(random_delay)
+
+ check_keepalive(consul_server, retry_delay)
+ return
+ end
+
+ local thread_wait_ok, watch_type, index = thread_wait(catalog_thread,
health_thread)
+ thread_kill(catalog_thread)
+ thread_kill(health_thread)
+ if not thread_wait_ok then
+ local random_delay = math_random(default_random_range)
+ log.error("failed to wait thread: ", watch_type, ", retry connecting consul after ",
+ random_delay, " seconds")
+ core_sleep(random_delay)
+
+ check_keepalive(consul_server, retry_delay)
+ return
+ end
+
+ -- double check index has changed
+ if not watch_result_is_valid(tonumber(watch_type),
+ tonumber(index), consul_server.catalog_index,
consul_server.health_index) then
+ retry_delay = get_retry_delay(retry_delay)
+ log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
+ core_sleep(retry_delay)
+
+ check_keepalive(consul_server, retry_delay)
+ return
+ end
+
local consul_client = resty_consul:new({
host = consul_server.host,
port = consul_server.port,
connect_timeout = consul_server.connect_timeout,
read_timeout = consul_server.read_timeout,
- default_args = consul_server.default_args,
})
+ local catalog_success, catalog_res, catalog_err = pcall(function()
+ return consul_client:get(consul_server.consul_watch_catalog_url)
+ end)
+ if not catalog_success then
+ log.error("connect consul: ", consul_server.consul_server_url,
+ " by sub url: ", consul_server.consul_watch_catalog_url,
+ ", got catalog result: ", json_delay_encode(catalog_res))
+ check_keepalive(consul_server, retry_delay)
+ return
+ end
+ local catalog_error_info = (catalog_err ~= nil and catalog_err)
+ or ((catalog_res ~= nil and catalog_res.status ~= 200)
+ and catalog_res.status)
+ if catalog_error_info then
+ log.error("connect consul: ", consul_server.consul_server_url,
+ " by sub url: ", consul_server.consul_watch_catalog_url,
+ ", got catalog result: ", json_delay_encode(catalog_res),
+ ", with error: ", catalog_error_info)
- log.info("consul_server: ", json_delay_encode(consul_server, true))
- local watch_result, watch_err =
consul_client:get(consul_server.consul_watch_sub_url)
- local watch_error_info = (watch_err ~= nil and watch_err)
- or ((watch_result ~= nil and watch_result.status ~= 200)
- and watch_result.status)
- if watch_error_info then
+ retry_delay = get_retry_delay(retry_delay)
+ log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
+ core_sleep(retry_delay)
+
+ check_keepalive(consul_server, retry_delay)
+ return
+ end
+
+ -- get health index
+ local success, health_res, health_err = pcall(function()
+ return consul_client:get(consul_server.consul_watch_health_url)
+ end)
+ if not success then
log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_sub_url,
- ", got watch result: ", json_delay_encode(watch_result, true),
- ", with error: ", watch_error_info)
+ " by sub url: ", consul_server.consul_watch_health_url,
+ ", got health result: ", json_delay_encode(health_res))
+ check_keepalive(consul_server, retry_delay)
+ return
+ end
+ local health_error_info = (health_err ~= nil and health_err)
+ or ((health_res ~= nil and health_res.status ~= 200)
+ and health_res.status)
+ if health_error_info then
+ log.error("connect consul: ", consul_server.consul_server_url,
+ " by sub url: ", consul_server.consul_watch_health_url,
+ ", got health result: ", json_delay_encode(health_res),
+ ", with error: ", health_error_info)
retry_delay = get_retry_delay(retry_delay)
- log.warn("retry connecting consul after ", retry_delay, " seconds")
+ log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
core_sleep(retry_delay)
- goto ERR
+ check_keepalive(consul_server, retry_delay)
+ return
end
log.info("connect consul: ", consul_server.consul_server_url,
- ", watch_result status: ", watch_result.status,
- ", watch_result.headers.index: ",
watch_result.headers['X-Consul-Index'],
+ ", catalog_result status: ", catalog_res.status,
+ ", catalog_result.headers.index: ",
catalog_res.headers['X-Consul-Index'],
", consul_server.index: ", consul_server.index,
- ", consul_server: ", json_delay_encode(consul_server, true))
-
- -- if current index different last index then update service
- if consul_server.index ~= watch_result.headers['X-Consul-Index'] then
- local up_services = core.table.new(0, #watch_result.body)
- local consul_client_svc = resty_consul:new({
- host = consul_server.host,
- port = consul_server.port,
- connect_timeout = consul_server.connect_timeout,
- read_timeout = consul_server.read_timeout,
- })
- for service_name, _ in pairs(watch_result.body) do
+ ", consul_server: ", json_delay_encode(consul_server))
+
+ -- if the current index is different from the last index, then update the service
+ if (consul_server.catalog_index ~=
tonumber(catalog_res.headers['X-Consul-Index']))
+ or (consul_server.health_index ~=
tonumber(health_res.headers['X-Consul-Index'])) then
+ local up_services = core.table.new(0, #catalog_res.body)
+ for service_name, _ in pairs(catalog_res.body) do
-- check if the service_name is 'skip service'
if skip_service_map[service_name] then
goto CONTINUE
end
+
-- get node from service
local svc_url = consul_server.consul_sub_url .. "/" .. service_name
- local result, err = consul_client_svc:get(svc_url)
- local error_info = (err ~= nil and err) or
+ local svc_success, result, get_err = pcall(function()
+ return consul_client:get(svc_url, {passing = true})
+ end)
+ local error_info = (get_err ~= nil and get_err) or
((result ~= nil and result.status ~= 200) and
result.status)
- if error_info then
+ if not svc_success or error_info then
log.error("connect consul: ", consul_server.consul_server_url,
", by service url: ", svc_url, ", with error: ",
error_info)
goto CONTINUE
end
-- decode body, decode json, update service, error handling
- if result.body then
- log.notice("service url: ", svc_url,
- ", header: ", json_delay_encode(result.headers, true),
- ", body: ", json_delay_encode(result.body, true))
+ -- check result body is not nil and not empty
+ if is_not_empty(result.body) then
-- add services to table
local nodes = up_services[service_name]
- for _, node in ipairs(result.body) do
- local svc_address, svc_port = node.ServiceAddress,
node.ServicePort
- if not svc_address then
- svc_address = node.Address
+ for _, node in ipairs(result.body) do
+ if not node.Service then
+ goto CONTINUE
end
+
+ local svc_address, svc_port = node.Service.Address,
node.Service.Port
-- if nodes is nil, new nodes table and set to up_services
if not nodes then
nodes = core.table.new(1, 0)
@@ -270,8 +512,9 @@ function _M.connect(premature, consul_server, retry_delay)
update_all_services(consul_server.consul_server_url, up_services)
--update events
- local ok, post_err = events.post(events_list._source,
events_list.updating, all_services)
- if not ok then
+ local post_ok, post_err = events.post(events_list._source,
+ events_list.updating, all_services)
+ if not post_ok then
log.error("post_event failure with ", events_list._source,
", update all services error: ", post_err)
end
@@ -280,37 +523,17 @@ function _M.connect(premature, consul_server, retry_delay)
ngx_timer_at(0, write_dump_services)
end
- consul_server.index = watch_result.headers['X-Consul-Index']
- -- only long connect type use index
- if consul_server.keepalive then
- consul_server.default_args.index =
watch_result.headers['X-Consul-Index']
- end
+ update_index(consul_server,
+ catalog_res.headers['X-Consul-Index'],
+ health_res.headers['X-Consul-Index'])
end
- :: ERR ::
- local keepalive = consul_server.keepalive
- if keepalive then
- local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
- if not ok then
- log.error("create ngx_timer_at got error: ", err)
- return
- end
- end
+ check_keepalive(consul_server, retry_delay)
end
local function format_consul_params(consul_conf)
local consul_server_list = core.table.new(0, #consul_conf.servers)
- local args
-
- if consul_conf.keepalive == false then
- args = {}
- elseif consul_conf.keepalive then
- args = {
- wait = consul_conf.timeout.wait, --blocked wait!=0; unblocked by wait=0
- index = 0,
- }
- end
for _, v in pairs(consul_conf.servers) do
local scheme, host, port, path = unpack(http.parse_uri(nil, v))
@@ -319,23 +542,23 @@ local function format_consul_params(consul_conf)
elseif path ~= "/" or core.string.has_suffix(v, '/') then
return nil, "invalid consul server address, the valid format: http://address:port"
end
-
core.table.insert(consul_server_list, {
host = host,
port = port,
connect_timeout = consul_conf.timeout.connect,
read_timeout = consul_conf.timeout.read,
- consul_sub_url = "/catalog/service",
- consul_watch_sub_url = "/catalog/services",
+ wait_timeout = consul_conf.timeout.wait,
+ consul_watch_catalog_url = "/catalog/services",
+ consul_sub_url = "/health/service",
+ consul_watch_health_url = "/health/state/any",
consul_server_url = v .. "/v1",
weight = consul_conf.weight,
keepalive = consul_conf.keepalive,
- default_args = args,
- index = 0,
+ health_index = 0,
+ catalog_index = 0,
fetch_interval = consul_conf.fetch_interval -- fetch interval to next connect consul
})
end
-
return consul_server_list, nil
end
diff --git a/t/discovery/consul.t b/t/discovery/consul.t
index cf97e0ce8..57a6ab5b4 100644
--- a/t/discovery/consul.t
+++ b/t/discovery/consul.t
@@ -582,3 +582,78 @@ GET /thc
[{"ip":"127.0.0.1","port":30513,"status":"healthy"},{"ip":"127.0.0.1","port":30514,"status":"healthy"}]
[{"ip":"127.0.0.1","port":30513,"status":"healthy"},{"ip":"127.0.0.1","port":30514,"status":"healthy"}]
--- ignore_error_log
+
+
+
+=== TEST 13: test consul catalog service change
+--- yaml_config
+apisix:
+ node_listen: 1984
+deployment:
+ role: data_plane
+ role_data_plane:
+ config_provider: yaml
+discovery:
+ consul:
+ servers:
+ - "http://127.0.0.1:8500"
+ keepalive: false
+ fetch_interval: 3
+ default_service:
+ host: "127.0.0.1"
+ port: 20999
+#END
+--- apisix_yaml
+routes:
+ -
+ uri: /*
+ upstream:
+ service_name: service_a
+ discovery_type: consul
+ type: roundrobin
+#END
+--- config
+location /v1/agent {
+ proxy_pass http://127.0.0.1:8500;
+}
+
+location /sleep {
+ content_by_lua_block {
+ local args = ngx.req.get_uri_args()
+ local sec = args.sec or "2"
+ ngx.sleep(tonumber(sec))
+ ngx.say("ok")
+ }
+}
+--- timeout: 6
+--- request eval
+[
+ "PUT /v1/agent/service/deregister/service_a1",
+ "GET /sleep?sec=3",
+ "GET /hello",
+ "PUT /v1/agent/service/register\n" .
"{\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+ "GET /sleep?sec=5",
+ "GET /hello",
+ "PUT /v1/agent/service/deregister/service_a1",
+ "GET /sleep?sec=5",
+ "GET /hello",
+ "PUT /v1/agent/service/register\n" .
"{\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+ "GET /sleep?sec=5",
+ "GET /hello",
+]
+--- response_body_like eval
+[
+ qr//,
+ qr/ok\n/,
+ qr/missing consul services\n/,
+ qr//,
+ qr/ok\n/,
+ qr/server 1\n/,
+ qr//,
+ qr/ok\n/,
+ qr/missing consul services\n/,
+ qr//,
+ qr/ok\n/,
+ qr/server 1\n/,
+]
+--- ignore_error_log
diff --git a/t/discovery/consul_dump.t b/t/discovery/consul_dump.t
index 366c76a98..c74e5b150 100644
--- a/t/discovery/consul_dump.t
+++ b/t/discovery/consul_dump.t
@@ -451,3 +451,55 @@ discovery:
GET /bonjour
--- response_body
{"service_a":[{"host":"127.0.0.1","port":30511,"weight":1}],"service_b":[{"host":"127.0.0.1","port":30517,"weight":1}]}
+
+
+
+=== TEST 16: prepare nodes with consul health check
+--- config
+location /v1/agent {
+ proxy_pass http://127.0.0.1:8500;
+}
+--- request eval
+[
+ "PUT /v1/agent/service/deregister/service_a1",
+ "PUT /v1/agent/service/deregister/service_a2",
+ "PUT /v1/agent/service/deregister/service_b1",
+ "PUT /v1/agent/service/deregister/service_b2",
+ "PUT /v1/agent/service/register\n" . "{\"Checks\": [{\"http\": \"http://baidu.com\",\"interval\": \"1s\"}],\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+ "PUT /v1/agent/service/register\n" . "{\"Checks\": [{\"http\": \"http://127.0.0.1:8002\",\"interval\": \"1s\"}],\"ID\":\"service_b1\",\"Name\":\"service_b\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":8002,\"Meta\":{\"service_b_version\":\"4.1\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+]
+--- response_body eval
+--- error_code eval
+[200, 200, 200, 200, 200, 200]
+
+
+
+=== TEST 17: show dump services with consul health check
+--- yaml_config
+apisix:
+ node_listen: 1984
+ enable_control: true
+discovery:
+ consul:
+ servers:
+ - "http://127.0.0.1:8500"
+ dump:
+ path: "consul.dump"
+ load_on_init: false
+--- config
+ location /t {
+ content_by_lua_block {
+ local json = require("toolkit.json")
+ local t = require("lib.test_admin")
+ ngx.sleep(2)
+ local code, body, res =
t.test('/v1/discovery/consul/show_dump_file',
+ ngx.HTTP_GET)
+ local entity = json.decode(res)
+ ngx.say(json.encode(entity.services))
+ }
+ }
+--- timeout: 3
+--- request
+GET /t
+--- response_body
+{"service_a":[{"host":"127.0.0.1","port":30511,"weight":1}]}