zhixiongdu027 commented on a change in pull request #4880:
URL: https://github.com/apache/apisix/pull/4880#discussion_r809912757



##########
File path: apisix/discovery/kubernetes/informer_factory.lua
##########
@@ -0,0 +1,376 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local ngx = ngx
+local ipairs = ipairs
+local string = string
+local math = math
+local type = type
+local core = require("apisix.core")
+local http = require("resty.http")
+
+local empty_table = {}
+
+local function list_query(informer)
+    local arguments = {
+        limit = informer.limit,
+    }
+
+    if informer.continue and informer.continue ~= "" then
+        arguments.continue = informer.continue
+    end
+
+    if informer.label_selector and informer.label_selector ~= "" then
+        arguments.labelSelector = informer.label_selector
+    end
+
+    if informer.field_selector and informer.field_selector ~= "" then
+        arguments.fieldSelector = informer.field_selector
+    end
+
+    return ngx.encode_args(arguments)
+end
+
+
+local function list(httpc, apiserver, informer)
+    local response, err = httpc:request({
+        path = informer.path,
+        query = list_query(informer),
+        headers = {
+            ["Host"] = apiserver.host .. ":" .. apiserver.port,
+            ["Authorization"] = "Bearer " .. apiserver.token,
+            ["Accept"] = "application/json",
+            ["Connection"] = "keep-alive"
+        }
+    })
+
+    core.log.info("--raw=", informer.path, "?", list_query(informer))
+
+    if not response then
+        return false, "RequestError", err or ""
+    end
+
+    if response.status ~= 200 then
+        return false, response.reason, response:read_body() or ""
+    end
+
+    local body, err = response:read_body()
+    if err then
+        return false, "ReadBodyError", err
+    end
+
+    local data = core.json.decode(body)
+    if not data or data.kind ~= informer.list_kind then
+        return false, "UnexpectedBody", body
+    end
+
+    informer.version = data.metadata.resourceVersion
+
+    if informer.on_added then
+        for _, item in ipairs(data.items or empty_table) do
+            informer:on_added(item, "list")
+        end
+    end
+
+    informer.continue = data.metadata.continue
+    if informer.continue and informer.continue ~= "" then
+        list(httpc, apiserver, informer)
+    end
+
+    return true
+end
+
+
+local function watch_query(informer)
+    local arguments = {
+        watch = "true",
+        allowWatchBookmarks = "true",
+        timeoutSeconds = informer.overtime,
+    }
+
+    if informer.version and informer.version ~= "" then
+        arguments.resourceVersion = informer.version
+    end
+
+    if informer.label_selector and informer.label_selector ~= "" then
+        arguments.labelSelector = informer.label_selector
+    end
+
+    if informer.field_selector and informer.field_selector ~= "" then
+        arguments.fieldSelector = informer.field_selector
+    end
+
+    return ngx.encode_args(arguments)
+end
+
+
+local function split_event (body, callback, ...)
+    local gmatch_iterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jao")
+    if not gmatch_iterator then
+        return false, nil, "GmatchError", err
+    end
+
+    local captures
+    local captured_size = 0
+    local ok, reason
+    while true do
+        captures, err = gmatch_iterator()
+
+        if err then
+            return false, nil, "GmatchError", err
+        end
+
+        if not captures then
+            break
+        end
+
+        captured_size = captured_size + #captures[0]
+
+        ok, reason, err = callback(captures[0], ...)
+        if not ok then
+            return false, nil, reason, err
+        end
+    end
+
+    local remainder_body
+    if captured_size == #body then
+        remainder_body = ""
+    elseif captured_size == 0 then
+        remainder_body = body
+    elseif captured_size < #body then
+        remainder_body = string.sub(body, captured_size + 1)
+    end
+
+    return true, remainder_body
+end
+
+
+local function dispatch_event(event_string, informer)
+    local event = core.json.decode(event_string)
+
+    if not event or not event.type or not event.object then
+        return false, "UnexpectedBody", event_string
+    end
+
+    local tp = event.type
+
+    if tp == "ERROR" then
+        if event.object.code == 410 then
+            return false, "ResourceGone", nil
+        end
+        return false, "UnexpectedBody", event_string
+    end
+
+    local object = event.object
+    informer.version = object.metadata.resourceVersion
+
+    if tp == "ADDED" then
+        if informer.on_added then
+            informer:on_added(object, "watch")
+        end
+    elseif tp == "DELETED" then
+        if informer.on_deleted then
+            informer:on_deleted(object)
+        end
+    elseif tp == "MODIFIED" then
+        if informer.on_modified then
+            informer:on_modified(object)
+        end
+        -- elseif type == "BOOKMARK" then
+        --    do nothing
+    end
+
+    return true
+end
+
+
+local function watch(httpc, apiserver, informer)
+    local watch_times = 8
+    for _ = 1, watch_times do
+        local watch_seconds = 1800 + math.random(9, 999)
+        informer.overtime = watch_seconds
+        local http_seconds = watch_seconds + 120
+        httpc:set_timeouts(2000, 3000, http_seconds * 1000)
+
+        local response, err = httpc:request({
+            path = informer.path,
+            query = watch_query(informer),
+            headers = {
+                ["Host"] = apiserver.host .. ":" .. apiserver.port,
+                ["Authorization"] = "Bearer " .. apiserver.token,
+                ["Accept"] = "application/json",
+                ["Connection"] = "keep-alive"
+            }
+        })
+
+        core.log.info("--raw=", informer.path, "?", watch_query(informer))
+
+        if err then
+            return false, "RequestError", err
+        end
+
+        if response.status ~= 200 then
+            return false, response.reason, response:read_body() or ""
+        end
+
+        local ok
+        local remainder_body
+        local body
+        local reason
+
+        while true do
+            body, err = response.body_reader()
+            if err then
+                return false, "ReadBodyError", err
+            end
+
+            if not body then
+                break
+            end
+
+            if remainder_body and #remainder_body > 0 then
+                body = remainder_body .. body
+            end
+
+            ok, remainder_body, reason, err = split_event(body, 
dispatch_event, informer)
+            if not ok then
+                if reason == "ResourceGone" then
+                    return true
+                end
+                return false, reason, err
+            end
+        end
+    end
+
+    return true
+end
+
+
+local function list_watch(informer, apiserver)
+    local ok
+    local reason, message
+    local httpc = http.new()
+
+    informer.fetch_state = "connecting"
+    core.log.info("begin to connect ", apiserver.host, ":", apiserver.port)
+
+    ok, message = httpc:connect({
+        scheme = apiserver.schema,
+        host = apiserver.host,
+        port = apiserver.port,
+        ssl_verify = false

Review comment:
       || ssl_verify = false
   | is this necessary ?
   
   "ssl_verify=false" is not the best.
   But if we use the default value -- "ssl_verify = true" , it may need to add 
a lot of code.
   Can we put this in the next PR ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to