Copilot commented on code in PR #13066:
URL: https://github.com/apache/apisix/pull/13066#discussion_r2893100639


##########
apisix/discovery/consul/init.lua:
##########
@@ -66,53 +69,63 @@ local _M = {
 }
 
 
-local function discovery_consul_callback(data, event, source, pid)
-    all_services = data
-    log.notice("update local variable all_services, event is: ", event,
-        "source: ", source, "server pid:", pid,
-        ", all services: ", json_delay_encode(all_services, true))
-end
-
-
 function _M.all_nodes()
-    return all_services
+    local keys = consul_dict:get_keys(0)
+    local services = core.table.new(0, #keys)
+    for _, key in ipairs(keys) do
+        local value = consul_dict:get(key)
+        if value then
+            local nodes, err = core.json.decode(value)
+            if nodes then
+                services[key] = nodes
+            else
+                log.error("failed to decode nodes for service: ", key, ", error: ", err)
+            end
+        end
+    end
+    return services
 end
 
 
 function _M.nodes(service_name)
-    if not all_services then
-        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
-        return
+    local value = consul_dict:get(service_name)
+    if not value then
+        log.error("consul service not found: ", service_name, ", return default service")
+        return default_service and {default_service}
     end
 
-    local resp_list = all_services[service_name]
-
-    if not resp_list then
-        log.error("fetch nodes failed by ", service_name, ", return default service")
+    local nodes, err = core.json.decode(value)
+    if not nodes then
+        log.error("fetch nodes failed by ", service_name, ", error: ", err)
         return default_service and {default_service}
     end
 
     log.info("process id: ", ngx_worker_id(), ", all_services[", service_name, "] = ",
-        json_delay_encode(resp_list, true))
+        json_delay_encode(nodes, true))

Review Comment:
   The log message still references `all_services[...]`, but the data source is 
now `ngx.shared.consul` and a decoded `nodes` table. Updating the message would 
avoid confusion when debugging (especially since `all_services` no longer 
exists).



##########
apisix/discovery/consul/init.lua:
##########
@@ -66,53 +69,63 @@ local _M = {
 }
 
 
-local function discovery_consul_callback(data, event, source, pid)
-    all_services = data
-    log.notice("update local variable all_services, event is: ", event,
-        "source: ", source, "server pid:", pid,
-        ", all services: ", json_delay_encode(all_services, true))
-end
-
-
 function _M.all_nodes()
-    return all_services
+    local keys = consul_dict:get_keys(0)
+    local services = core.table.new(0, #keys)
+    for _, key in ipairs(keys) do
+        local value = consul_dict:get(key)
+        if value then
+            local nodes, err = core.json.decode(value)
+            if nodes then
+                services[key] = nodes
+            else
+                log.error("failed to decode nodes for service: ", key, ", error: ", err)
+            end
+        end
+    end
+    return services
 end
 
 
 function _M.nodes(service_name)
-    if not all_services then
-        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
-        return
+    local value = consul_dict:get(service_name)
+    if not value then
+        log.error("consul service not found: ", service_name, ", return default service")
+        return default_service and {default_service}
     end
 
-    local resp_list = all_services[service_name]
-
-    if not resp_list then
-        log.error("fetch nodes failed by ", service_name, ", return default service")
+    local nodes, err = core.json.decode(value)
+    if not nodes then
+        log.error("fetch nodes failed by ", service_name, ", error: ", err)
         return default_service and {default_service}
     end
 
     log.info("process id: ", ngx_worker_id(), ", all_services[", service_name, "] = ",
-        json_delay_encode(resp_list, true))
+        json_delay_encode(nodes, true))
 
-    return resp_list
+    return nodes

Review Comment:
   `nodes()` now reads JSON from `ngx.shared.consul` and decodes it on every 
call, which makes it return a fresh table each time. In `apisix/upstream.lua`, 
this defeats the `compare_upstream_node` fast-path (`old_t == new_t`) and 
forces the slow path (sorting/comparing) on every request, which is both a 
performance regression and can change node ordering (the slow path sorts by 
host in-place). Consider caching decoded nodes per-worker (e.g., LRU keyed by 
service name + raw JSON string/version) and returning the same table instance 
while the shared-dict value is unchanged.



##########
apisix/discovery/consul/init.lua:
##########
@@ -66,53 +69,63 @@ local _M = {
 }
 
 
-local function discovery_consul_callback(data, event, source, pid)
-    all_services = data
-    log.notice("update local variable all_services, event is: ", event,
-        "source: ", source, "server pid:", pid,
-        ", all services: ", json_delay_encode(all_services, true))
-end
-
-
 function _M.all_nodes()
