adugeek edited a comment on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857652609
> I mean, too frequent endpoints change will make the disk io heavy.
I understand your concerns.
But what I mean is that changes in enpoints will be sensed by k8s discvery
and written to ngx.shared.DICT. but not write to apisix.yaml .
There is no disk io here.
Talk is cheap. Let's look at the code
```lua
local ipairs = ipairs
local ngx = ngx
local string = string
local tonumber = tonumber
local math = math
local os = os
local process = require("ngx.process")
local core = require("apisix.core")
local util = require("apisix.cli.util")
local http = require("resty.http")
local signal = require("resty.signal")
local ngx_timer_at = ngx.timer.at
local shared_endpoints = ngx.shared.discovery
local apiserver_host = ""
local apiserver_port = ""
local namespace = ""
local token = ""
local default_weight = 50
local lrucache = core.lrucache.new({
ttl = 300,
count = 1024
})
local cache_table = {}
local function end_world(reason)
core.log.emerg(reason)
signal.kill(process.get_master_pid(), signal.signum("QUIT"))
end
local function sort_by_key_host(a, b)
return a.host < b.host
end
local function on_endpoint_added(endpoint)
local subsets = endpoint.subsets
if subsets == nil or #subsets == 0 then
return
end
local subset = subsets[1]
local addresses = subset.addresses
if addresses == nil or #addresses == 0 then
return
end
local ports = subset.ports
if ports == nil or #ports == 0 then
return
end
core.table.clear(cache_table)
for _, port in ipairs(ports) do
local nodes = core.table.new(#addresses, 0)
for i, address in ipairs(addresses) do
nodes[i] = {
host = address.ip,
port = port.port,
weight = default_weight
}
end
core.table.sort(nodes, sort_by_key_host)
cache_table[port.name] = nodes
end
local _, err
_, err = shared_endpoints:safe_set(endpoint.metadata.name .. "#version",
endpoint.metadata.resourceVersion)
if err then
core.log.emerg("set endpoint version into discovery DICT failed ,",
err)
end
shared_endpoints:safe_set(endpoint.metadata.name,
core.json.encode(cache_table, true))
if err then
core.log.emerg("set endpoint into discovery DICT failed ,", err)
end
end
local function on_endpoint_deleted(endpoint)
shared_endpoints:deleted(endpoint.metadata.name .. "#version")
shared_endpoints:delete(endpoint.metadata.name)
end
local function on_endpoint_modified(endpoint)
local subsets = endpoint.subsets
if subsets == nil or #subsets == 0 then
return on_endpoint_deleted(endpoint)
end
local subset = subsets[1]
local addresses = subset.addresses
if addresses == nil or #addresses == 0 then
return on_endpoint_deleted(endpoint)
end
local ports = subset.ports
if ports == nil or #ports == 0 then
return on_endpoint_deleted(endpoint)
end
core.table.clear(cache_table)
for _, port in ipairs(ports) do
local nodes = core.table.new(#addresses, 0)
for i, address in ipairs(addresses) do
nodes[i] = {
host = address.ip,
port = port.port,
weight = default_weight
}
end
core.table.sort(nodes, sort_by_key_host)
cache_table[port.name] = nodes
end
local _, err
_, err = shared_endpoints:safe_set(endpoint.metadata.name .. "#version",
endpoint.metadata.resourceVersion)
if err then
core.log.emerg("set endpoints version into discovery DICT failed ,",
err)
end
shared_endpoints:safe_set(endpoint.metadata.name,
core.json.encode(cache_table, true))
if err then
core.log.emerg("set endpoints into discovery DICT failed ,", err)
end
end
local endpoint_resource = {
version = "v1",
kind = "Endpoints",
listKind = "EndpointsList",
plural = "endpoints",
max_resource_version = 0,
list_path = function(self)
return string.format("/api/v1/namespaces/%s/endpoints", namespace)
end,
list_query = function(self, continue)
if continue == nil or continue == "" then
return "limit=45"
else
return "limit=45&continue=" .. continue
end
end,
watch_path = function(self)
return string.format("/api/v1/namespaces/%s/endpoints", namespace)
end,
watch_query = function(self, timeout)
return
string.format("watch=1&allowWatchBookmarks=true&timeoutSeconds=%d&resourceVersion=%d",
timeout,
self.max_resource_version)
end,
pre_list_callback = function(self)
self.max_resource_version = 0
shared_endpoints:flush_all()
end,
post_list_callback = function(self)
shared_endpoints:flush_expired()
end,
added_callback = function(self, object, drive)
on_endpoint_added(object)
end,
modified_callback = function(self, object)
on_endpoint_modified(object)
end,
deleted_callback = function(self, object)
on_endpoint_deleted(object)
end
}
local function event_dispatch(resource, event, object, drive)
if drive == "watch" then
local resource_version = object.metadata.resourceVersion
local rvv = tonumber(resource_version)
if rvv <= resource.max_resource_version then
return
end
resource.max_resource_version = rvv
end
if event == "ADDED" then
resource:added_callback(object, drive)
elseif event == "MODIFIED" then
if object.deletionTimestamp ~= nil then
resource:deleted_callback(object)
else
resource:modified_callback(object)
end
elseif event == "DELETED" then
resource:deleted_callback(object)
elseif event == "BOOKMARK" then
-- do nothing because we had record max_resource_version to
resource.max_resource_version
end
end
local function list_resource(httpc, resource, continue)
httpc:set_timeouts(2000, 2000, 3000)
local res, err = httpc:request({
path = resource:list_path(),
query = resource:list_query(),
headers = {
["Authorization"] = string.format("Bearer %s", token)
}
})
if not res then
return false, "RequestError", err or ""
end
if res.status ~= 200 then
return false, res.reason, res.read_body() or ""
end
local body, err = res:read_body()
if err then
return false, "ReadBodyError", err
end
local data, _ = core.json.decode(body)
if not data or data.kind ~= resource.listKind then
return false, "UnexpectedBody", body
end
local resource_version = data.metadata.resourceVersion
resource.max_resource_version = tonumber(resource_version)
for _, item in ipairs(data.items) do
event_dispatch(resource, "ADDED", item, "list")
end
if data.metadata.continue ~= nil and data.metadata.continue ~= "" then
list_resource(httpc, resource, data.metadata.continue)
end
return true, "Success", ""
end
local function watch_resource(httpc, resource)
math.randomseed(process.get_master_pid())
local watch_seconds = 1800 + math.random(60, 1200)
local allowance_seconds = 120
httpc:set_timeouts(2000, 3000, (watch_seconds + allowance_seconds) *
1000)
local res, err = httpc:request({
path = resource:watch_path(),
query = resource:watch_query(watch_seconds),
headers = {
["Authorization"] = string.format("Bearer %s", token)
}
})
if err then
return false, "RequestError", err
end
if res.status ~= 200 then
return false, res.reason, res.read_body and res.read_body()
end
local remaindBody = ""
local body = ""
local reader = res.body_reader
local gmatchIterator;
local captures;
local capturedSize = 0
while true do
body, err = reader()
if err then
return false, "ReadBodyError", err
end
if not body then
break
end
if #remaindBody ~= 0 then
body = remaindBody .. body
end
gmatchIterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jiao")
if not gmatchIterator then
return false, "GmatchError", err
end
while true do
captures, err = gmatchIterator()
if err then
return false, "GmatchError", err
end
if not captures then
break
end
capturedSize = capturedSize + #captures[0]
local v, _ = core.json.decode(captures[0])
if not v or not v.object or v.object.kind ~= resource.kind then
return false, "UnexpectedBody", captures[0]
end
event_dispatch(resource, v.type, v.object, "watch")
end
if capturedSize == #body then
remaindBody = ""
elseif capturedSize == 0 then
remaindBody = body
else
remaindBody = string.sub(body, capturedSize + 1)
end
end
watch_resource(httpc, resource)
end
local function fetch_resource(resource)
while true do
local ok = false
local reason, message = "", ""
local intervalTime = 0
repeat
local httpc = http.new()
resource.watch_state = "connecting"
core.log.info("begin to connect ", resource.plural)
ok, message = httpc:connect({
scheme = "https",
host = apiserver_host,
port = tonumber(apiserver_port),
ssl_verify = false
})
if not ok then
resource.watch_state = "connecting"
core.log.error("connect apiserver failed , apiserver_host:
", apiserver_host, "apiserver_port",
apiserver_port, "message : ", message)
intervalTime = 200
break
end
core.log.info("begin to list ", resource.plural)
resource.watch_state = "listing"
resource:pre_list_callback()
ok, reason, message = list_resource(httpc, resource)
if not ok then
resource.watch_state = "listing failed"
core.log.error("list failed , resource: ", resource.plural,
" reason: ", reason, "message : ", message)
intervalTime = 200
break
end
resource.watch_state = "list finished"
resource:post_list_callback()
core.log.info("begin to watch ", resource.plural)
resource.watch_state = "watching"
ok, reason, message = watch_resource(httpc, resource)
if not ok then
resource.watch_state = "watch failed"
core.log.error("watch failed, resource: ", resource.plural,
" reason: ", reason, "message : ", message)
intervalTime = 100
break
end
resource.watch_state = "watch finished"
intervalTime = 0
until true
ngx.sleep(intervalTime)
end
end
local function create_lrucache(service_name, port_name)
local endpoint, _, _ = shared_endpoints:get_stale(service_name)
if not endpoint then
core.log.error("get emppty endpoint from discovery DICT,this should
not happen ", service_name)
return nil
end
local t, _ = core.json.decode(endpoint)
if not t then
core.log.error("json decode endpoint failed, this should not happen,
content : ", endpoint)
end
return t[port_name]
end
local _M = {
version = 0.01
}
function _M.nodes(service_name)
local pattern = "([a-z][a-z0-9-.]{0,62})[:]([a-z][a-z0-9-.]{0,62})$"
local match, _ = ngx.re.match(service_name, pattern, "jiao")
if not match then
core.log.info("get unexpected upstream service_name: ", service_name)
return nil
end
local k8s_service_name = match[1]
local k8s_port_name = match[2]
local version, _, err = shared_endpoints:get_stale(k8s_service_name ..
"#version")
if not version then
core.log.info("get emppty endpoint version from discovery DICT ",
k8s_service_name)
return nil
end
return lrucache(service_name, version, create_lrucache,
k8s_service_name, k8s_port_name)
end
function _M.init_worker()
if process.type() ~= "privileg,med agent" then
return
end
local err
namespace, err = util.read_file("/home/adugeek/Temp/namespace")
if not namespace or namespace == "" then
end_world("get empty namespace value " .. (err or ""))
return
end
token, err = util.read_file("/home/adugeek/Temp/token")
if not token or token == "" then
end_world("get empty token value " .. (err or ""))
return
end
apiserver_host = os.getenv("KUBERNETES_SERVICE_HOST")
if not apiserver_host or apiserver_host == "" then
end_world("get empty KUBERNETES_SERVICE_HOST value")
end
apiserver_port = os.getenv("KUBERNETES_SERVICE_PORT")
if not apiserver_port or apiserver_port == "" then
end_world("get empty KUBERNETES_SERVICE_PORT value")
end
ngx_timer_at(0, fetch_resource, endpoint_resource)
end
return _M
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]