Copilot commented on code in PR #13066:
URL: https://github.com/apache/apisix/pull/13066#discussion_r2894343381


##########
apisix/discovery/consul/init.lua:
##########
@@ -66,53 +73,70 @@ local _M = {
 }
 
 
-local function discovery_consul_callback(data, event, source, pid)
-    all_services = data
-    log.notice("update local variable all_services, event is: ", event,
-        "source: ", source, "server pid:", pid,
-        ", all services: ", json_delay_encode(all_services, true))
-end
-
-
 function _M.all_nodes()
-    return all_services
+    local keys = consul_dict:get_keys(0)
+    local services = core.table.new(0, #keys)
+    for _, key in ipairs(keys) do
+        local value = consul_dict:get(key)
+        if value then
+            local nodes, err = core.json.decode(value)
+            if nodes then
+                services[key] = nodes
+            else
+                log.error("failed to decode nodes for service: ", key, ", error: ", err)
+            end
+        end
+    end
+    return services
 end
 
 
 function _M.nodes(service_name)
-    if not all_services then
-        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
-        return
+    local value = consul_dict:get(service_name)
+    if not value then
+        log.error("consul service not found: ", service_name, ", return default service")
+        return default_service and {default_service}
     end
 
-    local resp_list = all_services[service_name]
+    local cached = nodes_cache[service_name]
+    if cached and cached.raw == value then
+        return cached.nodes
+    end
 
-    if not resp_list then
-        log.error("fetch nodes failed by ", service_name, ", return default service")
+    local nodes, err = core.json.decode(value)
+    if not nodes then
+        log.error("fetch nodes failed by ", service_name, ", error: ", err)
         return default_service and {default_service}
     end
 
-    log.info("process id: ", ngx_worker_id(), ", all_services[", service_name, "] = ",
-        json_delay_encode(resp_list, true))
+    nodes_cache[service_name] = {raw = value, nodes = nodes}
+
+    log.info("process id: ", ngx_worker_id(), ", [", service_name, "] = ",
+        json_delay_encode(nodes, true))
 
-    return resp_list
+    return nodes
 end
 
 
 local function update_all_services(consul_server_url, up_services)
     -- clean old unused data
     local old_services = consul_services[consul_server_url] or {}
     for k, _ in pairs(old_services) do
-        all_services[k] = nil
+        consul_dict:delete(k)
     end
     core.table.clear(old_services)
 
     for k, v in pairs(up_services) do
-        all_services[k] = v
+        local content, err = core.json.encode(v)
+        if content then
+            consul_dict:set(k, content)
+        else

Review Comment:
   `update_all_services()` deletes all previously-known keys before setting the 
new ones. While this runs, workers can observe a missing `service_name` in 
`ngx.shared.consul` and fall back to the default (or nil), causing transient 
request failures during an update window. A safer approach is to write/set new 
values first (optionally under a generation/version namespace) and only then 
delete obsolete keys, so reads never see an empty/missing service during an 
update.



##########
apisix/discovery/consul/init.lua:
##########
@@ -621,25 +644,24 @@ function _M.init_worker()
         end
     end
 
-    events = require("apisix.events")
-    events_list = events:event_list(
-            "discovery_consul_update_all_services",
-            "updating"
-    )
-
-    if 0 ~= ngx_worker_id() then
-        events:register(discovery_consul_callback, events_list._source, events_list.updating)
-        return
-    end
-
-    log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
     default_weight = consul_conf.weight
     sort_type = consul_conf.sort_type
     -- set default service, used when the server node cannot be found
     if consul_conf.default_service then
         default_service = consul_conf.default_service
         default_service.weight = default_weight
     end
+
+    if process.type() ~= "privileged agent" then
+        return
+    end
+
+    -- flush stale data that may persist across reloads,
+    -- since consul_services is re-initialized empty
+    consul_dict:flush_all()
+

Review Comment:
   `consul_dict:flush_all()` runs in the privileged agent during 
`init_worker()`, but it executes *after* `read_dump_services()` may have 
populated the shared dict (and workers also call `read_dump_services()` before 
the privileged-agent guard). This can wipe the dump-loaded nodes and leave the 
dict empty until Consul fetch completes, defeating the dump-on-reload/startup 
mitigation and potentially causing 503s. Consider either (a) moving the flush 
earlier (before any dump load) and performing dump load only in the privileged 
agent, or (b) removing `flush_all()` and instead cleaning stale keys during 
`update_all_services()` in a way that survives reloads.



##########
apisix/discovery/consul/init.lua:
##########
@@ -621,25 +644,24 @@ function _M.init_worker()
         end
     end

Review Comment:
   `read_dump_services()` now writes into `ngx.shared.consul`, but 
`init_worker()` calls it in every process (workers + privileged agent). That 
means multiple processes can concurrently read/encode/write the same dump data 
at startup, adding contention and making the final dict contents depend on 
startup ordering. Consider guarding dump load so only the privileged agent (or 
a single designated worker) performs the shared-dict writes, and other workers 
only read from the dict.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to