wenj91 commented on issue #8782:
URL: https://github.com/apache/apisix/issues/8782#issuecomment-3562219525

   你可以用这个脚本修复:
   ```lua
   -- Licensed to the Apache Software Foundation (ASF) under one or more
   -- contributor license agreements.  See the NOTICE file distributed with
   -- this work for additional information regarding copyright ownership.
   -- The ASF licenses this file to You under the Apache License, Version 2.0
   -- (the "License"); you may not use this file except in compliance with
   -- the License.  You may obtain a copy of the License at
   --
   --     http://www.apache.org/licenses/LICENSE-2.0
   --
   -- Unless required by applicable law or agreed to in writing, software
   -- distributed under the License is distributed on an "AS IS" BASIS,
   -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   -- See the License for the specific language governing permissions and
   -- limitations under the License.
   --
   
   local require            = require
   local local_conf         = require('apisix.core.config_local').local_conf()
   local http               = require('resty.http')
   local core               = require('apisix.core')
   local ipairs             = ipairs
   local type               = type
   local math               = math
   local math_random        = math.random
   local ngx                = ngx
   local ngx_re             = require('ngx.re')
   local ngx_timer_at       = ngx.timer.at
   local ngx_timer_every    = ngx.timer.every
   local string             = string
   local string_sub         = string.sub
   local str_byte           = string.byte
   local str_find           = core.string.find
   local log                = core.log
   
   -- Module-level state shared by the functions below.
   -- default_weight is read from the configuration in init_worker and used as
   -- the fallback weight for discovered nodes.
   local default_weight
   -- Current discovery result, indexed as
   -- applications[namespace_id][group_name][service_name] -> list of nodes.
   local applications
   local auth_path = 'auth/login'
   local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName='
   local default_namespace_id = "public"
   local default_group_name = "DEFAULT_GROUP"
   -- Both keys are assigned in init_worker; no reader is visible in this file.
   local access_key
   local secret_key

   -- Worker-events module and event list, both set in init_worker.
   local events
   local events_list

   -- Health-state tracking and retry settings (hard-coded).
   -- host_health_status maps a configured host URL to its status record.
   local host_health_status = {}
   local health_check_interval = 5000 -- 5 seconds (milliseconds; divided by 1000 for the timer)
   local health_check_timeout = 2000 -- 2 seconds (passed to httpc:set_timeout)
   local max_failures = 3 -- failures needed before a host is marked unhealthy
   local recovery_time = 30000 -- 30 second recovery period (compared against millisecond timestamps)

   -- Cached discovery data.
   local cached_applications = nil -- deep copy of the last published application data
   local cache_valid = false -- whether the cache has been populated
   local last_successful_update = 0 -- ngx.now() timestamp of the last successful update
   local max_cache_age = 300 -- maximum cache age (5 minutes, in seconds)

   local _M = {}
   
   -- Worker-event handler: adopt the application data published by another
   -- worker and, when it is non-nil, refresh the local cache from it.
   --   data   : new application table (may be nil)
   --   event  : event name, only used for logging
   --   source : event source, only used for logging
   --   pid    : pid of the publishing worker, only used for logging
   local function discovery_nacos_callback(data, event, source, pid)
       local json = core.json

       applications = data

       if data then
           -- a JSON round trip yields a deep copy independent of `data`
           cached_applications = json.decode(json.encode(data))
           cache_valid = true
           last_successful_update = ngx.now()
           log.notice("Updated cache data on successful update")
       end

       log.notice("update local variable application, event is: ", event,
                  "source: ", source, "server pid:", pid,
                  ", application: ", json.encode(applications, true))
   end
   
   -- Report whether a Nacos host may currently be used.
   --   host_url : host URL exactly as it appears in the configuration; it is
   --              the key into host_health_status
   -- Returns true for hosts without a status record, for hosts flagged healthy,
   -- and for unhealthy hosts whose recovery period has elapsed (their record is
   -- reset as a side effect). Returns false otherwise.
   local function check_host_health(host_url)
       local status = host_health_status[host_url]
       if not status then
           return true
       end

       if status.unhealthy_time then
           local now = ngx.now() * 1000 -- convert to milliseconds
           if now - status.unhealthy_time > recovery_time then
               -- Recovery period is over: start again from a clean record and
               -- report the host as healthy right away. Reading the old
               -- record after the reset would still see healthy == false.
               host_health_status[host_url] = {
                   healthy = true,
                   failure_count = 0,
                   last_check_time = now
               }
               log.info("Host ", host_url, " recovery period ended, reset to healthy")
               return true
           end
       end

       return status.healthy and true or false
   end
   
   -- Replace the status record of `host_url` with a fresh healthy one
   -- (zero failures, checked now) and log the transition.
   local function mark_host_healthy(host_url)
       local record = {}
       record.healthy = true
       record.failure_count = 0
       record.last_check_time = ngx.now() * 1000 -- milliseconds
       host_health_status[host_url] = record

       log.info("Mark host as healthy: ", host_url)
   end
   
   -- Record one failed probe for `host_url`.
   --   host_url : host URL as configured (key into host_health_status)
   --   reason   : description of the failure, stored as last_error and logged
   -- The host is only flagged unhealthy once failure_count reaches
   -- max_failures; at that point unhealthy_time is (re)set to now, in
   -- milliseconds.
   local function mark_host_unhealthy(host_url, reason)
       local now = ngx.now() * 1000
       local status = host_health_status[host_url]
       if not status then
           -- A host seen for the first time starts out healthy. Without the
           -- explicit flag, `healthy` would be nil and the very first failure
           -- would already make check_host_health report it as unhealthy.
           status = { healthy = true, failure_count = 0 }
       end

       status.failure_count = (status.failure_count or 0) + 1
       status.last_check_time = now
       status.last_error = reason

       if status.failure_count >= max_failures then
           status.healthy = false
           status.unhealthy_time = now
           log.error("Mark host as unhealthy: ", host_url, ", failures: ",
                     status.failure_count, ", reason: ", reason)
       else
           log.warn("Host failure count: ", host_url, " - ",
                    status.failure_count, "/", max_failures, ", reason: ", reason)
       end

       host_health_status[host_url] = status
   end
   
   -- Build the list of configured Nacos hosts that check_host_health accepts,
   -- preserving configuration order. The list may be empty.
   local function get_healthy_hosts()
       local usable = {}

       for _, candidate in ipairs(local_conf.discovery.nacos.host) do
           local is_usable = check_host_health(candidate)
           if is_usable then
               core.table.insert(usable, candidate)
           end
       end

       return usable
   end
   
   -- Periodic health-check job: probe every configured Nacos host from its own
   -- zero-delay timer and record the outcome with mark_host_healthy /
   -- mark_host_unhealthy.
   --   premature : true when the timer fires because the worker is exiting
   -- NOTE(review): the probe is sent without an access token and with
   -- ssl_verify = false; whether the probed endpoint answers 200 on a server
   -- with authentication enabled is not visible from this file - confirm.
   local function health_check_task(premature)
       if premature then
           return
       end

       local nacos_conf = local_conf.discovery.nacos
       for _, host_url in ipairs(nacos_conf.host) do
           local ok, err = ngx_timer_at(0, function(timer_premature)
               if timer_premature then
                   return
               end

               local httpc = http.new()
               httpc:set_timeout(health_check_timeout)

               -- Build the probe URL. Credentials embedded in the configured
               -- host ("scheme://user:pass@host") are removed first, the same
               -- way get_base_uri does it, so the probe hits the same address
               -- the discovery requests use.
               local health_url = host_url
               local auth_idx = core.string.rfind_char(health_url, '@')
               local protocol_idx = auth_idx and str_find(health_url, '://')
               if protocol_idx then
                   health_url = string_sub(health_url, 1, protocol_idx + 2)
                                .. string_sub(health_url, auth_idx + 1)
               end

               if nacos_conf.prefix then
                   health_url = health_url .. nacos_conf.prefix
               end

               -- make sure base and path are separated by exactly one slash
               if str_byte(health_url, #health_url) ~= str_byte('/') then
                   health_url = health_url .. '/'
               end
               health_url = health_url .. "cs/history?search=accurate&dataId=1&group=1"

               local res, req_err = httpc:request_uri(health_url, {
                   method = "GET",
                   ssl_verify = false
               })

               if res and res.status == 200 then
                   mark_host_healthy(host_url)
                   return
               end

               local reason = req_err
               if not reason then
                   reason = "Health check failed with status: "
                            .. (res and res.status or "unknown")
               end
               -- status is keyed by the configured URL, credentials included,
               -- because get_healthy_hosts looks it up with that same value
               mark_host_unhealthy(host_url, reason)
           end)

           if not ok then
               log.error("Failed to create health check timer for host: ",
                         host_url, ", error: ", err)
           end
       end
   end
   
   -- Send one HTTP request to Nacos and decode the JSON answer.
   --   request_uri : base URI, concatenated with `path` as-is
   --   path        : path and query string
   --   body        : optional body; a table is JSON-encoded first
   --   method      : HTTP method name
   --   basic_auth  : optional value for the Authorization header
   -- Returns the decoded body on success, or nil plus an error string when the
   -- body cannot be encoded, the request fails, the status is not 200, or the
   -- response is not valid JSON.
   local function request(request_uri, path, body, method, basic_auth)
       local url = request_uri .. path
       log.info('request url:', url)

       local headers = { ['Accept'] = 'application/json' }
       if basic_auth then
           headers['Authorization'] = basic_auth
       end

       if type(body) == 'table' then
           local encoded, encode_err = core.json.encode(body)
           if not encoded then
               return nil, 'invalid body : ' .. encode_err
           end
           body = encoded
           headers['Content-Type'] = 'application/json'
       end

       local httpc = http.new()
       local timeout = local_conf.discovery.nacos.timeout
       local connect_timeout, send_timeout, read_timeout =
           timeout.connect, timeout.send, timeout.read
       log.info('connect_timeout:', connect_timeout, ', send_timeout:', send_timeout,
                ', read_timeout:', read_timeout)
       httpc:set_timeouts(connect_timeout, send_timeout, read_timeout)

       local res, req_err = httpc:request_uri(url, {
           method = method,
           headers = headers,
           body = body,
           ssl_verify = true,
       })
       if not res then
           return nil, req_err
       end

       if not res.body or res.status ~= 200 then
           return nil, 'status = ' .. res.status
       end

       local data, decode_err = core.json.decode(res.body)
       if not data then
           return nil, decode_err
       end
       return data
   end
   
   -- GET `path` below `request_uri`, with no body and no Authorization header.
   -- Returns whatever `request` returns.
   local function get_url(request_uri, path)
       local no_body
       return request(request_uri, path, no_body, 'GET')
   end
   
   -- POST `body` to `path` below `request_uri`, without an Authorization
   -- header. Returns whatever `request` returns.
   local function post_url(request_uri, path, body)
       return request(request_uri, path, body, 'POST')
   end
   
   -- Pick a Nacos host and turn it into a base URI.
   -- A host is chosen at random among the healthy ones, or among all configured
   -- hosts when none is healthy. Credentials embedded as
   -- "scheme://user:password@host" are removed from the URL and returned
   -- separately; the configured prefix is appended and a trailing '/' is
   -- guaranteed.
   -- Returns: url, username, password (the last two are nil when the host has
   -- no "user:password" pair).
   local function get_base_uri()
       local all_hosts = local_conf.discovery.nacos.host

       -- prefer healthy hosts, fall back to every configured host
       local candidates = get_healthy_hosts()
       if #candidates == 0 then
           candidates = all_hosts
       end

       local url = candidates[math_random(#candidates)]
       local username, password

       local at_pos = core.string.rfind_char(url, '@')
       if at_pos then
           local scheme_pos = str_find(url, '://')
           local scheme_part = string_sub(url, 1, scheme_pos + 2)
           local credentials = string_sub(url, scheme_pos + 3, at_pos - 1)

           local pieces = ngx_re.split(credentials, ':')
           if #pieces == 2 then
               username, password = pieces[1], pieces[2]
           end

           url = scheme_part .. string_sub(url, at_pos + 1)
       end

       local prefix = local_conf.discovery.nacos.prefix
       if prefix then
           url = url .. prefix
       end

       local ends_with_slash = str_byte(url, #url) == str_byte('/')
       if not ends_with_slash then
           url = url .. '/'
       end

       return url, username, password
   end
   
   -- Log in to Nacos and build the access-token query fragment.
   --   base_uri : base URI ending in '/'
   --   username : login name, or nil when no credentials are configured
   --   password : login password, or nil
   -- Returns '' when either credential is missing, '&accessToken=<token>'
   -- after a successful login, or nil plus an error string when the login
   -- request fails or the response carries no token.
   local function get_token_param(base_uri, username, password)
       if not username or not password then
           return ''
       end

       local args = { username = username, password = password }
       local data, err = post_url(base_uri, auth_path .. '?' .. ngx.encode_args(args), nil)
       if not data then
           err = err or 'unknown error'
           -- the password is deliberately kept out of the log
           log.error('nacos login fail:', username, ' desc:', err)
           return nil, err
       end

       -- without this guard a response lacking the token raises a
       -- concatenation error instead of returning an error to the caller
       local token = type(data) == 'table' and data.accessToken or nil
       if type(token) ~= 'string' or token == '' then
           err = 'no accessToken in login response'
           log.error('nacos login fail:', username, ' desc:', err)
           return nil, err
       end

       return '&accessToken=' .. token
   end
   
   -- Build the '&namespaceId=...' query fragment, or '' when no namespace id
   -- is given.
   local function get_namespace_param(namespace_id)
       if not namespace_id then
           return ''
       end

       return '&' .. ngx.encode_args({ namespaceId = namespace_id })
   end
   
   -- Build the '&groupName=...' query fragment, or '' when no group name is
   -- given.
   local function get_group_name_param(group_name)
       if not group_name then
           return ''
       end

       return '&' .. ngx.encode_args({ groupName = group_name })
   end
   
   -- Tell whether `services` already holds an entry with exactly this
   -- namespace id, group name, service name and scheme.
   local function de_duplication(services, namespace_id, group_name, service_name, scheme)
       for _, known in ipairs(services) do
           local same_entry = known.namespace_id == namespace_id
                              and known.group_name == group_name
                              and known.service_name == service_name
                              and known.scheme == scheme
           if same_entry then
               return true
           end
       end

       return false
   end
   
   -- Scan a collection of configuration items and append every Nacos-backed
   -- upstream that is not listed yet to `services`.
   --   services : list being built; entries carry service_name, namespace_id,
   --              group_name and scheme
   --   values   : collection accepted by core.config_util.iterate_values, or
   --              nil, in which case nothing happens
   -- An item's upstream is its `upstream` field when present, otherwise the
   -- item itself.
   local function iter_and_add_service(services, values)
       if not values then
           return
       end

       for _, item in core.config_util.iterate_values(values) do
           local conf = item.value
           if conf then
               local up = conf.upstream or conf
               local args = up.discovery_args

               local namespace_id = (args and args.namespace_id)
                                    or default_namespace_id
               local group_name = (args and args.group_name)
                                  or default_group_name

               local already_listed = de_duplication(services, namespace_id, group_name,
                                                     up.service_name, up.scheme)
               if not already_listed and up.discovery_type == 'nacos' then
                   core.table.insert(services, {
                       service_name = up.service_name,
                       namespace_id = namespace_id,
                       group_name = group_name,
                       scheme = up.scheme,
                   })
               end
           end
       end
   end
   
   -- Collect the distinct Nacos services referenced by upstreams, HTTP routes,
   -- services and stream routes, in that order.
   local function get_nacos_services()
       -- the modules are required lazily to work around a circular dependency
       local upstream_mod = require('apisix.upstream')
       local router_mod = require('apisix.router')
       local service_mod = require('apisix.http.service')

       local getters = {
           upstream_mod.upstreams,
           router_mod.http_routes,
           service_mod.services,
           router_mod.stream_routes,
       }

       local found = {}
       -- fixed count: a missing getter must fail loudly rather than be skipped
       for i = 1, 4 do
           iter_and_add_service(found, getters[i]())
       end
       return found
   end
   
   -- True when `scheme` is one of the gRPC schemes ('grpc' or 'grpcs').
   local function is_grpc(scheme)
       return scheme == 'grpc' or scheme == 'grpcs'
   end
   
   -- Tell whether the cached discovery data can no longer be used: the cache
   -- was never populated, or its last successful update is more than
   -- max_cache_age seconds old.
   local function is_cache_expired()
       if cache_valid and last_successful_update then
           local age = ngx.now() - last_successful_update
           return age > max_cache_age
       end

       return true
   end
   
   -- Fall back to the cached discovery data.
   -- Replaces `applications` with a deep copy of the cache when a cache exists
   -- and has not expired. Returns true when the fallback was applied, false
   -- otherwise; `applications` is left untouched on false.
   local function use_cached_data()
       if not cached_applications or is_cache_expired() then
           return false
       end

       -- deep copy through a JSON round trip
       local copy = core.json.decode(core.json.encode(cached_applications))
       if not copy then
           -- never replace live data with nil because the copy failed
           log.error("failed to copy cached service discovery data")
           return false
       end

       applications = copy
       log.warn("Using cached service discovery data due to all nacos nodes failure")
       return true
   end
   
   -- Publish freshly fetched application data.
   --   new_apps : table indexed as [namespace_id][group_name][service_name]
   -- Every call counts as a successful refresh, so the cache timestamp is
   -- renewed even when the data did not change; otherwise a stable registry
   -- would let the cache expire after max_cache_age seconds. When the data
   -- differs from the current `applications` (compared via MD5 of the JSON
   -- encoding) it becomes the new `applications`, the cache is replaced with a
   -- deep copy and an update event is posted through `events`.
   local function update_applications(new_apps)
       local new_apps_md5sum = ngx.md5(core.json.encode(new_apps))
       local old_apps_md5sum = ngx.md5(core.json.encode(applications or {}))
       local now = ngx.now()

       if new_apps_md5sum == old_apps_md5sum then
           if not cached_applications then
               -- the cache was cleared: rebuild it from the unchanged data
               cached_applications = core.json.decode(core.json.encode(new_apps))
           end
           cache_valid = true
           last_successful_update = now
           return
       end

       applications = new_apps
       -- refresh the cache with a deep copy (JSON round trip)
       cached_applications = core.json.decode(core.json.encode(new_apps))
       cache_valid = true
       last_successful_update = now

       local ok, err = events.post(events_list._source, events_list.updating,
                                   applications)
       if not ok then
           log.error("post_event failure with ", events_list._source,
                     ", update application error: ", err)
       else
           log.info("Successfully updated applications data and cache")
       end
   end
   
   -- Return up_apps[namespace_id][group_name], creating the nested tables on
   -- demand.
   local function ensure_group(up_apps, namespace_id, group_name)
       local by_namespace = up_apps[namespace_id]
       if not by_namespace then
           by_namespace = {}
           up_apps[namespace_id] = by_namespace
       end

       local by_group = by_namespace[group_name]
       if not by_group then
           by_group = {}
           by_namespace[group_name] = by_group
       end
       return by_group
   end

   -- Carry the currently published nodes of a service whose refresh failed over
   -- into `up_apps`, unless this round already produced nodes for that service.
   local function keep_previous_nodes(up_apps, service_info)
       local previous = applications and applications[service_info.namespace_id]
       previous = previous and previous[service_info.group_name]
       previous = previous and previous[service_info.service_name]
       if not previous then
           return
       end

       local group = ensure_group(up_apps, service_info.namespace_id,
                                  service_info.group_name)
       if group[service_info.service_name] then
           return
       end

       -- copy the list so the table published earlier is never appended to
       local kept = {}
       for _, node in ipairs(previous) do
           core.table.insert(kept, node)
       end
       group[service_info.service_name] = kept
   end

   -- Timer callback: query Nacos for every service referenced by the
   -- configuration and publish the result.
   --   premature : true when the timer fires because the worker is exiting
   -- Failure handling:
   --   * login failure: on the very first run fall back to the cache, or to an
   --     empty table when there is no usable cache; later runs keep the
   --     current data;
   --   * a failed service does not abort the run: the remaining services are
   --     still refreshed and the failed one keeps its previous nodes;
   --   * when every service fails, fall back to the cached data.
   local function fetch_full_registry(premature)
       if premature then
           return
       end

       local up_apps = {}
       local base_uri, username, password = get_base_uri()
       local token_param, err = get_token_param(base_uri, username, password)
       if err then
           log.error('get_token_param error:', err)
           if not applications then
               if use_cached_data() then
                   log.warn("Using cached data for initial setup")
               else
                   applications = up_apps
               end
           end
           return
       end

       local infos = get_nacos_services()
       if #infos == 0 then
           applications = up_apps
           return
       end

       local success_count = 0
       local total_services = #infos
       local failed = {}

       for _, service_info in ipairs(infos) do
           local namespace_id = service_info.namespace_id
           local group_name = service_info.group_name
           local service_name = service_info.service_name
           local scheme = service_info.scheme or ''
           local query_path = instance_list_path .. service_name
                              .. token_param
                              .. get_namespace_param(namespace_id)
                              .. get_group_name_param(group_name)

           local data, fetch_err = get_url(base_uri, query_path)
           if type(data) ~= 'table' then
               log.error('get_url:', query_path, ' err:',
                         fetch_err or 'unexpected response')
               core.table.insert(failed, service_info)
           else
               local group = ensure_group(up_apps, namespace_id, group_name)
               -- a response without a host list is treated as "no instances"
               local hosts = type(data.hosts) == 'table' and data.hosts or {}

               for _, host in ipairs(hosts) do
                   local nodes = group[service_name]
                   if not nodes then
                       nodes = {}
                       group[service_name] = nodes
                   end

                   local node = {
                       host = host.ip,
                       port = host.port,
                       weight = host.weight or default_weight,
                   }

                   -- docs: https://github.com/yidongnan/grpc-spring-boot-starter/pull/496
                   if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port then
                       node.port = host.metadata.gRPC_port
                   end

                   core.table.insert(nodes, node)
               end

               success_count = success_count + 1
           end
       end

       log.info("Service discovery completed: ", success_count, "/",
                total_services, " services updated")

       if success_count == 0 then
           -- nothing could be fetched: try the cached data
           log.error("Failed to fetch any service information from all nacos nodes")
           if use_cached_data() then
               log.warn("Successfully fallback to cached service discovery data")
           else
               log.error("No valid cache available, service discovery data may be stale or empty")
           end
           return
       end

       -- partial success: failed services keep their previous nodes
       for _, service_info in ipairs(failed) do
           keep_previous_nodes(up_apps, service_info)
       end
       update_applications(up_apps)
   end
   
   -- Look up the discovered nodes of a service.
   --   service_name   : Nacos service name
   --   discovery_args : optional table; its namespace_id and group_name
   --                    override the defaults
   -- Waits up to 5 seconds, in 0.1 second steps, for the first discovery
   -- result. Returns the node list, or nil when nothing is known for the
   -- namespace, group or service.
   function _M.nodes(service_name, discovery_args)
       local namespace_id = default_namespace_id
       local group_name = default_group_name
       if discovery_args then
           namespace_id = discovery_args.namespace_id or default_namespace_id
           group_name = discovery_args.group_name or default_group_name
       end

       -- maximum waiting time: 5 seconds
       local remaining = 5
       local step = 0.1
       if not applications then
           log.warn('wait init')
       end
       while not applications and remaining > 0 do
           ngx.sleep(step)
           remaining = remaining - step
       end

       local by_namespace = applications and applications[namespace_id]
       local by_group = by_namespace and by_namespace[group_name]
       if not by_group then
           return nil
       end

       return by_group[service_name]
   end
   
   -- Per-worker initialisation.
   -- Every worker sets up the worker-events channel. Workers whose id is not 0
   -- only register the update callback and return. Worker 0 additionally reads
   -- its settings from the configuration, marks every configured host as
   -- healthy and starts the timers: an immediate registry fetch, the periodic
   -- fetch and the periodic health check.
   function _M.init_worker()
       events = require("resty.worker.events")
       events_list = events.event_list("discovery_nacos_update_application",
                                       "updating")

       if 0 ~= ngx.worker.id() then
           events.register(discovery_nacos_callback, events_list._source,
                           events_list.updating)
           return
       end

       default_weight = local_conf.discovery.nacos.weight
       log.info('default_weight:', default_weight)
       local fetch_interval = local_conf.discovery.nacos.fetch_interval
       log.info('fetch_interval:', fetch_interval)
       access_key = local_conf.discovery.nacos.access_key
       secret_key = local_conf.discovery.nacos.secret_key

       -- initialise the health state: every host starts out healthy
       local hosts = local_conf.discovery.nacos.host
       for _, host in ipairs(hosts) do
           host_health_status[host] = {
               healthy = true,
               failure_count = 0,
               last_check_time = ngx.now() * 1000
           }
       end

       -- start the fetch timers; a timer that cannot be created is reported
       -- instead of being silently ignored
       local ok, err = ngx_timer_at(0, fetch_full_registry)
       if not ok then
           log.error("failed to create initial nacos fetch timer: ", err)
       end

       ok, err = ngx_timer_every(fetch_interval, fetch_full_registry)
       if not ok then
           log.error("failed to create periodic nacos fetch timer: ", err)
       end

       -- start the health check (the interval is kept in milliseconds)
       ok, err = ngx_timer_every(health_check_interval / 1000, health_check_task)
       if not ok then
           log.error("failed to create nacos health check timer: ", err)
       end

       log.info("Nacos discovery initialized with health check and cache support")
   end
   
   -- Snapshot of the module state: the nacos configuration, the current
   -- services (an empty table before the first fetch) and a summary of the
   -- cache.
   function _M.dump_data()
       local cache_summary = {
           cached_applications = not not cached_applications,
           cache_valid = cache_valid,
           last_successful_update = last_successful_update,
           is_cache_expired = is_cache_expired()
       }

       local snapshot = {
           config = local_conf.discovery.nacos,
           services = applications or {},
           cache = cache_summary
       }
       return snapshot
   end
   
   -- Expose the health-status table (for debugging). The live table is
   -- returned, not a copy.
   function _M.get_health_status()
       local status_by_host = host_health_status
       return status_by_host
   end
   
   -- Drop the cached discovery data by hand (for tests and debugging): the
   -- cache, its validity flag and its timestamp are all reset.
   function _M.clear_cache()
       cached_applications, cache_valid, last_successful_update = nil, false, 0
       log.info("Cache cleared manually")
   end
   
   -- Report the cache state: validity flag, timestamp of the last successful
   -- update, whether the cache has expired and whether cached data exists.
   function _M.get_cache_status()
       local summary = {}
       summary.cache_valid = cache_valid
       summary.last_successful_update = last_successful_update
       summary.is_cache_expired = is_cache_expired()
       summary.cached_applications = not not cached_applications
       return summary
   end
   
   return _M
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to