ChuanFF commented on code in PR #12634:
URL: https://github.com/apache/apisix/pull/12634#discussion_r2732595673
##########
apisix/discovery/kubernetes/init.lua:
##########
@@ -51,69 +52,133 @@ local function sort_nodes_cmp(left, right)
return left.port < right.port
end
-local function on_endpoint_slices_modified(handle, endpoint, operate)
- if handle.namespace_selector and
- not handle:namespace_selector(endpoint.metadata.namespace) then
- return
+local function update_endpoint_slices_cache(handle, endpoint_key, slice, slice_name)
+ if not handle.endpoint_slices_cache[endpoint_key] then
+ handle.endpoint_slices_cache[endpoint_key] = {}
end
+ local endpoint_slices = handle.endpoint_slices_cache[endpoint_key]
+ endpoint_slices[slice_name] = slice
+end
- core.log.debug(core.json.delay_encode(endpoint))
- core.table.clear(endpoint_buffer)
-
- local endpointslices = endpoint.endpoints
- if type(endpointslices) == "table" then
- for _, endpointslice in ipairs(endpointslices) do
- if endpointslice.addresses then
- local addresses = endpointslice.addresses
- for _, port in ipairs(endpoint.ports or {}) do
- local port_name
- if port.name then
- port_name = port.name
- elseif port.targetPort then
- port_name = tostring(port.targetPort)
- else
- port_name = tostring(port.port)
- end
-
- if endpointslice.conditions and endpointslice.conditions.ready then
- local nodes = endpoint_buffer[port_name]
- if nodes == nil then
- nodes = core.table.new(0, #endpointslices * #addresses)
- endpoint_buffer[port_name] = nodes
- end
-
- for _, address in ipairs(addresses) do
- core.table.insert(nodes, {
- host = address.ip,
- port = port.port,
- weight = handle.default_weight
- })
- end
- end
- end
+local function get_endpoints_from_cache(handle, endpoint_key)
+ local endpoint_slices = handle.endpoint_slices_cache[endpoint_key] or {}
+ local endpoints = {}
+ for _, endpoint_slice in pairs(endpoint_slices) do
+ for port, targets in pairs(endpoint_slice) do
+ if not endpoints[port] then
+ endpoints[port] = core.table.new(0, #targets)
end
+ core.table.insert_tail(endpoints[port], unpack(targets))
end
end
- for _, ports in pairs(endpoint_buffer) do
- for _, nodes in pairs(ports) do
- core.table.sort(nodes, sort_nodes_cmp)
- end
- end
- local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name
- local endpoint_content = core.json.encode(endpoint_buffer, true)
- local endpoint_version = ngx.crc32_long(endpoint_content)
+ return endpoints
+end
+local function update_endpoint_dict(handle, endpoints, endpoint_key)
+ local endpoint_content = core.json.encode(endpoints, true)
+ local endpoint_version = ngx.crc32_long(endpoint_content)
local _, err
_, err = handle.endpoint_dict:safe_set(endpoint_key .. "#version", endpoint_version)
if err then
- core.log.error("set endpoint version into discovery DICT failed, ", err)
- return
+ return false, "set endpoint version into discovery DICT failed, " .. err
end
_, err = handle.endpoint_dict:safe_set(endpoint_key, endpoint_content)
if err then
- core.log.error("set endpoint into discovery DICT failed, ", err)
handle.endpoint_dict:delete(endpoint_key .. "#version")
+ return false, "set endpoint into discovery DICT failed, " .. err
+ end
+
+ return true
+end
+
+local function validate_endpoint_slice(endpoint_slice)
+ if not endpoint_slice.metadata then
+ return false, "endpoint_slice has no metadata, endpointSlice: "
+ .. core.json.encode(endpoint_slice)
+ end
+ if not endpoint_slice.metadata.name then
+ return false, "endpoint_slice has no metadata.name, endpointSlice: "
+ .. core.json.encode(endpoint_slice)
+ end
+ if not endpoint_slice.metadata.namespace then
+ return false, "endpoint_slice has no metadata.namespace, endpointSlice: "
+ .. core.json.encode(endpoint_slice)
+ end
+ if not endpoint_slice.metadata.labels
+ or not endpoint_slice.metadata.labels[kubernetes_service_name_label] then
+ return false, "endpoint_slice has no service-name, endpointSlice: "
+ .. core.json.encode(endpoint_slice)
+ end
+
+ return true
+end
+
+local function on_endpoint_slices_modified(handle, endpoint_slice, operate)
+ local ok, err = validate_endpoint_slice(endpoint_slice)
+ if not ok then
+ core.log.error("endpoint_slice validation fail: ", err)
+ return
+ end
+ if handle.namespace_selector and
+ not handle:namespace_selector(endpoint_slice.metadata.namespace) then
+ return
+ end
+
+ core.log.debug(core.json.delay_encode(endpoint_slice))
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]