adugeek commented on a change in pull request #4880: URL: https://github.com/apache/apisix/pull/4880#discussion_r711904560
########## File path: apisix/discovery/k8s.lua ########## @@ -0,0 +1,712 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +local ngx = ngx +local ipairs = ipairs +local pairs = pairs +local string = string +local tonumber = tonumber +local tostring = tostring +local math = math +local os = os +local error = error +local process = require("ngx.process") +local core = require("apisix.core") +local util = require("apisix.cli.util") +local local_conf = require("apisix.core.config_local").local_conf() +local http = require("resty.http") +local endpoints_shared = ngx.shared.discovery + +local apiserver_schema = "" +local apiserver_host = "" +local apiserver_port = 0 +local apiserver_token = "" +local namespace_selector_string = "" +local namespace_selector_function +local default_weight = 0 + +local endpoint_lrucache = core.lrucache.new({ + ttl = 300, + count = 1024 +}) + +local endpoint_cache = {} +local empty_table = {} +local pending_resources + +local function sort_by_ip(a, b) + return a.ip < b.ip +end + +local function on_endpoint_deleted(endpoint, selector_check) + if selector_check then + if namespace_selector_function and + not namespace_selector_function(endpoint.metadata.namespace) then + return + end + end + + local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name + endpoints_shared:delete(endpoint_key .. "#version") + endpoints_shared:delete(endpoint_key) +end + +local function on_endpoint_modified(endpoint) + if namespace_selector_function and + not namespace_selector_function(endpoint.metadata.namespace) then + return + end + + if endpoint.subsets == nil or #endpoint.subsets == 0 then + return on_endpoint_deleted(endpoint) + end + + core.table.clear(endpoint_cache) + local subsets = endpoint.subsets + for _, subset in ipairs(subsets) do + if subset.addresses ~= nil then + local addresses = subset.addresses + + for _, port in ipairs(subset.ports or empty_table) do + local port_name + if port.name then + port_name = port.name + else + port_name = tostring(port.port) + end + + local nodes = endpoint_cache[port_name] + if nodes == nil then + nodes = core.table.new(0, #addresses * #subsets) + endpoint_cache[port_name] = nodes + end + + for _, address in ipairs(subset.addresses) do + core.table.insert(nodes, { + host = address.ip, + port = port.port, + weight = default_weight + }) + end + end + end + end + + for _, ports in pairs(endpoint_cache) do + for _, nodes in pairs(ports) do + core.table.sort(nodes, sort_by_ip) + end + end + + local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name + local endpoint_content = core.json.encode(endpoint_cache, true) + local endpoint_version = ngx.crc32_long(endpoint_content) + + local _, err + _, err = endpoints_shared:safe_set(endpoint_key .. "#version", endpoint_version) + if err then + core.log.emerg("set endpoint version into discovery DICT failed, ", err) + return + end + endpoints_shared:safe_set(endpoint_key, endpoint_content) + if err then + core.log.emerg("set endpoint into discovery DICT failed, ", err) + endpoints_shared:delete(endpoint_key .. "#version") + end +end + +local function list_resource(httpc, resource, continue) + httpc:set_timeouts(2000, 2000, 3000) + local res, err = httpc:request({ + path = resource:list_path(), + query = resource:list_query(continue), + headers = { + ["Host"] = apiserver_host .. ":" .. apiserver_port, + ["Authorization"] = "Bearer " .. apiserver_token, + ["Accept"] = "application/json", + ["Connection"] = "keep-alive" + } + }) + + core.log.debug("--raw=" .. resource:list_path() .. "?" .. resource:list_query(continue)) + + if not res then + return false, "RequestError", err or "" + end + + if res.status ~= 200 then + return false, res.reason, res:read_body() or "" + end + local body, err = res:read_body() + if err then + return false, "ReadBodyError", err + end + + local data, _ = core.json.decode(body) + if not data or data.kind ~= resource.listKind then + return false, "UnexpectedBody", body + end + + resource.newest_resource_version = data.metadata.resourceVersion + + for _, item in ipairs(data.items or empty_table) do + resource:event_dispatch("ADDED", item, "list") Review comment: Thanks suggestion, I will update -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
