liquanzhou commented on issue #12610:
URL: https://github.com/apache/apisix/issues/12610#issuecomment-3432001253

   通过ai生成代码修复,测试是可以解决一个eureka节点宕机后,reload还能正常加载列表,可以当做一个参考
   
   日志中会有持续的eureka节点失败的警告
   2025/10/22 19:55:26 [warn] 7225#7225: *40628 [lua] init.lua:200: failed to 
fetch registry from http://10.250.200.98:8761/eureka/: timeout, context: 
ngx.timer
   
   `--
   -- Licensed to the Apache Software Foundation (ASF) under one or more
   -- contributor license agreements.  See the NOTICE file distributed with
   -- this work for additional information regarding copyright ownership.
   -- The ASF licenses this file to You under the Apache License, Version 2.0
   -- (the "License"); you may not use this file except in compliance with
   -- the License.  You may obtain a copy of the License at
   --
   --     http://www.apache.org/licenses/LICENSE-2.0
   --
   -- Unless required by applicable law or agreed to in writing, software
   -- distributed under the License is distributed on an "AS IS" BASIS,
   -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   -- See the License for the specific language governing permissions and
   -- limitations under the License.
   --
   
   local local_conf         = require("apisix.core.config_local").local_conf()
   local http               = require("resty.http")
   local core               = require("apisix.core")
   local ipmatcher          = require("resty.ipmatcher")
   local ipairs             = ipairs
   local tostring           = tostring
   local type               = type
   local math_random        = math.random
   local ngx                = ngx
   local ngx_timer_at       = ngx.timer.at
   local ngx_timer_every    = ngx.timer.every
   local string_sub         = string.sub
   local str_find           = core.string.find
   local log                = core.log
   local semaphore = require("ngx.semaphore")
   
   local default_weight
   local applications
   local init_sema
   local initial_fetched = false
   
   local _M = {
       version = 0.1,
   }
   
   
   local function service_info()
       local host = local_conf.discovery and
           local_conf.discovery.eureka and local_conf.discovery.eureka.host
       if not host then
           log.error("do not set eureka.host")
           return
       end
   
       local basic_auth
       -- TODO Add health check to get healthy nodes.
       local url = host[math_random(#host)]
       local auth_idx = str_find(url, "@")
       if auth_idx then
           local protocol_idx = str_find(url, "://")
           local protocol = string_sub(url, 1, protocol_idx + 2)
           local user_and_password = string_sub(url, protocol_idx + 3, auth_idx 
- 1)
           local other = string_sub(url, auth_idx + 1)
           url = protocol .. other
           basic_auth = "Basic " .. ngx.encode_base64(user_and_password)
       end
       if local_conf.discovery.eureka.prefix then
           url = url .. local_conf.discovery.eureka.prefix
       end
       if string_sub(url, #url) ~= "/" then
           url = url .. "/"
       end
   
       return url, basic_auth
   end
   
   
   local function request(request_uri, basic_auth, method, path, query, body)
       log.info("eureka uri:", request_uri, ".")
       local url = request_uri .. path
       local headers = core.table.new(0, 5)
       headers['Connection'] = 'Keep-Alive'
       headers['Accept'] = 'application/json'
   
       if basic_auth then
           headers['Authorization'] = basic_auth
       end
   
       if body and 'table' == type(body) then
           local err
           body, err = core.json.encode(body)
           if not body then
               return nil, 'invalid body : ' .. err
           end
           -- log.warn(method, url, body)
           headers['Content-Type'] = 'application/json'
       end
   
       local httpc = http.new()
       local timeout = local_conf.discovery.eureka.timeout
       local connect_timeout = timeout and timeout.connect or 2000
       local send_timeout = timeout and timeout.send or 2000
       local read_timeout = timeout and timeout.read or 5000
       log.info("connect_timeout:", connect_timeout, ", send_timeout:", 
send_timeout,
               ", read_timeout:", read_timeout, ".")
       httpc:set_timeouts(connect_timeout, send_timeout, read_timeout)
       return httpc:request_uri(url, {
           version = 1.1,
           method = method,
           headers = headers,
           query = query,
           body = body,
           ssl_verify = false,
       })
   end
   
   
   local function parse_instance(instance)
       local status = instance.status
       local overridden_status = instance.overriddenstatus or 
instance.overriddenStatus
       if overridden_status and overridden_status ~= "UNKNOWN" then
           status = overridden_status
       end
   
       if status ~= "UP" then
           return
       end
       local port
       if tostring(instance.port["@enabled"]) == "true" and instance.port["$"] 
then
           port = instance.port["$"]
           -- secure = false
       end
       if tostring(instance.securePort["@enabled"]) == "true" and 
instance.securePort["$"] then
           port = instance.securePort["$"]
           -- secure = true
       end
       local ip = instance.ipAddr
       if not ipmatcher.parse_ipv4(ip) and
               not ipmatcher.parse_ipv6(ip) then
           log.error(instance.app, " service ", instance.hostName, " node IP ", 
ip,
                   " is invalid(must be IPv4 or IPv6).")
           return
       end
       return ip, port, instance.metadata
   end
   
   
   local function build_endpoints()
       local host_list = local_conf.discovery and local_conf.discovery.eureka 
and local_conf.discovery.eureka.host
       if not host_list or #host_list == 0 then
           log.error("do not set eureka.host")
           return nil
       end
   
       local endpoints = core.table.new(#host_list, 0)
       for _, h in ipairs(host_list) do
           local url = h
           local basic_auth
           local auth_idx = str_find(url, "@")
           if auth_idx then
               local protocol_idx = str_find(url, "://")
               local protocol = string_sub(url, 1, protocol_idx + 2)
               local user_and_password = string_sub(url, protocol_idx + 3, 
auth_idx - 1)
               local other = string_sub(url, auth_idx + 1)
               url = protocol .. other
               basic_auth = "Basic " .. ngx.encode_base64(user_and_password)
           end
           if local_conf.discovery.eureka.prefix then
               url = url .. local_conf.discovery.eureka.prefix
           end
           if string_sub(url, #url) ~= "/" then
               url = url .. "/"
           end
           core.table.insert(endpoints, { url = url, auth = basic_auth })
       end
       return endpoints
   end
   
   
   local function fetch_full_registry(premature)
       if premature then
           return
       end
   
       -- 遍历所有 eureka 端点,直到成功
       local endpoints = build_endpoints()
       if not endpoints or #endpoints == 0 then
           return
       end
   
       local res, err
       local used_endpoint
       local start = math_random(#endpoints)
       for i = 0, #endpoints - 1 do
           local ep = endpoints[((start + i) % #endpoints) + 1]
           log.info("eureka uri:", ep.url, ".")
           local r, e = request(ep.url, ep.auth, "GET", "apps")
           if r and r.body and r.status == 200 then
               res = r
               used_endpoint = ep
               break
           end
           log.warn("failed to fetch registry from ", ep.url, ": ", e or (r and 
("status=" .. tostring(r.status)) or "unknown"))
       end
   
       if not res then
           log.error("failed to fetch registry from all eureka hosts")
           return
       end
   
       local json_str = res.body
       local data, derr = core.json.decode(json_str)
       if not data then
           log.error("invalid response body: ", json_str, " err: ", derr)
           return
       end
       local apps = data.applications.application
       local up_apps = core.table.new(0, #apps)
       for _, app in ipairs(apps) do
           for _, instance in ipairs(app.instance) do
               local ip, port, metadata = parse_instance(instance)
               if ip and port then
                   local nodes = up_apps[app.name]
                   if not nodes then
                       nodes = core.table.new(#app.instance, 0)
                       up_apps[app.name] = nodes
                   end
                   core.table.insert(nodes, {
                       host = ip,
                       port = port,
                       weight = metadata and metadata.weight or default_weight,
                       metadata = metadata,
                   })
                   if metadata then
                       metadata.weight = nil
                   end
               end
           end
       end
       applications = up_apps
       log.info("successfully updated service registry, services count=",
                core.table.nkeys(up_apps), "; source=", used_endpoint and 
used_endpoint.url or "unknown")
       if not initial_fetched then
           initial_fetched = true
           if init_sema then
               init_sema:post(1)
           end
       end
   end
   
   
   function _M.nodes(service_name)
       if not applications then
           if init_sema then
               local ok, err = init_sema:wait(3)
               if not ok then
                   log.warn("wait eureka initial fetch timeout: ", err)
               end
           end
       end
   
       if not applications then
           log.error("failed to fetch nodes for : ", service_name)
           return
       end
   
       return applications[service_name]
   end
   
   
   -- 注释掉文件缓存相关依赖与变量,避免写盘/读盘
   -- local core_io = require("apisix.core.io")
   -- local io_open = io.open
   -- local cache_file = ngx.config.prefix() .. "logs/eureka_registry.json"
   -- local save_registry_cache
   -- local load_registry_cache
   -- 将文件缓存函数整体注释掉,避免写盘与读盘
   --[[
   local function save_registry_cache(apps)
       local body, err = core.json.encode({ applications = apps, ts = ngx.now() 
})
       if not body then
           log.error("encode eureka registry cache failed: ", err)
           return
       end
       local f, ferr = io_open(cache_file, "w")
       if not f then
           log.error("open eureka registry cache file failed: ", ferr,
                     ", path: ", cache_file)
           return
       end
       f:write(body)
       f:close()
   end
   
   local function load_registry_cache()
       local body = core_io.get_file(cache_file)
       if not body then
           return nil, "no cache file"
       end
       local data, err = core.json.decode(body)
       if not data or not data.applications then
           return nil, "invalid cache format: " .. (err or "")
       end
       return data.applications
   end
   ]]
   -- 移除保存缓存调用
   -- save_registry_cache(up_apps)
   
   
   function _M.init_worker()
       default_weight = local_conf.discovery.eureka.weight or 100
       log.info("default_weight:", default_weight, ".")
       local fetch_interval = local_conf.discovery.eureka.fetch_interval or 30
       log.info("fetch_interval:", fetch_interval, ".")
       init_sema = semaphore.new()
   
       ngx_timer_at(0, fetch_full_registry)
       ngx_timer_every(fetch_interval, fetch_full_registry)
   end
   
   
   function _M.dump_data()
       return {config = local_conf.discovery.eureka, services = applications or 
{}}
   end
   
   
   return _M
   `


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to