membphis commented on a change in pull request #4880:
URL: https://github.com/apache/apisix/pull/4880#discussion_r800117950



##########
File path: apisix/discovery/kubernetes/informer_factory.lua
##########
@@ -0,0 +1,342 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local ngx = ngx
+local ipairs = ipairs
+local string = string
+local math = math
+local setmetatable = setmetatable
+local core = require("apisix.core")
+local http = require("resty.http")
+
+local empty_table = {}
+
+local function list_query(informer)
+    local arguments = {
+        limit = informer.limit,
+    }
+
+    if informer.continue ~= nil and informer.continue ~= "" then
+        arguments.continue = informer.continue
+    end
+
+    if informer.label_selector and informer.label_selector ~= "" then
+        arguments.labelSelector = informer.label_selector
+    end
+
+    if informer.field_selector and informer.field_selector ~= "" then
+        arguments.fieldSelector = informer.field_selector
+    end
+
+    return ngx.encode_args(arguments)
+end
+
+local function list(httpc, apiserver, informer)

Review comment:
       style: use two blank lines between functions

##########
File path: apisix/discovery/kubernetes/init.lua
##########
@@ -0,0 +1,335 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local ngx = ngx
+local ipairs = ipairs
+local pairs = pairs
+local string = string
+local tonumber = tonumber
+local tostring = tostring
+local os = os
+local error = error
+local process = require("ngx.process")
+local core = require("apisix.core")
+local util = require("apisix.cli.util")
+local local_conf = require("apisix.core.config_local").local_conf()
+local kubernetes = require("apisix.discovery.kubernetes.kubernetes")
+local endpoint_dict = ngx.shared.discovery

Review comment:
       we need to check right here that `ngx.shared.discovery` exists (is not nil)

##########
File path: apisix/discovery/kubernetes/informer_factory.lua
##########
@@ -0,0 +1,342 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local ngx = ngx
+local ipairs = ipairs
+local string = string
+local math = math
+local setmetatable = setmetatable
+local core = require("apisix.core")
+local http = require("resty.http")
+
+local empty_table = {}
+
+local function list_query(informer)
+    local arguments = {
+        limit = informer.limit,
+    }
+
+    if informer.continue ~= nil and informer.continue ~= "" then
+        arguments.continue = informer.continue
+    end
+
+    if informer.label_selector and informer.label_selector ~= "" then
+        arguments.labelSelector = informer.label_selector
+    end
+
+    if informer.field_selector and informer.field_selector ~= "" then
+        arguments.fieldSelector = informer.field_selector
+    end
+
+    return ngx.encode_args(arguments)
+end
+
+local function list(httpc, apiserver, informer)
+    local response, err = httpc:request({
+        path = informer.path,
+        query = list_query(informer),
+        headers = {
+            ["Host"] = apiserver.host .. ":" .. apiserver.port,
+            ["Authorization"] = "Bearer " .. apiserver.token,
+            ["Accept"] = "application/json",
+            ["Connection"] = "keep-alive"
+        }
+    })
+
+    core.log.info("--raw=", informer.path, "?", list_query(informer))
+
+    if not response then
+        return false, "RequestError", err or ""
+    end
+
+    if response.status ~= 200 then
+        return false, response.reason, response:read_body() or ""
+    end
+
+    local body, err = response:read_body()
+    if err then
+        return false, "ReadBodyError", err
+    end
+
+    local data, _ = core.json.decode(body)
+    if not data or data.kind ~= informer.list_kind then
+        return false, "UnexpectedBody", body
+    end
+
+    informer.version = data.metadata.resourceVersion
+
+    if informer.on_added ~= nil then
+        for _, item in ipairs(data.items or empty_table) do
+            informer:on_added(item, "list")
+        end
+    end
+
+    informer.continue = data.metadata.continue
+    if informer.continue ~= nil and informer.continue ~= "" then
+        list(httpc, informer)
+    end
+
+    return true, "Success", ""

Review comment:
       `return true` should be enough

