adugeek edited a comment on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857652609


   > I mean, too frequent endpoints change will make the disk io heavy.
   
   I understand your concerns.
   But what I mean is that changes in enpoints will be sensed by k8s discvery 
and written to ngx.shared.DICT. but not write to apisix.yaml .
   There is no disk io here.
   
   
   Talk is cheap. Let's look at the code
   
   ```lua
   local local_conf = require("apisix.core.config_local").local_conf()
   local ipairs = ipairs
   local ngx = ngx
   local string = string
   local tonumber = tonumber
   local math = math
   local ngx_timer_at = ngx.timer.at
   local process = require("ngx.process")
   local core = require("apisix.core")
   local util = require("apisix.cli.util")
   local http = require("resty.http")
   local shared_endpoints = ngx.shared.discovery
   
   local namespace = ""
   local token = ""
   local apiserver_host = ""
   local apiserver_port = ""
   
   local default_weight = 50
   
   local lrucache = core.lrucache.new({
       ttl = 300,
       count = 1024
   })
   
   local function create_nodes(service_name, port_name)
       local endpoint, _, _ = shared_endpoints:get_stale(service_name)
       core.log.error("get endpoint: ", endpoint)
       if not endpoint then
           return nil
       end
   
       local t, err = core.json.decode(endpoint)
       if not t then
           core.log.error("decode [[", endpoint, "]] error : ", err or "")
       end
       local v = t[port_name]
       core.log.info("get port: ", port_name, core.json.encode(v, true))
       return v
   end
   
   local function sort_by_key_host(a, b)
       return a.host < b.host
   end
   
   local function on_endpoint_deleted(endpoint)
       shared_endpoints:delete(endpoint.metadata.name)
   end
   
   local function on_endpoint_modified(endpoint)
       local subsets = endpoint.subsets
       if subsets == nil or #subsets == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local subset = subsets[1]
   
       local addresses = subset.addresses
       if addresses == nil or #addresses == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local ports = subset.ports
       if ports == nil or #ports == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local t = core.tablepool.fetch("k8s#endpoint#subsets", 0, #ports)
       for _, port in ipairs(ports) do
           local nodes = core.table.new(#addresses, 0)
           for i, address in ipairs(addresses) do
               nodes[i] = {
                   host = address.ip,
                   port = port.port,
                   weight = default_weight
               }
           end
           core.table.sort(nodes, sort_by_key_host)
           t[port.name] = nodes
       end
       core.log.info("save to cache : ", core.json.encode(t), "version : ", 
endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name .. "#version", 
endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(t, 
true))
       core.tablepool.release("k8s#endpoint#subsets", t)
   end
   
   local function on_endpoint_added(endpoint)
       local subsets = endpoint.subsets
       if subsets == nil or #subsets == 0 then
           return
       end
   
       local subset = subsets[1]
   
       local addresses = subset.addresses
       if addresses == nil or #addresses == 0 then
           return
       end
   
       local ports = subset.ports
       if ports == nil or #ports == 0 then
           return
       end
   
       local t = core.tablepool.fetch("k8s#endpoint#subsets", 0, #ports)
       for _, port in ipairs(ports) do
           local nodes = core.table.new(#addresses, 0)
           for i, address in ipairs(addresses) do
               nodes[i] = {
                   host = address.ip,
                   port = port.port,
                   weight = default_weight
               }
           end
           core.table.sort(nodes, sort_by_key_host)
           t[port.name] = nodes
       end
       core.log.info("save to cache : ", core.json.encode(t), "version : ", 
endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name .. "#version", 
endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(t, 
true))
       core.tablepool.release("k8s#endpoint#subsets", t)
   end
   
   local watch_resource_list = {{
       version = "v1",
       kind = "Endpoints",
       listKind = "EndpointsList",
       plural = "endpoints",
   
       list_path = function(self)
           return string.format("/api/v1/namespaces/%s/endpoints", namespace)
       end,
   
       list_query = function(self, continue)
           if continue == nil or continue == "" then
               return "limit=30"
           else
               return "limit=30&continue=" .. continue
           end
       end,
   
       watch_path = function(self)
           return string.format("/api/v1/namespaces/%s/endpoints", namespace)
       end,
       watch_query = function(self, timeout)
           return 
string.format("watch=1&allowWatchBookmarks=true&timeoutSeconds=%d&resourceVersion=%d",
 timeout,
                      self.max_resource_version)
       end,
       max_resource_version = 0,
       added_callback = function(self, object, drive)
           on_endpoint_added(object)
       end,
       modified_callback = function(self, object)
           on_endpoint_modified(object)
       end,
       deleted_callback = function(self, object)
           on_endpoint_deleted(object)
       end,
       pre_list_callback = function(self)
           shared_endpoints:flush_all()
       end,
       post_list_callback = function(self)
           shared_endpoints:flush_expired()
       end
   }}
   local watch_threads = {}
   
   local function event_dispatch(resource, event, object, drive)
       local rvstr = object.metadata.resourceVersion
       if rvstr ~= nil then
           local rv = tonumber(rvstr)
           if resource.max_resource_version < rv then
               resource.max_resource_version = rv
           end
       end
   
       if event == "ADDED" then
           resource:added_callback(object, drive)
       elseif event == "MODIFIED" then
           if object.deletionTimestamp ~= nil then
               resource:deleted_callback(object)
           else
               resource:modified_callback(object)
           end
       elseif event == "DELETED" then
           resource.deleted_callback(object)
       elseif event == "BOOKMARK" then
           -- do nothing because we had record max_resource_version to 
resource.max_resource_version
       end
   end
   
   local function list_resource(httpc, resource, continue)
       httpc:set_timeouts(2000, 2000, 3000)
       local res, err = httpc:request({
           path = resource:list_path(),
           query = resource:list_query(),
           headers = {
               ["Authorization"] = string.format("Bearer %s", token)
           }
       })
   
       if not res then
           return false, "RequestError", err or ""
       end
   
       if res.status ~= 200 then
           return false, res.reason, res.read_body() or ""
       end
   
       local body, err = res:read_body()
       if err then
           return false, "ReadBodyError", err
       end
   
       local data, _ = core.json.decode(body)
       if not data or data.kind ~= resource.listKind then
           return false, "UnexpectedBody", body
       end
   
       local resource_version = data.metadata.resourceVersion
       core.log.info("list resource version ", resource_version)
       if resource_version ~= nil then
           local rvv = tonumber(resource_version)
           if rvv <= resource.max_resource_version then
               return
           end
           resource.max_resource_version = rvv
       end
   
       for _, item in ipairs(data.items) do
           event_dispatch(resource, "ADDED", item, "listing")
       end
   
       if data.metadata.continue ~= nil and data.metadata.continue ~= "" then
           list_resource(httpc, resource, data.metadata.continue)
       end
   
       return true, "Success", ""
   end
   
   local function watch_resource(httpc, resource)
       math.randomseed(process.get_master_pid())
       local watch_seconds = 8800 + math.random(200, 1000)
       local allowance_seconds = 100
       httpc:set_timeouts(2000, 2000, (watch_seconds + allowance_seconds) * 
1000)
       local res, err = httpc:request({
           path = resource:watch_path(),
           query = resource:watch_query(watch_seconds),
           headers = {
               ["Authorization"] = string.format("Bearer %s", token)
           }
       })
   
       if err then
           return false, "RequestError", err
       end
   
       if res.status ~= 200 then
           return false, res.reason, res.read_body and res.read_body()
       end
   
       local remaindBody = ""
       local body = ""
       local reader = res.body_reader
       local gmatchIterator;
       local captures;
       local capturedSize = 0
       while true do
   
           body, err = reader()
           if err then
               return false, "ReadBodyError", err
           end
   
           if not body then
               break
           end
   
           if #remaindBody ~= 0 then
               body = remaindBody .. body
           end
   
           gmatchIterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jiao")
           if not gmatchIterator then
               return false, "GmatchError", err
           end
   
           while true do
               captures, err = gmatchIterator()
               if err then
                   return false, "GmatchError", err
               end
               if not captures then
                   break
               end
               capturedSize = capturedSize + #captures[0]
               local v, _ = core.json.decode(captures[0])
               if not v or not v.object or v.object.kind ~= resource.kind then
                   return false, "UnexpectedBody", captures[0]
               end
               event_dispatch(resource, v.type, v.object, "watching")
           end
   
           if capturedSize == #body then
               remaindBody = ""
           elseif capturedSize == 0 then
               remaindBody = body
           else
               remaindBody = string.sub(body, capturedSize + 1)
           end
       end
       watch_resource(httpc, resource)
   end
   
   local function fetch_resource(resource)
       while true do
           local intervalTime = 0
           local reason, message = "", ""
           local ok = false
           repeat
               local httpc = http.new()
               resource.watch_state = "connecting"
               core.log.info("begin to connect ", resource.plural)
               ok, message = httpc:connect({
                   scheme = "https",
                   host = apiserver_host,
                   port = tonumber(apiserver_port),
                   ssl_verify = false
               })
               if not ok then
                   resource.watch_state = "connecting"
                   core.log
                       .error("connect failed, resource: ", resource.plural, " 
reason: ", reason, "message : ", message)
                   intervalTime = 200
                   break
               end
   
               core.log.info("begin to list ", resource.plural)
               resource.watch_state = "listing"
               resource:pre_list_callback()
               ok, reason, message = list_resource(httpc, resource)
               if not ok then
                   resource.watch_state = "listing failed"
                   core.log.error("list failed, resource: ", resource.plural, " 
reason: ", reason, "message : ", message)
                   intervalTime = 100
                   break
               end
               resource.watch_state = "listing finished"
               resource:post_list_callback()
   
               core.log.info("begin to watch ", resource.plural)
               resource.watch_state = "watching"
               ok, reason, message = watch_resource(httpc, resource)
               if not ok then
                   resource.watch_state = "watching failed"
                   core.log.error("watch failed, resource: ", resource.plural, 
" reason: ", reason, "message : ", message)
                   intervalTime = 100
                   break
               end
               resource.watch_state = "watching finished"
               intervalTime = 0
           until true
           ngx.sleep(intervalTime)
       end
   end
   
   local function fetch_resources()
       for i, resource in ipairs(watch_resource_list) do
           watch_threads[i] = ngx.thread.spawn(fetch_resource, resource)
       end
       ngx.thread.wait(core.table.unpack(watch_threads))
   end
   
   local function end_world(reason)
       core.log.warn("send USER1 signal to master process [", 
process.get_master_pid(), "] for reopening log file")
       -- -- local ok, err = signal.kill(process.get_master_pid(), 
signal.signum("USR1"))
       -- if not ok then
       --     core.log.error("failed to send USER1 signal for reopening log 
file: ", err)
       -- end
   end
   
   local schema = {
       type = "object",
       properties = {},
       additionalProperties = false
   }
   
   local _M = {
       version = 0.1
   }
   
   function _M.check_schema(conf)
       return true
   end
   
   function _M.nodes(service_name)
       local pattern = "([a-z][a-z0-9-.]{0,})[:]([a-z][a-z0-9-.]{0,})$"
       local match, err = ngx.re.match(service_name, pattern, "jiao")
       if not match then
           core.log.error("get nodes for service error: ", err or "")
           return nil
       end
       local k8s_service_name = match[1]
       local k8s_port_name = match[2]
       local version, _, _ = shared_endpoints:get_stale(k8s_service_name .. 
"#version")
       if not version then
           core.log.error("get version: ", version)
           return nil
       end
       core.log.error("get version: ", version)
       return lrucache(service_name, version, create_nodes, k8s_service_name, 
k8s_port_name)
   end
   
   function _M.init_worker()
       if process.type() ~= "privileged agent" then
           return
       end
   
       local ok, err = core.schema.check(schema, local_conf.discovery.k8s)
       if not ok then
           error("invalid k8s discovery configuration: " .. err)
           return
       end
   
       local err
       namespace, err = util.read_file("/home/adugeek/Temp/namespace")
       if err then
           end_world(err)
           return
       end
   
       core.log.info("here")
       token, err = util.read_file("/home/adugeek/Temp/token")
       if err then
           end_world(err)
           return
       end
   
       core.log.info("here")
       -- apiserver_host = os.getenv("KUBERNETES_SERVICE_HOST")
   
       -- apiserver_port = os.getenv("KUBERNETES_SERVICE_PORT")
   
       apiserver_host = "127.0.0.1"
       apiserver_port = "8001"
   
       ngx_timer_at(0, fetch_resources)
   end
   
   return _M
   
   ```
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to