Copilot commented on code in PR #13066:
URL: https://github.com/apache/apisix/pull/13066#discussion_r2894343381
##########
apisix/discovery/consul/init.lua:
##########
@@ -66,53 +73,70 @@ local _M = {
}
-local function discovery_consul_callback(data, event, source, pid)
- all_services = data
- log.notice("update local variable all_services, event is: ", event,
- "source: ", source, "server pid:", pid,
- ", all services: ", json_delay_encode(all_services, true))
-end
-
-
function _M.all_nodes()
- return all_services
+ local keys = consul_dict:get_keys(0)
+ local services = core.table.new(0, #keys)
+ for _, key in ipairs(keys) do
+ local value = consul_dict:get(key)
+ if value then
+ local nodes, err = core.json.decode(value)
+ if nodes then
+ services[key] = nodes
+ else
+ log.error("failed to decode nodes for service: ", key, ",
error: ", err)
+ end
+ end
+ end
+ return services
end
function _M.nodes(service_name)
- if not all_services then
- log.error("all_services is nil, failed to fetch nodes for : ",
service_name)
- return
+ local value = consul_dict:get(service_name)
+ if not value then
+ log.error("consul service not found: ", service_name, ", return
default service")
+ return default_service and {default_service}
end
- local resp_list = all_services[service_name]
+ local cached = nodes_cache[service_name]
+ if cached and cached.raw == value then
+ return cached.nodes
+ end
- if not resp_list then
- log.error("fetch nodes failed by ", service_name, ", return default
service")
+ local nodes, err = core.json.decode(value)
+ if not nodes then
+ log.error("fetch nodes failed by ", service_name, ", error: ", err)
return default_service and {default_service}
end
- log.info("process id: ", ngx_worker_id(), ", all_services[", service_name,
"] = ",
- json_delay_encode(resp_list, true))
+ nodes_cache[service_name] = {raw = value, nodes = nodes}
+
+ log.info("process id: ", ngx_worker_id(), ", [", service_name, "] = ",
+ json_delay_encode(nodes, true))
- return resp_list
+ return nodes
end
local function update_all_services(consul_server_url, up_services)
-- clean old unused data
local old_services = consul_services[consul_server_url] or {}
for k, _ in pairs(old_services) do
- all_services[k] = nil
+ consul_dict:delete(k)
end
core.table.clear(old_services)
for k, v in pairs(up_services) do
- all_services[k] = v
+ local content, err = core.json.encode(v)
+ if content then
+ consul_dict:set(k, content)
+ else
Review Comment:
`update_all_services()` deletes all previously-known keys before setting the
new ones. While this runs, workers can observe a missing `service_name` in
`ngx.shared.consul` and fall back to the default (or nil), causing transient
request failures during an update window. A safer approach is to write/set new
values first (optionally under a generation/version namespace) and only then
delete obsolete keys, so reads never see an empty/missing service during an
update.
##########
apisix/discovery/consul/init.lua:
##########
@@ -621,25 +644,24 @@ function _M.init_worker()
end
end
- events = require("apisix.events")
- events_list = events:event_list(
- "discovery_consul_update_all_services",
- "updating"
- )
-
- if 0 ~= ngx_worker_id() then
- events:register(discovery_consul_callback, events_list._source,
events_list.updating)
- return
- end
-
- log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
default_weight = consul_conf.weight
sort_type = consul_conf.sort_type
-- set default service, used when the server node cannot be found
if consul_conf.default_service then
default_service = consul_conf.default_service
default_service.weight = default_weight
end
+
+ if process.type() ~= "privileged agent" then
+ return
+ end
+
+ -- flush stale data that may persist across reloads,
+ -- since consul_services is re-initialized empty
+ consul_dict:flush_all()
+
Review Comment:
`consul_dict:flush_all()` runs in the privileged agent during
`init_worker()`, but it executes *after* `read_dump_services()` may have
populated the shared dict (and workers also call `read_dump_services()` before
the privileged-agent guard). This can wipe the dump-loaded nodes and leave the
dict empty until Consul fetch completes, defeating the dump-on-reload/startup
mitigation and potentially causing 503s. Consider either (a) moving the flush
earlier (before any dump load) and performing dump load only in the privileged
agent, or (b) removing `flush_all()` and instead cleaning stale keys during
`update_all_services()` in a way that survives reloads.
##########
apisix/discovery/consul/init.lua:
##########
@@ -621,25 +644,24 @@ function _M.init_worker()
end
end
Review Comment:
`read_dump_services()` now writes into `ngx.shared.consul`, but
`init_worker()` calls it in every process (workers + privileged agent). That
means multiple processes can concurrently read/encode/write the same dump data
at startup, adding contention and making the final dict contents depend on
startup ordering. Consider guarding dump load so only the privileged agent (or
a single designated worker) performs the shared-dict writes, and other workers
only read from the dict.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]