This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new bbff579e9 fix: fix fetch all service info from consul (#8651)
bbff579e9 is described below
commit bbff579e9af19e569fc229f3b61664aa0a1d1c93
Author: fabriceli <[email protected]>
AuthorDate: Wed Feb 8 09:55:20 2023 +0800
fix: fix fetch all service info from consul (#8651)
Co-authored-by: Fabriceli <[email protected]>
---
apisix/discovery/consul/init.lua | 159 +++++++++++++++++++------------------
conf/config-default.yaml | 7 +-
docs/en/latest/discovery/consul.md | 4 +-
t/discovery/consul_dump.t | 2 +-
4 files changed, 89 insertions(+), 83 deletions(-)
diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua
index dd8275e7d..686ab4120 100644
--- a/apisix/discovery/consul/init.lua
+++ b/apisix/discovery/consul/init.lua
@@ -43,16 +43,18 @@ local events
local events_list
local consul_services
+local default_skip_services = {"consul"}
+
local _M = {
- version = 0.1,
+ version = 0.2,
}
local function discovery_consul_callback(data, event, source, pid)
all_services = data
log.notice("update local variable all_services, event is: ", event,
- "source: ", source, "server pid:", pid,
- ", all services: ", json_delay_encode(all_services, true))
+ "source: ", source, "server pid:", pid,
+ ", all services: ", json_delay_encode(all_services, true))
end
@@ -75,44 +77,15 @@ function _M.nodes(service_name)
end
log.info("process id: ", ngx_worker_id(), ", all_services[", service_name,
"] = ",
- json_delay_encode(resp_list, true))
+ json_delay_encode(resp_list, true))
return resp_list
end
-local function parse_instance(node)
- local service_name, host, port = node.Service, node.Address, node.Port
- -- if exist, skip special service name
- if service_name and skip_service_map[service_name] then
- return false
- end
- -- "" means metadata of the service
- return true, host, tonumber(port), "", service_name
-end
-
-
-local function update_all_services(server_name_prefix, data)
- local up_services = core.table.new(0, #data)
- local weight = default_weight
- for _, node in pairs(data) do
- local succ, ip, port, metadata, server_name = parse_instance(node)
- if succ then
- local nodes = up_services[server_name]
- if not nodes then
- nodes = core.table.new(1, 0)
- up_services[server_name] = nodes
- end
- core.table.insert(nodes, {
- host = ip,
- port = port,
- weight = metadata and metadata.weight or weight,
- })
- end
- end
-
+local function update_all_services(consul_server_url, up_services)
-- clean old unused data
- local old_services = consul_services[server_name_prefix] or {}
+ local old_services = consul_services[consul_server_url] or {}
for k, _ in pairs(old_services) do
all_services[k] = nil
end
@@ -121,7 +94,7 @@ local function update_all_services(server_name_prefix, data)
for k, v in pairs(up_services) do
all_services[k] = v
end
- consul_services[server_name_prefix] = up_services
+ consul_services[consul_server_url] = up_services
log.info("update all services: ", json_delay_encode(all_services, true))
end
@@ -154,7 +127,7 @@ local function read_dump_services()
local now_time = ngx.time()
log.info("dump file last_update: ", entity.last_update, ",
dump_params.expire: ",
- dump_params.expire, ", now_time: ", now_time)
+ dump_params.expire, ", now_time: ", now_time)
if dump_params.expire ~= 0 and (entity.last_update + dump_params.expire) < now_time then
log.warn("dump file: ", dump_params.path, " had expired, ignored it")
return
@@ -223,9 +196,9 @@ function _M.connect(premature, consul_server, retry_delay)
and watch_result.status)
if watch_error_info then
log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_sub_url,
- ", got watch result: ", json_delay_encode(watch_result, true),
- ", with error: ", watch_error_info)
+ " by sub url: ", consul_server.consul_watch_sub_url,
+ ", got watch result: ", json_delay_encode(watch_result, true),
+ ", with error: ", watch_error_info)
retry_delay = get_retry_delay(retry_delay)
log.warn("retry connecting consul after ", retry_delay, " seconds")
@@ -235,31 +208,76 @@ function _M.connect(premature, consul_server, retry_delay)
end
log.info("connect consul: ", consul_server.consul_server_url,
- ", watch_result status: ", watch_result.status,
- ", watch_result.headers.index: ",
watch_result.headers['X-Consul-Index'],
- ", consul_server.index: ", consul_server.index,
- ", consul_server: ", json_delay_encode(consul_server, true))
+ ", watch_result status: ", watch_result.status,
+ ", watch_result.headers.index: ",
watch_result.headers['X-Consul-Index'],
+ ", consul_server.index: ", consul_server.index,
+ ", consul_server: ", json_delay_encode(consul_server, true))
-- if current index different last index then update service
if consul_server.index ~= watch_result.headers['X-Consul-Index'] then
+ local up_services = core.table.new(0, #watch_result.body)
+ local consul_client_svc = resty_consul:new({
+ host = consul_server.host,
+ port = consul_server.port,
+ connect_timeout = consul_server.connect_timeout,
+ read_timeout = consul_server.read_timeout,
+ })
+ for service_name, _ in pairs(watch_result.body) do
+ -- check if the service_name is 'skip service'
+ if skip_service_map[service_name] then
+ goto CONTINUE
+ end
+ -- get node from service
+ local svc_url = consul_server.consul_sub_url .. "/" .. service_name
+ local result, err = consul_client_svc:get(svc_url)
+ local error_info = (err ~= nil and err) or
+ ((result ~= nil and result.status ~= 200) and result.status)
+ if error_info then
+ log.error("connect consul: ", consul_server.consul_server_url,
+ ", by service url: ", svc_url, ", with error: ",
error_info)
+ goto CONTINUE
+ end
- -- fetch all services info
- local result, err = consul_client:get(consul_server.consul_sub_url)
-
- local error_info = (err ~= nil and err) or
- ((result ~= nil and result.status ~= 200) and result.status)
+ -- decode body, decode json, update service, error handling
+ if result.body then
+ log.notice("service url: ", svc_url,
+ ", header: ", json_delay_encode(result.headers, true),
+ ", body: ", json_delay_encode(result.body, true))
+ -- add services to table
+ local nodes = up_services[service_name]
+ for _, node in ipairs(result.body) do
+ local svc_address, svc_port = node.ServiceAddress, node.ServicePort
+ if not svc_address then
+ svc_address = node.Address
+ end
+ -- if nodes is nil, new nodes table and set to up_services
+ if not nodes then
+ nodes = core.table.new(1, 0)
+ up_services[service_name] = nodes
+ end
+ -- add node to nodes table
+ core.table.insert(nodes, {
+ host = svc_address,
+ port = tonumber(svc_port),
+ weight = default_weight,
+ })
+ end
+ up_services[service_name] = nodes
+ end
+ :: CONTINUE ::
+ end
- if error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_sub_url,
- ", got result: ", json_delay_encode(result, true),
- ", with error: ", error_info)
+ update_all_services(consul_server.consul_server_url, up_services)
- retry_delay = get_retry_delay(retry_delay)
- log.warn("retry connecting consul after ", retry_delay, " seconds")
- core_sleep(retry_delay)
+ --update events
+ local ok, post_err = events.post(events_list._source, events_list.updating, all_services)
+ if not ok then
+ log.error("post_event failure with ", events_list._source,
+ ", update all services error: ", post_err)
+ end
- goto ERR
+ if dump_params then
+ ngx_timer_at(0, write_dump_services)
end
consul_server.index = watch_result.headers['X-Consul-Index']
@@ -267,23 +285,6 @@ function _M.connect(premature, consul_server, retry_delay)
if consul_server.keepalive then
consul_server.default_args.index = watch_result.headers['X-Consul-Index']
end
- -- decode body, decode json, update service, error handling
- if result.body then
- log.notice("server_name: ", consul_server.consul_server_url,
- ", header: ", json_delay_encode(result.headers, true),
- ", body: ", json_delay_encode(result.body, true))
- update_all_services(consul_server.consul_server_url, result.body)
- --update events
- local ok, err = events.post(events_list._source, events_list.updating, all_services)
- if not ok then
- log.error("post_event failure with ", events_list._source,
- ", update all services error: ", err)
- end
-
- if dump_params then
- ngx_timer_at(0, write_dump_services)
- end
- end
end
:: ERR ::
@@ -324,7 +325,7 @@ local function format_consul_params(consul_conf)
port = port,
connect_timeout = consul_conf.timeout.connect,
read_timeout = consul_conf.timeout.read,
- consul_sub_url = "/agent/services",
+ consul_sub_url = "/catalog/service",
consul_watch_sub_url = "/catalog/services",
consul_server_url = v .. "/v1",
weight = consul_conf.weight,
@@ -375,10 +376,14 @@ function _M.init_worker()
skip_service_map[v] = true
end
end
+ -- set up default skip service
+ for _, v in ipairs(default_skip_services) do
+ skip_service_map[v] = true
+ end
local consul_servers_list, err = format_consul_params(consul_conf)
if err then
- error(err)
+ error("format consul config got error: " .. err)
end
log.info("consul_server_list: ", json_delay_encode(consul_servers_list,
true))
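For context on the hunk above: instead of pulling everything from the agent-local /agent/services endpoint in one request, the loop now walks the service names returned by the watched /v1/catalog/services call and resolves each of them through /v1/catalog/service/<name>, skipping the built-in `consul` service. A rough, self-contained sketch of that per-service resolution follows; the helper name `fetch_service_nodes` is invented for illustration, and it assumes only the resty.consul client and the fields already used in the diff (ServiceAddress, Address, ServicePort).

    -- illustrative sketch, not part of the commit
    local resty_consul = require("resty.consul")

    local function fetch_service_nodes(consul_host, consul_port, service_name, default_weight)
        local client = resty_consul:new({
            host = consul_host,
            port = consul_port,
        })

        -- resolves to Consul's /v1/catalog/service/<service_name> endpoint;
        -- the decoded body is an array with one entry per node running the service
        local res, err = client:get("/catalog/service/" .. service_name)
        if err or not res or res.status ~= 200 then
            return nil, err or (res and res.status)
        end

        local nodes = {}
        for _, entry in ipairs(res.body) do
            -- prefer ServiceAddress; the catalog API returns "" when it is unset,
            -- so fall back to the node's Address
            local address = entry.ServiceAddress
            if not address or address == "" then
                address = entry.Address
            end
            nodes[#nodes + 1] = {
                host = address,
                port = tonumber(entry.ServicePort),
                weight = default_weight,
            }
        end
        return nodes
    end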
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 43bbad005..3fcd7505b 100755
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -305,11 +305,11 @@ nginx_config: # config for render the template to generate n
# path: "logs/consul_kv.dump"
# expire: 2592000 # unit sec, here is 30 day
# consul:
-# servers:
-# - "http://127.0.0.1:8500"
+# servers: # make sure service name is unique in these consul servers
+# - "http://127.0.0.1:8500" # `http://127.0.0.1:8500` and `http://127.0.0.1:8600` are different clusters
# - "http://127.0.0.1:8600"
# skip_services: # if you need to skip special services
-# - "service_a"
+# - "service_a" # `consul` service is default skip
service
# timeout:
# connect: 2000 # default 2000 ms
# read: 2000 # default 2000 ms
@@ -327,6 +327,7 @@ nginx_config: # config for render the template to generate n
# dump: # if you need, when registered nodes updated can dump into file
# path: "logs/consul.dump"
# expire: 2592000 # unit sec, here is 30 day
+# load_on_init: true # default true, load the consul dump file on init
# kubernetes:
# ### kubernetes service discovery both support single-cluster and multi-cluster mode
# ### applicable to the case where the service is distributed in a single or multiple kubernetes clusters.
diff --git a/docs/en/latest/discovery/consul.md b/docs/en/latest/discovery/consul.md
index 2db7af144..e31a83690 100644
--- a/docs/en/latest/discovery/consul.md
+++ b/docs/en/latest/discovery/consul.md
@@ -35,8 +35,8 @@ First of all, we need to add following configuration in `conf/config.yaml` :
discovery:
consul:
servers: # make sure service name is unique in these consul servers
- - "http://127.0.0.1:8500"
- - "http://127.0.0.1:8600" # `http://127.0.0.1:8500` and
`http://127.0.0.1:8600` are different clusters
+ - "http://127.0.0.1:8500" # `http://127.0.0.1:8500` and
`http://127.0.0.1:8600` are different clusters
+ - "http://127.0.0.1:8600" # `consul` service is default skip service
skip_services: # if you need to skip special services
- "service_a"
timeout:
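Assembled from the documentation and config-default.yaml comments above, an illustrative `conf/config.yaml` discovery block using only the options touched by this commit could look like the following (hosts and values are the documented examples, not requirements):

    discovery:
      consul:
        servers:                     # make sure service names are unique across these servers
          - "http://127.0.0.1:8500"
          - "http://127.0.0.1:8600"  # 8500 and 8600 here are different clusters
        skip_services:               # the built-in `consul` service is skipped by default
          - "service_a"
        timeout:
          connect: 2000              # default 2000 ms
          read: 2000                 # default 2000 ms
        dump:
          path: "logs/consul.dump"
          expire: 2592000            # unit: seconds, i.e. 30 days
          load_on_init: true         # default true, load the dump file on init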
diff --git a/t/discovery/consul_dump.t b/t/discovery/consul_dump.t
index b229fb7a4..366c76a98 100644
--- a/t/discovery/consul_dump.t
+++ b/t/discovery/consul_dump.t
@@ -77,7 +77,7 @@ discovery:
- "http://127.0.0.1:8500"
dump:
path: "consul.dump"
- load_on_init: true
+ load_on_init: false
--- config
location /t {
content_by_lua_block {