This is an automated email from the ASF dual-hosted git repository.
shreemaanabhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new a0552823c fix: use shdict instead of events module for nodes data exchange (#13066)
a0552823c is described below
commit a0552823c05d9f31707e34ce50fa4b9ba5792a8c
Author: Shreemaan Abhishek <[email protected]>
AuthorDate: Fri Mar 13 15:08:00 2026 +0545
fix: use shdict instead of events module for nodes data exchange (#13066)
---
apisix/cli/ops.lua | 10 ++
apisix/discovery/consul/init.lua | 181 ++++++++++++++++++++++++-------------
apisix/discovery/consul/schema.lua | 5 +
t/APISIX.pm | 1 +
t/discovery/consul_dump.t | 2 +-
5 files changed, 137 insertions(+), 62 deletions(-)
diff --git a/apisix/cli/ops.lua b/apisix/cli/ops.lua
index f5e9beb17..c7435ee08 100644
--- a/apisix/cli/ops.lua
+++ b/apisix/cli/ops.lua
@@ -767,6 +767,16 @@ Please modify "admin_key" in conf/config.yaml .
end
+ -- inject consul discovery shared dict
+ if enabled_discoveries["consul"] then
+ if not sys_conf["discovery_shared_dicts"] then
+ sys_conf["discovery_shared_dicts"] = {}
+ end
+
+ local consul_conf = yaml_conf.discovery["consul"]
+ sys_conf["discovery_shared_dicts"]["consul"] = consul_conf.shared_size or "10m"
+ end
+
-- fix up lua path
sys_conf["extra_lua_path"] = get_lua_path(yaml_conf.apisix.extra_lua_path)
sys_conf["extra_lua_cpath"] = get_lua_path(yaml_conf.apisix.extra_lua_cpath)
diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua
index 4d3c0e46b..c27330812 100644
--- a/apisix/discovery/consul/init.lua
+++ b/apisix/discovery/consul/init.lua
@@ -31,6 +31,7 @@ local ngx_timer_at = ngx.timer.at
local ngx_timer_every = ngx.timer.every
local log = core.log
local json_delay_encode = core.json.delay_encode
+local process = require("ngx.process")
local ngx_worker_id = ngx.worker.id
local exiting = ngx.worker.exiting
local thread_spawn = ngx.thread.spawn
@@ -42,16 +43,28 @@ local null = ngx.null
local type = type
local next = next
-local all_services = core.table.new(0, 5)
+local consul_dict = ngx.shared.consul
+if not consul_dict then
+ error("lua_shared_dict \"consul\" not configured")
+end
+
local default_service
local default_weight
local sort_type
local skip_service_map = core.table.new(0, 1)
local dump_params
-local events
-local events_list
local consul_services
+-- Per-worker LRU cache: avoids shared dict access on every request.
+-- neg_ttl caches unknown services. invalid_stale ensures expired
+-- entries are refreshed from the shared dict instead of re-cached.
+local nodes_cache = core.lrucache.new({
+ ttl = 1,
+ count = 1024,
+ invalid_stale = true,
+ neg_ttl = 1,
+ neg_count = 64,
+})
local default_skip_services = {"consul"}
local default_random_range = 5
@@ -66,53 +79,94 @@ local _M = {
}
-local function discovery_consul_callback(data, event, source, pid)
- all_services = data
- log.notice("update local variable all_services, event is: ", event,
- "source: ", source, "server pid:", pid,
- ", all services: ", json_delay_encode(all_services, true))
-end
+local function fetch_node_from_shdict(service_name)
+ local value = consul_dict:get(service_name)
+ if not value then
+ return nil, "consul service not found: " .. service_name
+ end
+ local nodes, err = core.json.decode(value)
+ if not nodes then
+ return nil, "failed to decode nodes for service: "
+ .. service_name .. ", error: " .. (err or "")
+ end
-function _M.all_nodes()
- return all_services
+ return nodes
end
-function _M.nodes(service_name)
- if not all_services then
- log.error("all_services is nil, failed to fetch nodes for : ", service_name)
- return
+function _M.all_nodes()
+ local keys = consul_dict:get_keys(0)
+ local services = core.table.new(0, #keys)
+ for i, key in ipairs(keys) do
+ local value = consul_dict:get(key)
+ if value then
+ local nodes, err = core.json.decode(value)
+ if nodes then
+ services[key] = nodes
+ else
+ log.error("failed to decode nodes for service: ", key, ", error: ", err)
+ end
+ end
+
+ if i % 100 == 0 then
+ ngx.sleep(0)
+ end
end
+ return services
+end
- local resp_list = all_services[service_name]
- if not resp_list then
- log.error("fetch nodes failed by ", service_name, ", return default service")
+function _M.nodes(service_name)
+ local nodes, err = nodes_cache(service_name, nil,
+ fetch_node_from_shdict, service_name)
+ if not nodes then
+ log.error("fetch nodes failed by ", service_name, ", error: ", err)
return default_service and {default_service}
end
- log.info("process id: ", ngx_worker_id(), ", all_services[", service_name, "] = ",
- json_delay_encode(resp_list, true))
+ log.info("process id: ", ngx_worker_id(), ", [", service_name, "] = ",
+ json_delay_encode(nodes, true))
- return resp_list
+ return nodes
end
local function update_all_services(consul_server_url, up_services)
- -- clean old unused data
+ -- write new/updated values first so readers never see a missing service
+ local i = 0
+ for k, v in pairs(up_services) do
+ local content, err = core.json.encode(v)
+ if content then
+ local ok, set_err, forcible = consul_dict:set(k, content)
+ if not ok then
+ log.error("failed to set nodes for service: ", k, ", error: ", set_err,
+ ", please consider increasing lua_shared_dict consul size")
+ elseif forcible then
+ log.warn("consul shared dict is full, forcibly evicting items while ",
+ "setting nodes for service: ", k,
+ ", please consider increasing lua_shared_dict consul size")
+ end
+ else
+ log.error("failed to encode nodes for service: ", k, ", error: ", err)
+ end
+ i = i + 1
+ if i % 100 == 0 then
+ ngx.sleep(0)
+ end
+ end
+
+ -- then delete keys that are no longer present
local old_services = consul_services[consul_server_url] or {}
for k, _ in pairs(old_services) do
- all_services[k] = nil
+ if not up_services[k] then
+ consul_dict:delete(k)
+ end
end
- core.table.clear(old_services)
- for k, v in pairs(up_services) do
- all_services[k] = v
- end
consul_services[consul_server_url] = up_services
- log.info("update all services: ", json_delay_encode(all_services, true))
+ log.info("update all services to shared dict")
end
@@ -149,14 +203,30 @@ local function read_dump_services()
return
end
- all_services = entity.services
- log.info("load dump file into memory success")
+ for k, v in pairs(entity.services) do
+ local content, json_err = core.json.encode(v)
+ if content then
+ consul_dict:set(k, content)
+ else
+ log.error("failed to encode dump service: ", k, ", error: ", json_err)
+ end
+ end
+ log.info("load dump file into shared dict success")
end
local function write_dump_services()
+ -- build services from the privileged agent's in-memory tracking table
+ -- to avoid a full shared dict scan + JSON decode via _M.all_nodes()
+ local services = core.table.new(0, 8)
+ for _, svcs in pairs(consul_services) do
+ for k, v in pairs(svcs) do
+ services[k] = v
+ end
+ end
+
local entity = {
- services = all_services,
+ services = services,
last_update = ngx.time(),
expire = dump_params.expire, -- later need handle it
}
@@ -556,14 +626,6 @@ function _M.connect(premature, consul_server, retry_delay)
update_all_services(consul_server.consul_server_url, up_services)
- --update events
- local post_ok, post_err = events:post(events_list._source,
- events_list.updating, all_services)
- if not post_ok then
- log.error("post_event failure with ", events_list._source,
- ", update all services error: ", post_err)
- end
-
if dump_params then
ngx_timer_at(0, write_dump_services)
end
@@ -611,28 +673,8 @@ end
function _M.init_worker()
local consul_conf = local_conf.discovery.consul
+ dump_params = consul_conf.dump
- if consul_conf.dump then
- local dump = consul_conf.dump
- dump_params = dump
-
- if dump.load_on_init then
- read_dump_services()
- end
- end
-
- events = require("apisix.events")
- events_list = events:event_list(
- "discovery_consul_update_all_services",
- "updating"
- )
-
- if 0 ~= ngx_worker_id() then
- events:register(discovery_consul_callback, events_list._source, events_list.updating)
- return
- end
-
- log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
default_weight = consul_conf.weight
sort_type = consul_conf.sort_type
-- set default service, used when the server node cannot be found
@@ -640,6 +682,23 @@ function _M.init_worker()
default_service = consul_conf.default_service
default_service.weight = default_weight
end
+
+ if process.type() ~= "privileged agent" then
+ return
+ end
+
+ -- flush stale data that may persist across reloads,
+ -- since consul_services is re-initialized empty
+ consul_dict:flush_all()
+
+ if consul_conf.dump then
+ if consul_conf.dump.load_on_init then
+ read_dump_services()
+ end
+ end
+
+ log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
+
if consul_conf.skip_services then
skip_service_map = core.table.new(0, #consul_conf.skip_services)
for _, v in ipairs(consul_conf.skip_services) do
@@ -673,7 +732,7 @@ end
function _M.dump_data()
- return {config = local_conf.discovery.consul, services = all_services }
+ return {config = local_conf.discovery.consul, services = _M.all_nodes()}
end
diff --git a/apisix/discovery/consul/schema.lua b/apisix/discovery/consul/schema.lua
index 5d6fc641e..06fa9cabf 100644
--- a/apisix/discovery/consul/schema.lua
+++ b/apisix/discovery/consul/schema.lua
@@ -24,6 +24,11 @@ return {
type = "string",
}
},
+ shared_size = {
+ type = "string",
+ pattern = [[^[1-9][0-9]*m$]],
+ default = "1m",
+ },
token = {type = "string", default = ""},
fetch_interval = {type = "integer", minimum = 1, default = 3},
keepalive = {
diff --git a/t/APISIX.pm b/t/APISIX.pm
index 5aa54a1af..ac0edef49 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -294,6 +294,7 @@ lua {
lua_shared_dict standalone-config 10m;
lua_shared_dict status-report 1m;
lua_shared_dict nacos 10m;
+ lua_shared_dict consul 10m;
lua_shared_dict upstream-healthcheck 10m;
}
_EOC_
diff --git a/t/discovery/consul_dump.t b/t/discovery/consul_dump.t
index 9cb24a3c1..030ddf2f4 100644
--- a/t/discovery/consul_dump.t
+++ b/t/discovery/consul_dump.t
@@ -230,7 +230,7 @@ GET /hello
--- error_code: 503
--- error_log
connect consul
-fetch nodes failed
+consul service not found
failed to set upstream