##########
File path: apisix/discovery/kubernetes/init.lua
##########
@@ -0,0 +1,356 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local ngx = ngx
+local ipairs = ipairs
+local pairs = pairs
+local string = string
+local tonumber = tonumber
+local tostring = tostring
+local os = os
+local error = error
+local pcall = pcall
+local process = require("ngx.process")
+local core = require("apisix.core")
+local util = require("apisix.cli.util")
+local local_conf = require("apisix.core.config_local").local_conf()
+local informer_factory = require("apisix.discovery.kubernetes.informer_factory")
+local endpoint_dict = ngx.shared.discovery
+
+local default_weight = 0
+
+local endpoint_lrucache = core.lrucache.new({
+    ttl = 300,
+    count = 1024
+})
+
+local endpoint_buffer = {}
+local empty_table = {}
+
+local function sort_nodes_cmp(left, right)
+    if left.host ~= right.host then
+        return left.host < right.host
+    end
+    return left.port < right.port
+end
+
+local function on_endpoint_modified(informer, endpoint)
+    if informer.namespace_selector ~= nil and
+            not informer:namespace_selector(endpoint.metadata.namespace) then
+        return
+    end
+
+    core.log.debug(core.json.delay_encode(endpoint))
+    core.table.clear(endpoint_buffer)
+
+    local subsets = endpoint.subsets
+    for _, subset in ipairs(subsets or empty_table) do
+        if subset.addresses ~= nil then
+            local addresses = subset.addresses
+            for _, port in ipairs(subset.ports or empty_table) do
+                local port_name
+                if port.name then
+                    port_name = port.name
+                elseif port.targetPort then
+                    port_name = tostring(port.targetPort)
+                else
+                    port_name = tostring(port.port)
+                end
+
+                local nodes = endpoint_buffer[port_name]
+                if nodes == nil then
+                    nodes = core.table.new(0, #subsets * #addresses)
+                    endpoint_buffer[port_name] = nodes
+                end
+
+                for _, address in ipairs(subset.addresses) do
+                    core.table.insert(nodes, {
+                        host = address.ip,
+                        port = port.port,
+                        weight = default_weight
+                    })
+                end
+            end
+        end
+    end
+
+    for _, ports in pairs(endpoint_buffer) do
+        for _, nodes in pairs(ports) do
+            core.table.sort(nodes, sort_nodes_cmp)
+        end
+    end
+
+    local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name
+    local endpoint_content = core.json.encode(endpoint_buffer, true)
+    local endpoint_version = ngx.crc32_long(endpoint_content)
+
+    local _, err
+    _, err = endpoint_dict:safe_set(endpoint_key .. "#version", endpoint_version)
+    if err then
+        core.log.error("set endpoint version into discovery DICT failed, ", err)
+        return
+    end
+    _, err = endpoint_dict:safe_set(endpoint_key, endpoint_content)
+    if err then
+        core.log.error("set endpoint into discovery DICT failed, ", err)
+        endpoint_dict:delete(endpoint_key .. "#version")
+    end
+end
+
+local function on_endpoint_deleted(informer, endpoint)
+    if informer.namespace_selector ~= nil and
+            not informer:namespace_selector(endpoint.metadata.namespace) then
+        return
+    end
+
+    core.log.debug(core.json.delay_encode(endpoint))
+    local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name
+    endpoint_dict:delete(endpoint_key .. "#version")
+    endpoint_dict:delete(endpoint_key)
+end
+
+local function pre_list(informer)
+    endpoint_dict:flush_all()
+end
+
+local function post_list(informer)
+    endpoint_dict:flush_expired()
+end
+
+local function setup_label_selector(conf, informer)
+    informer.label_selector = conf.label_selector
+end
+
+local function setup_namespace_selector(conf, informer)
+    local ns = conf.namespace_selector
+    if ns == nil then
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.equal then
+        informer.field_selector = "metadata.namespace=" .. ns.equal
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.not_equal then
+        informer.field_selector = "metadata.namespace!=" .. ns.not_equal
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.match then
+        informer.namespace_selector = function(self, namespace)
+            local match = conf.namespace_selector.match
+            local m, err
+            for _, v in ipairs(match) do
+                m, err = ngx.re.match(namespace, v, "j")
+                if m and m[0] == namespace then
+                    return true
+                end
+                if err then
+                    core.log.error("ngx.re.match failed: ", err)
+                end
+            end
+            return false
+        end
+        return
+    end
+
+    if ns.not_match then
+        informer.namespace_selector = function(self, namespace)
+            local not_match = conf.namespace_selector.not_match
+            local m, err
+            for _, v in ipairs(not_match) do
+                m, err = ngx.re.match(namespace, v, "j")
+                if m and m[0] == namespace then
+                    return false
+                end
+                if err then
+                    return false
+                end
+            end
+            return true
+        end
+        return
+    end

Review comment:
       what is the default action when none of `equal`, `not_equal`, `match` or `not_match` is set?

##########
File path: apisix/discovery/kubernetes/informer_factory.lua
##########
@@ -0,0 +1,342 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local ngx = ngx
+local ipairs = ipairs
+local string = string
+local math = math
+local setmetatable = setmetatable
+local core = require("apisix.core")
+local http = require("resty.http")
+
+local empty_table = {}
+
+local function list_query(informer)
+    local arguments = {
+        limit = informer.limit,
+    }
+
+    if informer.continue ~= nil and informer.continue ~= "" then
+        arguments.continue = informer.continue
+    end
+
+    if informer.label_selector and informer.label_selector ~= "" then
+        arguments.labelSelector = informer.label_selector
+    end
+
+    if informer.field_selector and informer.field_selector ~= "" then
+        arguments.fieldSelector = informer.field_selector
+    end
+
+    return ngx.encode_args(arguments)
+end
+
+local function list(httpc, apiserver, informer)
+    local response, err = httpc:request({
+        path = informer.path,
+        query = list_query(informer),
+        headers = {
+            ["Host"] = apiserver.host .. ":" .. apiserver.port,
+            ["Authorization"] = "Bearer " .. apiserver.token,
+            ["Accept"] = "application/json",
+            ["Connection"] = "keep-alive"
+        }
+    })
+
+    core.log.info("--raw=", informer.path, "?", list_query(informer))
+
+    if not response then
+        return false, "RequestError", err or ""
+    end
+
+    if response.status ~= 200 then
+        return false, response.reason, response:read_body() or ""
+    end
+
+    local body, err = response:read_body()
+    if err then
+        return false, "ReadBodyError", err
+    end
+
+    local data, _ = core.json.decode(body)
+    if not data or data.kind ~= informer.list_kind then
+        return false, "UnexpectedBody", body
+    end
+
+    informer.version = data.metadata.resourceVersion
+
+    if informer.on_added ~= nil then
+        for _, item in ipairs(data.items or empty_table) do
+            informer:on_added(item, "list")
+        end
+    end
+
+    informer.continue = data.metadata.continue
+    if informer.continue ~= nil and informer.continue ~= "" then
+        list(httpc, informer)
+    end
+
+    return true, "Success", ""
+end
+
+local function watch_query(informer)
+    local arguments = {
+        watch = "true",
+        allowWatchBookmarks = "true",
+        timeoutSeconds = informer.overtime,
+    }
+
+    if informer.version ~= nil and informer.version ~= "" then
+        arguments.resourceVersion = informer.version
+    end
+
+    if informer.label_selector and informer.label_selector ~= "" then
+        arguments.labelSelector = informer.label_selector
+    end
+
+    if informer.field_selector and informer.field_selector ~= "" then
+        arguments.fieldSelector = informer.field_selector
+    end
+
+    return ngx.encode_args(arguments)
+end
+
+local function split_event (body, dispatch_event)
+    local gmatch_iterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jao")
+    if not gmatch_iterator then
+        return nil, "GmatchError", err
+    end
+
+    local captures
+    local captured_size = 0
+    local ok, reason
+    while true do
+        captures, err = gmatch_iterator()
+
+        if err then
+            return nil, "GmatchError", err
+        end
+
+        if not captures then
+            break
+        end
+
+        captured_size = captured_size + #captures[0]
+
+        ok, reason, err = dispatch_event(captures[0])
+        if not ok then
+            return nil, reason, err
+        end
+    end
+
+    local remainder_body
+    if captured_size == #body then
+        remainder_body = ""
+    elseif captured_size == 0 then
+        remainder_body = body
+    elseif captured_size < #body then
+        remainder_body = string.sub(body, captured_size + 1)
+    end
+
+    return remainder_body, "Success", nil
+end
+
+local function watch(httpc, apiserver, informer)
+
+    local dispatch_event = function(event_string)
+        local event, _ = core.json.decode(event_string)
+
+        if not event or not event.type or not event.object then
+            return false, "UnexpectedBody", event_string
+        end
+
+        local type = event.type
+
+        if type == "ERROR" then
+            if event.object.code == 410 then
+                return false, "ResourceGone", nil
+            end
+            return false, "UnexpectedBody", event_string
+        end
+
+        local object = event.object
+        informer.version = object.metadata.resourceVersion
+
+        if type == "ADDED" then
+            if informer.on_added ~= nil then
+                informer:on_added(object, "watch")
+            end
+        elseif type == "DELETED" then
+            if informer.on_deleted ~= nil then
+                informer:on_deleted(object)
+            end
+        elseif type == "MODIFIED" then
+            if informer.on_modified ~= nil then
+                informer:on_modified(object)
+            end
+            -- elseif type == "BOOKMARK" then
+            --    do nothing
+        end
+        return true, "Success", nil

Review comment:
       ditto: `return true` should be enough

##########
File path: apisix/discovery/kubernetes/informer_factory.lua
##########
@@ -0,0 +1,342 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local ngx = ngx
+local ipairs = ipairs
+local string = string
+local math = math
+local setmetatable = setmetatable
+local core = require("apisix.core")
+local http = require("resty.http")
+
+local empty_table = {}
+
+local function list_query(informer)
+    local arguments = {
+        limit = informer.limit,
+    }
+
+    if informer.continue ~= nil and informer.continue ~= "" then
+        arguments.continue = informer.continue
+    end
+
+    if informer.label_selector and informer.label_selector ~= "" then
+        arguments.labelSelector = informer.label_selector
+    end
+
+    if informer.field_selector and informer.field_selector ~= "" then
+        arguments.fieldSelector = informer.field_selector
+    end
+
+    return ngx.encode_args(arguments)
+end
+
+local function list(httpc, apiserver, informer)
+    local response, err = httpc:request({
+        path = informer.path,
+        query = list_query(informer),
+        headers = {
+            ["Host"] = apiserver.host .. ":" .. apiserver.port,
+            ["Authorization"] = "Bearer " .. apiserver.token,
+            ["Accept"] = "application/json",
+            ["Connection"] = "keep-alive"
+        }
+    })
+
+    core.log.info("--raw=", informer.path, "?", list_query(informer))
+
+    if not response then
+        return false, "RequestError", err or ""
+    end
+
+    if response.status ~= 200 then
+        return false, response.reason, response:read_body() or ""
+    end
+
+    local body, err = response:read_body()
+    if err then
+        return false, "ReadBodyError", err
+    end
+
+    local data, _ = core.json.decode(body)
+    if not data or data.kind ~= informer.list_kind then
+        return false, "UnexpectedBody", body
+    end
+
+    informer.version = data.metadata.resourceVersion
+
+    if informer.on_added ~= nil then
+        for _, item in ipairs(data.items or empty_table) do
+            informer:on_added(item, "list")
+        end
+    end
+
+    informer.continue = data.metadata.continue
+    if informer.continue ~= nil and informer.continue ~= "" then
+        list(httpc, informer)
+    end
+
+    return true, "Success", ""
+end
+
+local function watch_query(informer)
+    local arguments = {
+        watch = "true",
+        allowWatchBookmarks = "true",
+        timeoutSeconds = informer.overtime,
+    }
+
+    if informer.version ~= nil and informer.version ~= "" then
+        arguments.resourceVersion = informer.version
+    end
+
+    if informer.label_selector and informer.label_selector ~= "" then
+        arguments.labelSelector = informer.label_selector
+    end
+
+    if informer.field_selector and informer.field_selector ~= "" then
+        arguments.fieldSelector = informer.field_selector
+    end
+
+    return ngx.encode_args(arguments)
+end
+
+local function split_event (body, dispatch_event)
+    local gmatch_iterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jao")
+    if not gmatch_iterator then
+        return nil, "GmatchError", err
+    end
+
+    local captures
+    local captured_size = 0
+    local ok, reason
+    while true do
+        captures, err = gmatch_iterator()
+
+        if err then
+            return nil, "GmatchError", err
+        end
+
+        if not captures then
+            break
+        end
+
+        captured_size = captured_size + #captures[0]
+
+        ok, reason, err = dispatch_event(captures[0])
+        if not ok then
+            return nil, reason, err
+        end
+    end
+
+    local remainder_body
+    if captured_size == #body then
+        remainder_body = ""
+    elseif captured_size == 0 then
+        remainder_body = body
+    elseif captured_size < #body then
+        remainder_body = string.sub(body, captured_size + 1)
+    end
+
+    return remainder_body, "Success", nil

Review comment:
       `return remainder_body` should be enough too

##########
File path: apisix/discovery/kubernetes/init.lua
##########
@@ -0,0 +1,356 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local ngx = ngx
+local ipairs = ipairs
+local pairs = pairs
+local string = string
+local tonumber = tonumber
+local tostring = tostring
+local os = os
+local error = error
+local pcall = pcall
+local process = require("ngx.process")
+local core = require("apisix.core")
+local util = require("apisix.cli.util")
+local local_conf = require("apisix.core.config_local").local_conf()
+local informer_factory = require("apisix.discovery.kubernetes.informer_factory")
+local endpoint_dict = ngx.shared.discovery
+
+local default_weight = 0
+
+local endpoint_lrucache = core.lrucache.new({
+    ttl = 300,
+    count = 1024
+})
+
+local endpoint_buffer = {}
+local empty_table = {}
+
+local function sort_nodes_cmp(left, right)
+    if left.host ~= right.host then
+        return left.host < right.host
+    end
+    return left.port < right.port
+end
+
+local function on_endpoint_modified(informer, endpoint)
+    if informer.namespace_selector ~= nil and
+            not informer:namespace_selector(endpoint.metadata.namespace) then
+        return
+    end
+
+    core.log.debug(core.json.delay_encode(endpoint))
+    core.table.clear(endpoint_buffer)
+
+    local subsets = endpoint.subsets
+    for _, subset in ipairs(subsets or empty_table) do
+        if subset.addresses ~= nil then
+            local addresses = subset.addresses
+            for _, port in ipairs(subset.ports or empty_table) do
+                local port_name
+                if port.name then
+                    port_name = port.name
+                elseif port.targetPort then
+                    port_name = tostring(port.targetPort)
+                else
+                    port_name = tostring(port.port)
+                end
+
+                local nodes = endpoint_buffer[port_name]
+                if nodes == nil then
+                    nodes = core.table.new(0, #subsets * #addresses)
+                    endpoint_buffer[port_name] = nodes
+                end
+
+                for _, address in ipairs(subset.addresses) do
+                    core.table.insert(nodes, {
+                        host = address.ip,
+                        port = port.port,
+                        weight = default_weight
+                    })
+                end
+            end
+        end
+    end
+
+    for _, ports in pairs(endpoint_buffer) do
+        for _, nodes in pairs(ports) do
+            core.table.sort(nodes, sort_nodes_cmp)
+        end
+    end
+
+    local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name
+    local endpoint_content = core.json.encode(endpoint_buffer, true)
+    local endpoint_version = ngx.crc32_long(endpoint_content)
+
+    local _, err
+    _, err = endpoint_dict:safe_set(endpoint_key .. "#version", endpoint_version)
+    if err then
+        core.log.error("set endpoint version into discovery DICT failed, ", err)
+        return
+    end
+    _, err = endpoint_dict:safe_set(endpoint_key, endpoint_content)
+    if err then
+        core.log.error("set endpoint into discovery DICT failed, ", err)
+        endpoint_dict:delete(endpoint_key .. "#version")
+    end
+end
+
+local function on_endpoint_deleted(informer, endpoint)
+    if informer.namespace_selector ~= nil and
+            not informer:namespace_selector(endpoint.metadata.namespace) then
+        return
+    end
+
+    core.log.debug(core.json.delay_encode(endpoint))
+    local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name
+    endpoint_dict:delete(endpoint_key .. "#version")
+    endpoint_dict:delete(endpoint_key)
+end
+
+local function pre_list(informer)
+    endpoint_dict:flush_all()
+end
+
+local function post_list(informer)
+    endpoint_dict:flush_expired()
+end
+
+local function setup_label_selector(conf, informer)
+    informer.label_selector = conf.label_selector
+end
+
+local function setup_namespace_selector(conf, informer)
+    local ns = conf.namespace_selector
+    if ns == nil then
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.equal then
+        informer.field_selector = "metadata.namespace=" .. ns.equal
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.not_equal then
+        informer.field_selector = "metadata.namespace!=" .. ns.not_equal
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.match then
+        informer.namespace_selector = function(self, namespace)
+            local match = conf.namespace_selector.match
+            local m, err
+            for _, v in ipairs(match) do
+                m, err = ngx.re.match(namespace, v, "j")
+                if m and m[0] == namespace then
+                    return true
+                end
+                if err then
+                    core.log.error("ngx.re.match failed: ", err)
+                end
+            end
+            return false
+        end
+        return
+    end
+
+    if ns.not_match then
+        informer.namespace_selector = function(self, namespace)
+            local not_match = conf.namespace_selector.not_match
+            local m, err
+            for _, v in ipairs(not_match) do
+                m, err = ngx.re.match(namespace, v, "j")
+                if m and m[0] == namespace then
+                    return false
+                end
+                if err then
+                    return false
+                end
+            end
+            return true
+        end
+        return
+    end
+end
+
+local function read_env(key)
+    if #key > 3 then
+        local a, b = string.byte(key, 1, 2)
+        local c = string.byte(key, #key, #key)
+        -- '$', '{', '}' == 36,123,125
+        if a == 36 and b == 123 and c == 125 then
+            local env = string.sub(key, 3, #key - 1)
+            local val = os.getenv(env)
+            if not val then
+                return false, nil, "not found environment variable " .. env
+            end
+            return true, val, nil
+        end
+    end
+    return true, key, nil
+end
+
+local function get_apiserver(conf)
+    local apiserver = {
+        schema = "",
+        host = "",
+        port = "",
+        token = ""
+    }
+
+    apiserver.schema = conf.service.schema
+
+    local ok, value, message
+    ok, value, message = read_env(conf.service.host)
+    if not ok then
+        return nil, message
+    end
+
+    apiserver.host = value
+    if apiserver.host == "" then
+        return nil, "get empty host value"
+    end
+
+    ok, value, message = read_env(conf.service.port)
+    if not ok then
+        return nil, message
+    end
+
+    apiserver.port = tonumber(value)
+    if not apiserver.port or apiserver.port <= 0 or apiserver.port > 65535 then
+        return nil, "get invalid port value: " .. apiserver.port
+    end
+
+    -- we should not check if the apiserver.token is empty here
+    if conf.client.token then
+        ok, value, message = read_env(conf.client.token)
+        if not ok then
+            return nil, message
+        end
+        apiserver.token = value
+    elseif conf.client.token_file and conf.client.token_file ~= "" then
+        ok, value, message = read_env(conf.client.token_file)
+        if not ok then
+            return nil, message
+        end
+        local apiserver_token_file = value
+
+        apiserver.token, message = util.read_file(apiserver_token_file)
+        if not apiserver.token then
+            return nil, message
+        end
+    else
+        return nil, "invalid kubernetes discovery configuration:" ..
+                "should set one of [client.token,client.token_file] but none"
+    end
+
+    return apiserver, nil
+end
+
+local function create_endpoint_lrucache(endpoint_key, endpoint_port)
+    local endpoint_content = endpoint_dict:get_stale(endpoint_key)
+    if not endpoint_content then
+        core.log.error("get empty endpoint content from discovery DIC, this should not happen ",
+                endpoint_key)
+        return nil
+    end
+
+    local endpoint, _ = core.json.decode(endpoint_content)
+    if not endpoint then
+        core.log.error("decode endpoint content failed, this should not happen, content: ",
+                endpoint_content)
+    end

Review comment:
       I think we should `return nil` here

##########
File path: apisix/discovery/kubernetes/init.lua
##########
@@ -0,0 +1,356 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local ngx = ngx
+local ipairs = ipairs
+local pairs = pairs
+local string = string
+local tonumber = tonumber
+local tostring = tostring
+local os = os
+local error = error
+local pcall = pcall
+local process = require("ngx.process")
+local core = require("apisix.core")
+local util = require("apisix.cli.util")
+local local_conf = require("apisix.core.config_local").local_conf()
+local informer_factory = 
require("apisix.discovery.kubernetes.informer_factory")
+local endpoint_dict = ngx.shared.discovery
+
+local default_weight = 0
+
+local endpoint_lrucache = core.lrucache.new({
+    ttl = 300,
+    count = 1024
+})
+
+local endpoint_buffer = {}
+local empty_table = {}
+
+-- Ordering predicate for node records: ascending by host, and by port when
+-- the hosts are equal. Strict (returns false for equal records).
+local function sort_nodes_cmp(left, right)
+    if left.host == right.host then
+        return left.port < right.port
+    end
+    return left.host < right.host
+end
+
+-- Rebuild the node lists of one Endpoints object and store them in the
+-- discovery shared dict.
+--
+-- Nodes are grouped by port name (falling back to targetPort, then port) and
+-- each group is sorted with sort_nodes_cmp so every per-port list has a
+-- deterministic order. Two keys are written:
+--   "<namespace>/<name>#version"  crc32 of the encoded content
+--   "<namespace>/<name>"          the JSON-encoded port_name -> nodes map
+--
+-- informer: informer table; an optional namespace_selector filters the event
+-- endpoint: decoded Endpoints object (metadata, subsets)
+local function on_endpoint_modified(informer, endpoint)
+    if informer.namespace_selector ~= nil and
+            not informer:namespace_selector(endpoint.metadata.namespace) then
+        return
+    end
+
+    core.log.debug(core.json.delay_encode(endpoint))
+    core.table.clear(endpoint_buffer)
+
+    local subsets = endpoint.subsets
+    for _, subset in ipairs(subsets or empty_table) do
+        local addresses = subset.addresses
+        if addresses ~= nil then
+            for _, port in ipairs(subset.ports or empty_table) do
+                local port_name
+                if port.name then
+                    port_name = port.name
+                elseif port.targetPort then
+                    port_name = tostring(port.targetPort)
+                else
+                    port_name = tostring(port.port)
+                end
+
+                local nodes = endpoint_buffer[port_name]
+                if nodes == nil then
+                    -- nodes is filled as an array, so pre-size the array part
+                    nodes = core.table.new(#subsets * #addresses, 0)
+                    endpoint_buffer[port_name] = nodes
+                end
+
+                for _, address in ipairs(addresses) do
+                    core.table.insert(nodes, {
+                        host = address.ip,
+                        port = port.port,
+                        weight = default_weight
+                    })
+                end
+            end
+        end
+    end
+
+    -- endpoint_buffer maps port_name -> array of nodes: sort each array itself
+    -- (iterating one level deeper would hand single node records to sort)
+    for _, nodes in pairs(endpoint_buffer) do
+        core.table.sort(nodes, sort_nodes_cmp)
+    end
+
+    local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name
+    local endpoint_content = core.json.encode(endpoint_buffer, true)
+    local endpoint_version = ngx.crc32_long(endpoint_content)
+
+    local _, err
+    _, err = endpoint_dict:safe_set(endpoint_key .. "#version", endpoint_version)
+    if err then
+        core.log.error("set endpoint version into discovery DICT failed, ", err)
+        return
+    end
+    _, err = endpoint_dict:safe_set(endpoint_key, endpoint_content)
+    if err then
+        core.log.error("set endpoint into discovery DICT failed, ", err)
+        -- do not leave a version entry behind without its content
+        endpoint_dict:delete(endpoint_key .. "#version")
+    end
+end
+
+-- Drop both shared-dict entries ("<namespace>/<name>#version" and
+-- "<namespace>/<name>") of a deleted Endpoints object, unless the informer's
+-- namespace_selector rejects its namespace.
+local function on_endpoint_deleted(informer, endpoint)
+    local selector = informer.namespace_selector
+    if selector ~= nil and not selector(informer, endpoint.metadata.namespace) then
+        return
+    end
+
+    core.log.debug(core.json.delay_encode(endpoint))
+
+    local metadata = endpoint.metadata
+    local endpoint_key = metadata.namespace .. "/" .. metadata.name
+    endpoint_dict:delete(endpoint_key .. "#version")
+    endpoint_dict:delete(endpoint_key)
+end
+
+-- Hook that calls flush_all() on the discovery shared dict.
+-- NOTE(review): presumably run by the informer before a full list; per the
+-- lua-nginx-module docs flush_all only marks entries as expired - confirm.
+local function pre_list(informer)
+    endpoint_dict:flush_all()
+end
+
+-- Hook that calls flush_expired() on the discovery shared dict.
+-- NOTE(review): presumably run after the list finished, to drop entries that
+-- were not written again since pre_list - confirm against the informer.
+local function post_list(informer)
+    endpoint_dict:flush_expired()
+end
+
+-- Copy conf.label_selector onto the informer unchanged (may be nil).
+local function setup_label_selector(conf, informer)
+    informer.label_selector = conf.label_selector
+end
+
+-- Configure namespace filtering on the informer from conf.namespace_selector.
+--
+--   equal / not_equal  set informer.field_selector ("metadata.namespace=..."
+--                      or "metadata.namespace!=...") and clear the predicate
+--   match / not_match  install informer.namespace_selector(self, namespace);
+--                      a pattern counts only when it matches the whole name
+--
+-- Only the first option present is applied, checked in the order
+-- equal, not_equal, match, not_match. Without namespace_selector in conf the
+-- predicate is cleared and nothing else is touched.
+local function setup_namespace_selector(conf, informer)
+    local ns = conf.namespace_selector
+    if ns == nil then
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.equal then
+        informer.field_selector = "metadata.namespace=" .. ns.equal
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.not_equal then
+        informer.field_selector = "metadata.namespace!=" .. ns.not_equal
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.match then
+        -- accept the namespace as soon as one pattern matches it completely
+        informer.namespace_selector = function(self, namespace)
+            local match = conf.namespace_selector.match
+            local m, err
+            for _, v in ipairs(match) do
+                m, err = ngx.re.match(namespace, v, "j")
+                if m and m[0] == namespace then
+                    return true
+                end
+                if err then
+                    core.log.error("ngx.re.match failed: ", err)
+                end
+            end
+            return false
+        end
+        return
+    end
+
+    if ns.not_match then
+        -- reject the namespace as soon as one pattern matches it completely
+        informer.namespace_selector = function(self, namespace)
+            local not_match = conf.namespace_selector.not_match
+            local m, err
+            for _, v in ipairs(not_match) do
+                m, err = ngx.re.match(namespace, v, "j")
+                if m and m[0] == namespace then
+                    return false
+                end
+                if err then
+                    -- log like the `match` branch instead of failing silently;
+                    -- the namespace is still rejected when a pattern is unusable
+                    core.log.error("ngx.re.match failed: ", err)
+                    return false
+                end
+            end
+            return true
+        end
+        return
+    end
+end
+
+local function read_env(key)

Review comment:
       `local val, error = read_env(key)`
   
   This is the conventional way to do it in Lua.

##########
File path: apisix/discovery/kubernetes/init.lua
##########
@@ -0,0 +1,356 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local ngx = ngx
+local ipairs = ipairs
+local pairs = pairs
+local string = string
+local tonumber = tonumber
+local tostring = tostring
+local os = os
+local error = error
+local pcall = pcall
+local process = require("ngx.process")
+local core = require("apisix.core")
+local util = require("apisix.cli.util")
+local local_conf = require("apisix.core.config_local").local_conf()
+local informer_factory = 
require("apisix.discovery.kubernetes.informer_factory")
+local endpoint_dict = ngx.shared.discovery
+
+local default_weight = 0
+
+local endpoint_lrucache = core.lrucache.new({
+    ttl = 300,
+    count = 1024
+})
+
+local endpoint_buffer = {}
+local empty_table = {}
+
+-- Ordering predicate for node records: ascending by host, and by port when
+-- the hosts are equal. Strict (returns false for equal records).
+local function sort_nodes_cmp(left, right)
+    if left.host == right.host then
+        return left.port < right.port
+    end
+    return left.host < right.host
+end
+
+-- Rebuild the node lists of one Endpoints object and store them in the
+-- discovery shared dict.
+--
+-- Nodes are grouped by port name (falling back to targetPort, then port) and
+-- each group is sorted with sort_nodes_cmp so every per-port list has a
+-- deterministic order. Two keys are written:
+--   "<namespace>/<name>#version"  crc32 of the encoded content
+--   "<namespace>/<name>"          the JSON-encoded port_name -> nodes map
+--
+-- informer: informer table; an optional namespace_selector filters the event
+-- endpoint: decoded Endpoints object (metadata, subsets)
+local function on_endpoint_modified(informer, endpoint)
+    if informer.namespace_selector ~= nil and
+            not informer:namespace_selector(endpoint.metadata.namespace) then
+        return
+    end
+
+    core.log.debug(core.json.delay_encode(endpoint))
+    core.table.clear(endpoint_buffer)
+
+    local subsets = endpoint.subsets
+    for _, subset in ipairs(subsets or empty_table) do
+        local addresses = subset.addresses
+        if addresses ~= nil then
+            for _, port in ipairs(subset.ports or empty_table) do
+                local port_name
+                if port.name then
+                    port_name = port.name
+                elseif port.targetPort then
+                    port_name = tostring(port.targetPort)
+                else
+                    port_name = tostring(port.port)
+                end
+
+                local nodes = endpoint_buffer[port_name]
+                if nodes == nil then
+                    -- nodes is filled as an array, so pre-size the array part
+                    nodes = core.table.new(#subsets * #addresses, 0)
+                    endpoint_buffer[port_name] = nodes
+                end
+
+                for _, address in ipairs(addresses) do
+                    core.table.insert(nodes, {
+                        host = address.ip,
+                        port = port.port,
+                        weight = default_weight
+                    })
+                end
+            end
+        end
+    end
+
+    -- endpoint_buffer maps port_name -> array of nodes: sort each array itself
+    -- (iterating one level deeper would hand single node records to sort)
+    for _, nodes in pairs(endpoint_buffer) do
+        core.table.sort(nodes, sort_nodes_cmp)
+    end
+
+    local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name
+    local endpoint_content = core.json.encode(endpoint_buffer, true)
+    local endpoint_version = ngx.crc32_long(endpoint_content)
+
+    local _, err
+    _, err = endpoint_dict:safe_set(endpoint_key .. "#version", endpoint_version)
+    if err then
+        core.log.error("set endpoint version into discovery DICT failed, ", err)
+        return
+    end
+    _, err = endpoint_dict:safe_set(endpoint_key, endpoint_content)
+    if err then
+        core.log.error("set endpoint into discovery DICT failed, ", err)
+        -- do not leave a version entry behind without its content
+        endpoint_dict:delete(endpoint_key .. "#version")
+    end
+end
+
+-- Drop both shared-dict entries ("<namespace>/<name>#version" and
+-- "<namespace>/<name>") of a deleted Endpoints object, unless the informer's
+-- namespace_selector rejects its namespace.
+local function on_endpoint_deleted(informer, endpoint)
+    local selector = informer.namespace_selector
+    if selector ~= nil and not selector(informer, endpoint.metadata.namespace) then
+        return
+    end
+
+    core.log.debug(core.json.delay_encode(endpoint))
+
+    local metadata = endpoint.metadata
+    local endpoint_key = metadata.namespace .. "/" .. metadata.name
+    endpoint_dict:delete(endpoint_key .. "#version")
+    endpoint_dict:delete(endpoint_key)
+end
+
+-- Hook that calls flush_all() on the discovery shared dict.
+-- NOTE(review): presumably run by the informer before a full list; per the
+-- lua-nginx-module docs flush_all only marks entries as expired - confirm.
+local function pre_list(informer)
+    endpoint_dict:flush_all()
+end
+
+-- Hook that calls flush_expired() on the discovery shared dict.
+-- NOTE(review): presumably run after the list finished, to drop entries that
+-- were not written again since pre_list - confirm against the informer.
+local function post_list(informer)
+    endpoint_dict:flush_expired()
+end
+
+-- Copy conf.label_selector onto the informer unchanged (may be nil).
+local function setup_label_selector(conf, informer)
+    informer.label_selector = conf.label_selector
+end
+
+-- Configure namespace filtering on the informer from conf.namespace_selector.
+--
+--   equal / not_equal  set informer.field_selector ("metadata.namespace=..."
+--                      or "metadata.namespace!=...") and clear the predicate
+--   match / not_match  install informer.namespace_selector(self, namespace);
+--                      a pattern counts only when it matches the whole name
+--
+-- Only the first option present is applied, checked in the order
+-- equal, not_equal, match, not_match. Without namespace_selector in conf the
+-- predicate is cleared and nothing else is touched.
+local function setup_namespace_selector(conf, informer)
+    local ns = conf.namespace_selector
+    if ns == nil then
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.equal then
+        informer.field_selector = "metadata.namespace=" .. ns.equal
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.not_equal then
+        informer.field_selector = "metadata.namespace!=" .. ns.not_equal
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.match then
+        -- accept the namespace as soon as one pattern matches it completely
+        informer.namespace_selector = function(self, namespace)
+            local match = conf.namespace_selector.match
+            local m, err
+            for _, v in ipairs(match) do
+                m, err = ngx.re.match(namespace, v, "j")
+                if m and m[0] == namespace then
+                    return true
+                end
+                if err then
+                    core.log.error("ngx.re.match failed: ", err)
+                end
+            end
+            return false
+        end
+        return
+    end
+
+    if ns.not_match then
+        -- reject the namespace as soon as one pattern matches it completely
+        informer.namespace_selector = function(self, namespace)
+            local not_match = conf.namespace_selector.not_match
+            local m, err
+            for _, v in ipairs(not_match) do
+                m, err = ngx.re.match(namespace, v, "j")
+                if m and m[0] == namespace then
+                    return false
+                end
+                if err then
+                    -- log like the `match` branch instead of failing silently;
+                    -- the namespace is still rejected when a pattern is unusable
+                    core.log.error("ngx.re.match failed: ", err)
+                    return false
+                end
+            end
+            return true
+        end
+        return
+    end
+end
+
+-- Resolve a configuration value that may be an environment reference.
+--
+-- A key of the form "${NAME}" (at least one character between the braces)
+-- is looked up with os.getenv; any other key is returned unchanged.
+--
+-- Returns three values: ok, value, message
+--   true,  <resolved value or the key itself>, nil
+--   false, nil, "not found environment variable <NAME>"
+local function read_env(key)
+    local len = #key
+    if len <= 3 or string.sub(key, 1, 2) ~= "${" or string.sub(key, len) ~= "}" then
+        return true, key, nil
+    end
+
+    local name = string.sub(key, 3, len - 1)
+    local val = os.getenv(name)
+    if val then
+        return true, val, nil
+    end
+    return false, nil, "not found environment variable " .. name
+end
+
+local function get_apiserver(conf)
+    local apiserver = {
+        schema = "",
+        host = "",
+        port = "",
+        token = ""
+    }
+
+    apiserver.schema = conf.service.schema
+
+    local ok, value, message
+    ok, value, message = read_env(conf.service.host)
+    if not ok then
+        return nil, message
+    end
+
+    apiserver.host = value
+    if apiserver.host == "" then
+        return nil, "get empty host value"
+    end
+
+    ok, value, message = read_env(conf.service.port)
+    if not ok then
+        return nil, message
+    end
+
+    apiserver.port = tonumber(value)
+    if not apiserver.port or apiserver.port <= 0 or apiserver.port > 65535 then
+        return nil, "get invalid port value: " .. apiserver.port
+    end
+
+    -- we should not check if the apiserver.token is empty here
+    if conf.client.token then
+        ok, value, message = read_env(conf.client.token)
+        if not ok then
+            return nil, message
+        end
+        apiserver.token = value
+    elseif conf.client.token_file and conf.client.token_file ~= "" then
+        ok, value, message = read_env(conf.client.token_file)
+        if not ok then
+            return nil, message
+        end
+        local apiserver_token_file = value
+
+        apiserver.token, message = util.read_file(apiserver_token_file)
+        if not apiserver.token then
+            return nil, message
+        end
+    else
+        return nil, "invalid kubernetes discovery configuration:" ..
+                "should set one of [client.token,client.token_file] but none"
+    end
+
+    return apiserver, nil

Review comment:
       Please remove the trailing `, nil`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to