-    return all_services
+    local keys = consul_dict:get_keys(0)
+    local services = core.table.new(0, #keys)
+    for _, key in ipairs(keys) do
+        local value = consul_dict:get(key)
+        if value then
+            local nodes, err = core.json.decode(value)
+            if nodes then
+                services[key] = nodes
+            else
+                log.error("failed to decode nodes for service: ", key, ", error: ", err)
+            end
+        end
+    end
+    return services
 end
 
 
 function _M.nodes(service_name)
-    if not all_services then
-        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
-        return
+    local value = consul_dict:get(service_name)
+    if not value then
+        log.error("consul service not found: ", service_name, ", return default service")
+        return default_service and {default_service}
     end
 
-    local resp_list = all_services[service_name]
-
-    if not resp_list then
-        log.error("fetch nodes failed by ", service_name, ", return default service")
+    local nodes, err = core.json.decode(value)
+    if not nodes then
+        log.error("fetch nodes failed by ", service_name, ", error: ", err)
         return default_service and {default_service}
     end
 
     log.info("process id: ", ngx_worker_id(), ", all_services[", service_name, "] = ",
-        json_delay_encode(resp_list, true))
+        json_delay_encode(nodes, true))
 
-    return resp_list
+    return nodes
 end
 
 
 local function update_all_services(consul_server_url, up_services)
     -- clean old unused data
     local old_services = consul_services[consul_server_url] or {}
     for k, _ in pairs(old_services) do
-        all_services[k] = nil
+        consul_dict:delete(k)
     end
     core.table.clear(old_services)

Review Comment:
   `update_all_services()` only deletes keys found in 
`consul_services[consul_server_url]`. After an nginx reload (shared dict 
persists but `consul_services` is reinitialized), any stale keys already 
present in `ngx.shared.consul` will never be deleted, so removed Consul 
services can keep being returned by `nodes()`. Consider clearing the shared 
dict on privileged-agent init, or persisting/deriving the previous key set from 
the shared dict (e.g., maintain a stored service list/version and delete keys 
not present in the new update).



##########
apisix/discovery/consul/init.lua:
##########
@@ -66,53 +69,63 @@ local _M = {
 }
 
 
-local function discovery_consul_callback(data, event, source, pid)
-    all_services = data
-    log.notice("update local variable all_services, event is: ", event,
-        "source: ", source, "server pid:", pid,
-        ", all services: ", json_delay_encode(all_services, true))
-end
-
-
 function _M.all_nodes()
-    return all_services
+    local keys = consul_dict:get_keys(0)
+    local services = core.table.new(0, #keys)
+    for _, key in ipairs(keys) do
+        local value = consul_dict:get(key)
+        if value then
+            local nodes, err = core.json.decode(value)
+            if nodes then
+                services[key] = nodes
+            else
+                log.error("failed to decode nodes for service: ", key, ", error: ", err)
+            end
+        end
+    end
+    return services
 end
 
 
 function _M.nodes(service_name)
-    if not all_services then
-        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
-        return
+    local value = consul_dict:get(service_name)
+    if not value then
+        log.error("consul service not found: ", service_name, ", return default service")
+        return default_service and {default_service}
     end
 
-    local resp_list = all_services[service_name]
-
-    if not resp_list then
-        log.error("fetch nodes failed by ", service_name, ", return default service")
+    local nodes, err = core.json.decode(value)
+    if not nodes then
+        log.error("fetch nodes failed by ", service_name, ", error: ", err)
         return default_service and {default_service}
     end
 
     log.info("process id: ", ngx_worker_id(), ", all_services[", service_name, "] = ",
-        json_delay_encode(resp_list, true))
+        json_delay_encode(nodes, true))
 
-    return resp_list
+    return nodes
 end
 
 
 local function update_all_services(consul_server_url, up_services)
     -- clean old unused data
     local old_services = consul_services[consul_server_url] or {}
     for k, _ in pairs(old_services) do
-        all_services[k] = nil
+        consul_dict:delete(k)
     end
     core.table.clear(old_services)
 
     for k, v in pairs(up_services) do
-        all_services[k] = v
+        local content, err = core.json.encode(v)
+        if content then
+            consul_dict:set(k, content)

Review Comment:
   `consul_dict:set(k, content)` return values are ignored. If the shared dict 
runs out of memory, `set` can fail (or evict other keys via `forcible`), which 
would silently drop discovery data and lead to intermittent 503s. Please 
capture and handle `ok, err, forcible` from `set` (and log/metric when it fails 
or is forcible).
   ```suggestion
               local ok, set_err, forcible = consul_dict:set(k, content)
               if not ok then
                   log.error("failed to set nodes for service: ", k, " into consul_dict, error: ", set_err)
               elseif forcible then
                   log.warn("consul_dict is out of memory, key(s) evicted when setting service: ", k)
               end
   ```



##########
apisix/discovery/consul/init.lua:
##########
@@ -149,14 +162,21 @@ local function read_dump_services()
         return
     end
 
-    all_services = entity.services
-    log.info("load dump file into memory success")
+    for k, v in pairs(entity.services) do
+        local content, json_err = core.json.encode(v)
+        if content then
+            consul_dict:set(k, content)
+        else
+            log.error("failed to encode dump service: ", k, ", error: ", json_err)
+        end
+    end
+    log.info("load dump file into shared dict success")

Review Comment:
   `read_dump_services()` writes services into `ngx.shared.consul` without 
clearing existing keys first. Since `lua_shared_dict` contents survive nginx 
reloads, this can retain stale keys (or temporarily overwrite newer values) 
when the dump file is loaded during startup/reload. Consider either clearing 
the dict (or using a generation/version namespace) before loading the dump so 
the shared dict reflects exactly the dump contents until Consul updates arrive.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to