This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new fc3a1c9c7 feat(xds): using data written by xds to control dp behavior (#6759)
fc3a1c9c7 is described below

commit fc3a1c9c7403b4517dcbf760329cf9e4d2c5a5af
Author: tzssangglass <[email protected]>
AuthorDate: Thu Apr 7 11:32:14 2022 +0800

    feat(xds): using data written by xds to control dp behavior (#6759)
---
 apisix/cli/ngx_tpl.lua       |   3 +-
 apisix/core/config_xds.lua   | 280 +++++++++++++++++++++++++++++++++++++++++--
 apisix/init.lua              |  12 +-
 t/APISIX.pm                  |   3 +-
 t/xds-library/config_xds.t   |  22 +++-
 t/xds-library/config_xds_2.t | 239 ++++++++++++++++++++++++++++++++++++
 t/xds-library/main.go        |  60 ++++++++--
 7 files changed, 589 insertions(+), 30 deletions(-)

diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua
index 0dc4da68c..e92b8092a 100644
--- a/apisix/cli/ngx_tpl.lua
+++ b/apisix/cli/ngx_tpl.lua
@@ -244,7 +244,8 @@ http {
     {% end %}
 
     {% if config_center == "xds" then %}
-    lua_shared_dict xds-route-config  10m;
+    lua_shared_dict xds-config  10m;
+    lua_shared_dict xds-config-version  1m;
     {% end %}
 
     # for custom shared dict
diff --git a/apisix/core/config_xds.lua b/apisix/core/config_xds.lua
index 7c0c9f4a6..05f97f5c0 100644
--- a/apisix/core/config_xds.lua
+++ b/apisix/core/config_xds.lua
@@ -15,39 +15,57 @@
 -- limitations under the License.
 --
 
---- Get configuration form ngx.shared.DICT.
+--- Get configuration from ngx.shared.DICT
 --
 -- @module core.config_xds
 
-local base              = require("resty.core.base")
 local config_local      = require("apisix.core.config_local")
+local string            = require("apisix.core.string")
+local log               = require("apisix.core.log")
+local json              = require("apisix.core.json")
+local ngx_sleep         = require("apisix.core.utils").sleep
+local check_schema      = require("apisix.core.schema").check
+local new_tab           = require("table.new")
 local table             = table
+local insert_tab        = table.insert
 local error             = error
-local is_http           = ngx.config.subsystem == "http"
+local pcall             = pcall
+local tostring          = tostring
+local setmetatable      = setmetatable
 local io                = io
 local io_open           = io.open
 local io_close          = io.close
 local package           = package
-local new_tab           = base.new_tab
+local ipairs            = ipairs
+local type              = type
+local sub_str           = string.sub
 local ffi               = require ("ffi")
 local C                 = ffi.C
-local route_config     = ngx.shared["xds-route-config"]
+local config            = ngx.shared["xds-config"]
+local conf_ver          = ngx.shared["xds-config-version"]
+local is_http           = ngx.config.subsystem == "http"
 local ngx_re_match      = ngx.re.match
 local ngx_re_gmatch     = ngx.re.gmatch
+local ngx_timer_every   = ngx.timer.every
+local ngx_timer_at      = ngx.timer.at
+local exiting           = ngx.worker.exiting
+local ngx_time          = ngx.time
 
-local xds_lib_name = "libxds.so"
-
+local xds_lib_name      = "libxds.so"
 
 local process
 if is_http then
     process = require("ngx.process")
 end
 
-
 ffi.cdef[[
-extern void initial(void* route_zone_ptr);
+typedef unsigned int  useconds_t;
+
+extern void initial(void* config_zone, void* version_zone);
+int usleep(useconds_t usec);
 ]]
 
+local created_obj  = {}
 
 local _M = {
     version = 0.1,
@@ -55,6 +73,14 @@ local _M = {
 }
 
 
+local mt = {
+    __index = _M,
+    __tostring = function(self)
+        return " xds key: " .. self.key
+    end
+}
+
+
 -- todo: refactor this function in chash.lua and radixtree.lua
 local function load_shared_lib(lib_name)
     local cpath = package.cpath
@@ -101,18 +127,248 @@ local function load_libxds(lib_name)
               table.concat(tried_paths, '\r\n', 1, #tried_paths))
     end
 
-    local route_zone = C.ngx_http_lua_ffi_shdict_udata_to_zone(route_config[1])
-    local route_shd_cdata = ffi.cast("void*", route_zone)
-    xdsagent.initial(route_shd_cdata)
+    local config_zone = C.ngx_http_lua_ffi_shdict_udata_to_zone(config[1])
+    local config_shd_cdata = ffi.cast("void*", config_zone)
+
+    local conf_ver_zone = C.ngx_http_lua_ffi_shdict_udata_to_zone(conf_ver[1])
+    local conf_ver_shd_cdata = ffi.cast("void*", conf_ver_zone)
+
+    xdsagent.initial(config_shd_cdata, conf_ver_shd_cdata)
+end
+
+
+local latest_version
+local function sync_data(self)
+    if self.conf_version == latest_version then
+        return true
+    end
+
+    if self.values then
+        for _, val in ipairs(self.values) do
+            if val and val.clean_handlers then
+                for _, clean_handler in ipairs(val.clean_handlers) do
+                    clean_handler(val)
+                end
+                val.clean_handlers = nil
+            end
+        end
+        self.values = nil
+        self.values_hash = nil
+    end
+
+    local keys = config:get_keys(0)
+
+    if not keys or #keys <= 0 then
+        -- xds did not write any data to shdict
+        return false, "no keys"
+    end
+
+    self.values = new_tab(#keys, 0)
+    self.values_hash = new_tab(0, #keys)
+
+    for _, key in ipairs(keys) do
+        if string.has_prefix(key, self.key) then
+            local data_valid = true
+            local conf_str = config:get(key, 0)
+            local conf, err = json.decode(conf_str)
+            if not conf then
+                data_valid = false
+                log.error("decode the conf of [", key, "] failed, err: ", err,
+                          ", conf_str: ", conf_str)
+            end
+
+            if not self.single_item and type(conf) ~= "table" then
+                data_valid = false
+                log.error("invalid conf of [", key, "], conf: ", conf,
+                          ", it should be an object")
+            end
+
+            if data_valid and self.item_schema then
+                local ok, err = check_schema(self.item_schema, conf)
+                if not ok then
+                    data_valid = false
+                    log.error("failed to check the conf of [", key, "] err:", err)
+                end
+            end
+
+            if data_valid and self.checker then
+                local ok, err = self.checker(conf)
+                if not ok then
+                    data_valid = false
+                    log.error("failed to check the conf of [", key, "] err:", err)
+                end
+            end
+
+            if data_valid then
+                if not conf.id then
+                    conf.id = sub_str(key, #self.key + 2, #key + 1)
+                    log.warn("the id of [", key, "] is nil, use the id: ", conf.id)
+                end
+
+                local conf_item = {value = conf, modifiedIndex = latest_version,
+                                   key = key}
+                insert_tab(self.values, conf_item)
+                self.values_hash[conf.id] = #self.values
+                conf_item.clean_handlers = {}
+
+                if self.filter then
+                    self.filter(conf_item)
+                end
+            end
+        end
+    end
+
+    self.conf_version = latest_version
+    return true
 end
 
 
+local function _automatic_fetch(premature, self)
+    if premature then
+        return
+    end
+
+    local i = 0
+    while not exiting() and self.running and i <= 32 do
+        i = i + 1
+        local ok, ok2, err = pcall(sync_data, self)
+        if not ok then
+            err = ok2
+            log.error("failed to fetch data from xds: ",
+                      err, ", ", tostring(self))
+            ngx_sleep(3)
+            break
+        elseif not ok2 and err then
+            -- todo: handle other errors
+            if err ~= "wait for more time" and err ~= "no keys" and self.last_err ~= err then
+                log.error("failed to fetch data from xds, ", err, ", ", tostring(self))
+            end
+
+            if err ~= self.last_err then
+                self.last_err = err
+                self.last_err_time = ngx_time()
+            else
+                if ngx_time() - self.last_err_time >= 30 then
+                    self.last_err = nil
+                end
+            end
+            ngx_sleep(0.5)
+        elseif not ok2 then
+            ngx_sleep(0.05)
+        else
+            ngx_sleep(0.1)
+        end
+    end
+
+    if not exiting() and self.running then
+        ngx_timer_at(0, _automatic_fetch, self)
+    end
+end
+
+
+local function fetch_version(premature)
+    if premature then
+        return
+    end
+
+    local version = conf_ver:get("version")
+
+    if not version then
+        return
+    end
+
+    if version ~= latest_version then
+        latest_version = version
+    end
+end
+
+
+function _M.new(key, opts)
+    local automatic = opts and opts.automatic
+    local item_schema = opts and opts.item_schema
+    local filter_fun = opts and opts.filter
+    local single_item = opts and opts.single_item
+    local checker = opts and opts.checker
+
+
+    local obj = setmetatable({
+        automatic = automatic,
+        item_schema = item_schema,
+        checker = checker,
+        sync_times = 0,
+        running = true,
+        conf_version = 0,
+        values = nil,
+        routes_hash = nil,
+        prev_index = nil,
+        last_err = nil,
+        last_err_time = nil,
+        key = key,
+        single_item = single_item,
+        filter = filter_fun,
+    }, mt)
+
+    if automatic then
+        if not key then
+            return nil, "missing `key` argument"
+        end
+
+        -- blocking until xds completes initial configuration
+        while true do
+            C.usleep(0.1)
+            fetch_version()
+            if latest_version then
+                break
+            end
+        end
+
+        local ok, ok2, err = pcall(sync_data, obj)
+        if not ok then
+            err = ok2
+        end
+
+        if err then
+            log.error("failed to fetch data from xds ",
+                      err, ", ", key)
+        end
+
+        ngx_timer_at(0, _automatic_fetch, obj)
+    end
+
+    if key then
+        created_obj[key] = obj
+    end
+
+    return obj
+end
+
+
+function _M.get(self, key)
+    if not self.values_hash then
+        return
+    end
+
+    local arr_idx = self.values_hash[tostring(key)]
+    if not arr_idx then
+        return nil
+    end
+
+    return self.values[arr_idx]
+end
+
+
+function _M.fetch_created_obj(key)
+    return created_obj[key]
+end
+
 
 function _M.init_worker()
     if process.type() == "privileged agent" then
         load_libxds(xds_lib_name)
     end
 
+    ngx_timer_every(1, fetch_version)
+
     return true
 end
 
diff --git a/apisix/init.lua b/apisix/init.lua
index e60f8b245..b61d43615 100644
--- a/apisix/init.lua
+++ b/apisix/init.lua
@@ -116,12 +116,6 @@ function _M.http_init_worker()
 
     require("apisix.debug").init_worker()
 
-    plugin.init_worker()
-    router.http_init_worker()
-    require("apisix.http.service").init_worker()
-    plugin_config.init_worker()
-    require("apisix.consumer").init_worker()
-
     if core.config.init_worker then
         local ok, err = core.config.init_worker()
         if not ok then
@@ -130,6 +124,12 @@ function _M.http_init_worker()
         end
     end
 
+    plugin.init_worker()
+    router.http_init_worker()
+    require("apisix.http.service").init_worker()
+    plugin_config.init_worker()
+    require("apisix.consumer").init_worker()
+
     apisix_upstream.init_worker()
     require("apisix.plugins.ext-plugin.init").init_worker()
 
diff --git a/t/APISIX.pm b/t/APISIX.pm
index fb1839b78..1bd72edf3 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -509,7 +509,8 @@ _EOC_
     lua_shared_dict ext-plugin 1m;
     lua_shared_dict kubernetes 1m;
     lua_shared_dict tars 1m;
-    lua_shared_dict xds-route-config 1m;
+    lua_shared_dict xds-config 1m;
+    lua_shared_dict xds-config-version 1m;
 
     proxy_ssl_name \$upstream_host;
     proxy_ssl_server_name on;
diff --git a/t/xds-library/config_xds.t b/t/xds-library/config_xds.t
index dabdbc141..8df413aa3 100644
--- a/t/xds-library/config_xds.t
+++ b/t/xds-library/config_xds.t
@@ -95,11 +95,27 @@ qr/can not load xDS library/
             -- wait for xds library sync data
             ngx.sleep(1.5)
             local core = require("apisix.core")
-            local value = ngx.shared["xds-route-config"]:get("/apisix/routes/1")
+            local value = ngx.shared["xds-config"]:get("/routes/1")
             local route_conf, err = core.json.decode(value)
             local json_encode = require("toolkit.json").encode
-            ngx.say(json_encode(route_conf))
+            ngx.say(json_encode(route_conf.uri))
         }
     }
 --- response_body
-{"create_time":1646972532,"id":"1","priority":0,"status":1,"update_time":1647250524,"upstream":{"hash_on":"vars","nodes":[{"host":"127.0.0.1","port":80,"priority":0,"weight":1}],"pass_host":"pass","scheme":"http","type":"roundrobin"},"uri":"/hello"}
+"/hello"
+
+
+
+=== TEST 3: read conf version
+--- config
+    location /t {
+        content_by_lua_block {
+            -- wait for xds library sync data
+            ngx.sleep(1.5)
+            local core = require("apisix.core")
+            local version = ngx.shared["xds-config-version"]:get("version")
+            ngx.say(version)
+        }
+    }
+--- response_body eval
+qr/^\d{13}$/
diff --git a/t/xds-library/config_xds_2.t b/t/xds-library/config_xds_2.t
new file mode 100644
index 000000000..85f9e0de3
--- /dev/null
+++ b/t/xds-library/config_xds_2.t
@@ -0,0 +1,239 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+use Cwd qw(cwd);
+my $apisix_home = $ENV{APISIX_HOME} || cwd();
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+log_level("info");
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!$block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]\n[alert]");
+    }
+
+    my $lua_deps_path = $block->lua_deps_path // <<_EOC_;
+        lua_package_path "$apisix_home/?.lua;$apisix_home/?/init.lua;$apisix_home/deps/share/lua/5.1/?/init.lua;$apisix_home/deps/share/lua/5.1/?.lua;$apisix_home/apisix/?.lua;$apisix_home/t/?.lua;;";
+        lua_package_cpath "$apisix_home/?.so;$apisix_home/t/xds-library/?.so;$apisix_home/deps/lib/lua/5.1/?.so;$apisix_home/deps/lib64/lua/5.1/?.so;;";
+_EOC_
+
+    $block->set_value("lua_deps_path", $lua_deps_path);
+
+    if (!$block->yaml_config) {
+        my $yaml_config = <<_EOC_;
+apisix:
+    node_listen: 1984
+    config_center: xds
+    enable_admin: false
+_EOC_
+
+        $block->set_value("yaml_config", $yaml_config);
+    }
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: proxy request using data written by xds(id = 1)
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require "resty.http"
+            local httpc = http.new()
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+            local res, err = httpc:request_uri(uri, { method = "GET"})
+
+            if not res then
+                ngx.say(err)
+                return
+            end
+            ngx.print(res.body)
+        }
+    }
+--- response_body
+hello world
+
+
+
+=== TEST 2: proxy request using data written by xds(id = 2, upstream_id = 1)
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require "resty.http"
+            local httpc = http.new()
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello1"
+            local res, err = httpc:request_uri(uri, { method = "GET"})
+
+            if not res then
+                ngx.say(err)
+                return
+            end
+            ngx.print(res.body)
+        }
+    }
+--- response_body
+hello1 world
+
+
+
+=== TEST 3: proxy request using data written by xds(id = 3, upstream_id = 2)
+--- config
+    location /t {
+        content_by_lua_block {
+            ngx.sleep(1.5)
+            local core = require("apisix.core")
+            local value = ngx.shared["xds-config"]:flush_all()
+            ngx.sleep(1.5)
+            local http = require "resty.http"
+            local httpc = http.new()
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+            local res, err = httpc:request_uri(uri, { method = "GET"})
+
+            if not res then
+                ngx.say(err)
+                return
+            end
+            ngx.print(res.body)
+        }
+    }
+--- response_body
+hello world
+
+
+
+=== TEST 4: flush all keys in xds config
+--- config
+    location /t {
+        content_by_lua_block {
+            local core = require("apisix.core")
+            ngx.shared["xds-config"]:flush_all()
+            ngx.update_time()
+            ngx.shared["xds-config-version"]:set("version", ngx.now())
+            ngx.sleep(1.5)
+
+            local http = require "resty.http"
+            local httpc = http.new()
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+            local res, err = httpc:request_uri(uri, { method = "GET"})
+
+            if not res then
+                ngx.say(err)
+                return
+            end
+            ngx.status = res.status
+            ngx.print(res.body)
+        }
+    }
+--- error_code: 404
+--- response_body
+{"error_msg":"404 Route Not Found"}
+
+
+
+=== TEST 5: bad format json
+--- config
+    location /t {
+        content_by_lua_block {
+            local data = [[{
+                upstream = {
+                    type = "roundrobin"
+                    nodes = {
+                        ["127.0.0.1:1980"] = 1,
+                    }
+                },
+                uri = "/bad_json"
+            }]]
+            ngx.shared["xds-config"]:set("/routes/3", data)
+            ngx.update_time()
+            ngx.shared["xds-config-version"]:set("version", ngx.now())
+            ngx.sleep(1.5)
+
+            local http = require "resty.http"
+            local httpc = http.new()
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/bad_json"
+            local res, err = httpc:request_uri(uri, { method = "GET"})
+
+            if not res then
+                ngx.say(err)
+                return
+            end
+            ngx.status = res.status
+        }
+    }
+--- wait: 2
+--- error_code: 404
+--- error_log
+decode the conf of [/routes/3] failed, err: Expected object key string but found invalid token
+
+
+
+=== TEST 6: schema check fail
+--- config
+    location /t {
+        content_by_lua_block {
+            local core = require("apisix.core")
+            local data = {
+                upstream = {
+                    type = "roundrobin",
+                    nodes = {
+                        ["127.0.0.1:65536"] = 1,
+                    }
+                }
+            }
+            local data_str = core.json.encode(data)
+            ngx.log(ngx.WARN, "data_str : ", require("inspect")(data_str))
+
+            ngx.shared["xds-config"]:set("/routes/3", data_str)
+            ngx.update_time()
+            ngx.shared["xds-config-version"]:set("version", ngx.now())
+            ngx.sleep(1.5)
+        }
+    }
+--- no_error_log
+[alert]
+-- wait: 2
+--- error_log
+failed to check the conf of [/routes/3] err:allOf 1 failed: value should match only one schema, but matches none
+
+
+
+=== TEST 7: not table
+--- config
+    location /t {
+        content_by_lua_block {
+            local data = "/not_table"
+            ngx.shared["xds-config"]:set("/routes/3", data)
+            ngx.update_time()
+            ngx.shared["xds-config-version"]:set("version", ngx.now())
+            ngx.sleep(1.5)
+        }
+    }
+--- no_error_log
+[alert]
+-- wait: 2
+--- error_log
+invalid conf of [/routes/3], conf: nil, it should be an object
diff --git a/t/xds-library/main.go b/t/xds-library/main.go
index 1463ecf91..b682d1358 100644
--- a/t/xds-library/main.go
+++ b/t/xds-library/main.go
@@ -31,7 +31,10 @@ extern void ngx_http_lua_ffi_shdict_store(void *zone, int op,
 import "C"
 
 import (
+       "context"
        "fmt"
+       "math/rand"
+       "strconv"
        "time"
        "unsafe"
 )
@@ -39,11 +42,14 @@ import (
 func main() {
 }
 
-
 //export initial
-func initial(zone unsafe.Pointer) {
-       time.Sleep(time.Second)
-       value := fmt.Sprintf(`{
+func initial(config_zone unsafe.Pointer, version_zone unsafe.Pointer) {
+       write_config(config_zone, version_zone)
+}
+
+func write_config(config_zone unsafe.Pointer, version_zone unsafe.Pointer) {
+       route_key := "/routes/1"
+       route_value := fmt.Sprintf(`{
 "status": 1,
 "update_time": 1647250524,
 "create_time": 1646972532,
@@ -53,7 +59,7 @@ func initial(zone unsafe.Pointer) {
 "upstream": {
        "nodes": [
                {
-                       "port": 80,
+                       "port": 1980,
                        "priority": 0,
                        "host": "127.0.0.1",
                        "weight": 1
@@ -66,10 +72,50 @@ func initial(zone unsafe.Pointer) {
 }
 }`)
 
-       write_route(zone, "/apisix/routes/1", value)
+       upstream_key := "/upstreams/1"
+       upstream_value := fmt.Sprintf(`{
+"id": "1",
+"nodes": {
+       "127.0.0.1:1980": 1
+},
+"type": "roundrobin"
+}`)
+
+       r_u_key := "/routes/2"
+       r_u_value := fmt.Sprintf(`{
+"status": 1,
+"update_time": 1647250524,
+"create_time": 1646972532,
+"uri": "/hello1",
+"priority": 0,
+"id": "1",
+"upstream_id": "1"
+}`)
+
+       write_shdict(route_key, route_value, config_zone)
+       write_shdict(upstream_key, upstream_value, config_zone)
+       write_shdict(r_u_key, r_u_value, config_zone)
+       update_conf_version(version_zone)
+
+}
+
+func update_conf_version(zone unsafe.Pointer) {
+       ctx := context.Background()
+       go func() {
+               for {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case <-time.After(time.Second * time.Duration(rand.Intn(10))):
+                               key := "version"
+                               version := strconv.FormatInt(time.Now().UnixNano()/1e6, 10)
+                               write_shdict(key, version, zone)
+                       }
+               }
+       }()
 }
 
-func write_route(zone unsafe.Pointer, key, value string) {
+func write_shdict(key string, value string, zone unsafe.Pointer) {
        var keyCStr = C.CString(key)
        defer C.free(unsafe.Pointer(keyCStr))
        var keyLen = C.size_t(len(key))

Reply via email to