This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new fc3a1c9c7 feat(xds): using data written by xds to control dp behavior
(#6759)
fc3a1c9c7 is described below
commit fc3a1c9c7403b4517dcbf760329cf9e4d2c5a5af
Author: tzssangglass <[email protected]>
AuthorDate: Thu Apr 7 11:32:14 2022 +0800
feat(xds): using data written by xds to control dp behavior (#6759)
---
apisix/cli/ngx_tpl.lua | 3 +-
apisix/core/config_xds.lua | 280 +++++++++++++++++++++++++++++++++++++++++--
apisix/init.lua | 12 +-
t/APISIX.pm | 3 +-
t/xds-library/config_xds.t | 22 +++-
t/xds-library/config_xds_2.t | 239 ++++++++++++++++++++++++++++++++++++
t/xds-library/main.go | 60 ++++++++--
7 files changed, 589 insertions(+), 30 deletions(-)
diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua
index 0dc4da68c..e92b8092a 100644
--- a/apisix/cli/ngx_tpl.lua
+++ b/apisix/cli/ngx_tpl.lua
@@ -244,7 +244,8 @@ http {
{% end %}
{% if config_center == "xds" then %}
- lua_shared_dict xds-route-config 10m;
+ lua_shared_dict xds-config 10m;
+ lua_shared_dict xds-config-version 1m;
{% end %}
# for custom shared dict
diff --git a/apisix/core/config_xds.lua b/apisix/core/config_xds.lua
index 7c0c9f4a6..05f97f5c0 100644
--- a/apisix/core/config_xds.lua
+++ b/apisix/core/config_xds.lua
@@ -15,39 +15,57 @@
-- limitations under the License.
--
---- Get configuration form ngx.shared.DICT.
+--- Get configuration from ngx.shared.DICT
--
-- @module core.config_xds
-local base = require("resty.core.base")
local config_local = require("apisix.core.config_local")
+local string = require("apisix.core.string")
+local log = require("apisix.core.log")
+local json = require("apisix.core.json")
+local ngx_sleep = require("apisix.core.utils").sleep
+local check_schema = require("apisix.core.schema").check
+local new_tab = require("table.new")
local table = table
+local insert_tab = table.insert
local error = error
-local is_http = ngx.config.subsystem == "http"
+local pcall = pcall
+local tostring = tostring
+local setmetatable = setmetatable
local io = io
local io_open = io.open
local io_close = io.close
local package = package
-local new_tab = base.new_tab
+local ipairs = ipairs
+local type = type
+local sub_str = string.sub
local ffi = require ("ffi")
local C = ffi.C
-local route_config = ngx.shared["xds-route-config"]
+local config = ngx.shared["xds-config"]
+local conf_ver = ngx.shared["xds-config-version"]
+local is_http = ngx.config.subsystem == "http"
local ngx_re_match = ngx.re.match
local ngx_re_gmatch = ngx.re.gmatch
+local ngx_timer_every = ngx.timer.every
+local ngx_timer_at = ngx.timer.at
+local exiting = ngx.worker.exiting
+local ngx_time = ngx.time
-local xds_lib_name = "libxds.so"
-
+local xds_lib_name = "libxds.so"
local process
if is_http then
process = require("ngx.process")
end
-
ffi.cdef[[
-extern void initial(void* route_zone_ptr);
+typedef unsigned int useconds_t;
+
+extern void initial(void* config_zone, void* version_zone);
+int usleep(useconds_t usec);
]]
+local created_obj = {}
local _M = {
version = 0.1,
@@ -55,6 +73,14 @@ local _M = {
}
+-- Metatable attached to every object built by _M.new:
+-- missing fields are looked up on _M, and tostring(obj) yields a short
+-- label containing the object's key (used when composing log messages).
+local mt = {
+ __index = _M,
+ __tostring = function(self)
+ return " xds key: " .. self.key
+ end
+}
+
+
-- todo: refactor this function in chash.lua and radixtree.lua
local function load_shared_lib(lib_name)
local cpath = package.cpath
@@ -101,18 +127,248 @@ local function load_libxds(lib_name)
table.concat(tried_paths, '\r\n', 1, #tried_paths))
end
- local route_zone = C.ngx_http_lua_ffi_shdict_udata_to_zone(route_config[1])
- local route_shd_cdata = ffi.cast("void*", route_zone)
- xdsagent.initial(route_shd_cdata)
+ local config_zone = C.ngx_http_lua_ffi_shdict_udata_to_zone(config[1])
+ local config_shd_cdata = ffi.cast("void*", config_zone)
+
+ local conf_ver_zone = C.ngx_http_lua_ffi_shdict_udata_to_zone(conf_ver[1])
+ local conf_ver_shd_cdata = ffi.cast("void*", conf_ver_zone)
+
+ xdsagent.initial(config_shd_cdata, conf_ver_shd_cdata)
+end
+
+
+-- Most recent value of the "version" key seen in the version shared dict;
+-- it is refreshed by fetch_version() and stays nil until a version exists.
+local latest_version
+--- Rebuild self.values / self.values_hash from the config shared dict.
+-- Only shared-dict keys that start with self.key are loaded.
+-- @param self object created by _M.new
+-- @return true when the object is already at latest_version or was rebuilt;
+-- false plus the message "no keys" when the shared dict holds no keys
+local function sync_data(self)
+ -- nothing changed since the last successful sync
+ if self.conf_version == latest_version then
+ return true
+ end
+
+ -- run and drop the clean handlers of the previous items before
+ -- discarding them
+ if self.values then
+ for _, val in ipairs(self.values) do
+ if val and val.clean_handlers then
+ for _, clean_handler in ipairs(val.clean_handlers) do
+ clean_handler(val)
+ end
+ val.clean_handlers = nil
+ end
+ end
+ self.values = nil
+ self.values_hash = nil
+ end
+
+ -- 0 as max_count: per the ngx.shared.DICT docs this lifts the default
+ -- cap on the number of returned keys
+ local keys = config:get_keys(0)
+
+ if not keys or #keys <= 0 then
+ -- xds did not write any data to shdict
+ return false, "no keys"
+ end
+
+ self.values = new_tab(#keys, 0)
+ self.values_hash = new_tab(0, #keys)
+
+ for _, key in ipairs(keys) do
+ if string.has_prefix(key, self.key) then
+ -- each check below can only flip data_valid to false; an item is
+ -- stored only if it survives all of them
+ local data_valid = true
+ local conf_str = config:get(key, 0)
+ local conf, err = json.decode(conf_str)
+ if not conf then
+ data_valid = false
+ log.error("decode the conf of [", key, "] failed, err: ", err,
+ ", conf_str: ", conf_str)
+ end
+
+ -- note: this also fires (with conf == nil) after a decode failure
+ if not self.single_item and type(conf) ~= "table" then
+ data_valid = false
+ log.error("invalid conf of [", key, "], conf: ", conf,
+ ", it should be an object")
+ end
+
+ if data_valid and self.item_schema then
+ local ok, err = check_schema(self.item_schema, conf)
+ if not ok then
+ data_valid = false
+ log.error("failed to check the conf of [", key, "] err:",
err)
+ end
+ end
+
+ if data_valid and self.checker then
+ local ok, err = self.checker(conf)
+ if not ok then
+ data_valid = false
+ log.error("failed to check the conf of [", key, "] err:",
err)
+ end
+ end
+
+ if data_valid then
+ if not conf.id then
+ -- derive the id from the part of the key that follows
+ -- self.key plus one more character (presumably the "/"
+ -- separator)
+ conf.id = sub_str(key, #self.key + 2, #key + 1)
+ log.warn("the id of [", key, "] is nil, use the id: ",
conf.id)
+ end
+
+ local conf_item = {value = conf, modifiedIndex =
latest_version,
+ key = key}
+ insert_tab(self.values, conf_item)
+ -- values_hash maps an item id to its index in self.values
+ self.values_hash[conf.id] = #self.values
+ conf_item.clean_handlers = {}
+
+ if self.filter then
+ self.filter(conf_item)
+ end
+ end
+ end
+ end
+
+ -- remember the version so the next call can return early
+ self.conf_version = latest_version
+ return true
end
+--- Timer callback that keeps an object in sync with the shared dict.
+-- It calls sync_data in a bounded loop (guard: i <= 32) and then re-arms
+-- itself with a zero-delay timer unless the worker is exiting or
+-- self.running is false.
+-- @param premature set by the timer API when the timer fires early;
+-- the callback then returns without doing anything
+-- @param self object created by _M.new
+local function _automatic_fetch(premature, self)
+ if premature then
+ return
+ end
+
+ local i = 0
+ while not exiting() and self.running and i <= 32 do
+ i = i + 1
+ -- ok: pcall status; ok2/err: the return values of sync_data
+ local ok, ok2, err = pcall(sync_data, self)
+ if not ok then
+ -- sync_data raised: ok2 carries the error message
+ err = ok2
+ log.error("failed to fetch data from xds: ",
+ err, ", ", tostring(self))
+ ngx_sleep(3)
+ break
+ elseif not ok2 and err then
+ -- todo: handler other error
+ -- log only errors that are not expected and not a repeat of the
+ -- previously recorded one
+ if err ~= "wait for more time" and err ~= "no keys" and
self.last_err ~= err then
+ log.error("failed to fetch data from xds, ", err, ", ",
tostring(self))
+ end
+
+ if err ~= self.last_err then
+ self.last_err = err
+ self.last_err_time = ngx_time()
+ else
+ -- forget the recorded error after 30 (ngx.time units,
+ -- i.e. seconds) so that a persisting error is logged again
+ if ngx_time() - self.last_err_time >= 30 then
+ self.last_err = nil
+ end
+ end
+ ngx_sleep(0.5)
+ elseif not ok2 then
+ -- sync_data reported failure without an error message
+ ngx_sleep(0.05)
+ else
+ ngx_sleep(0.1)
+ end
+ end
+
+ -- schedule the next round in a fresh timer
+ if not exiting() and self.running then
+ ngx_timer_at(0, _automatic_fetch, self)
+ end
+end
+
+
+--- Refresh latest_version from the "version" key of the version shared dict.
+-- Usable both as a timer callback and as a direct call (premature == nil).
+-- A missing version leaves latest_version untouched.
+-- @param premature truthy when invoked by a timer that fired prematurely
+local function fetch_version(premature)
+    if premature then
+        return
+    end
+
+    local current = conf_ver:get("version")
+    if current and current ~= latest_version then
+        latest_version = current
+    end
+end
+
+
+--- Create a configuration object backed by the xds shared dict.
+-- With opts.automatic set, the call blocks until a configuration version
+-- is available, performs one synchronous load and then starts the
+-- background sync timer.
+-- @tparam string key prefix of the shared-dict keys to load (the tests use
+-- keys such as "/routes/1"); required when opts.automatic is set
+-- @tparam[opt] table opts optional fields: automatic, item_schema, filter,
+-- single_item, checker
+-- @treturn table|nil the new object, or nil when `key` is missing in
+-- automatic mode
+-- @treturn string|nil error message accompanying a nil result
+function _M.new(key, opts)
+    local automatic = opts and opts.automatic
+    local item_schema = opts and opts.item_schema
+    local filter_fun = opts and opts.filter
+    local single_item = opts and opts.single_item
+    local checker = opts and opts.checker
+
+
+    local obj = setmetatable({
+        automatic = automatic,
+        item_schema = item_schema,
+        checker = checker,
+        sync_times = 0,
+        running = true,
+        conf_version = 0,
+        values = nil,
+        -- same field name that sync_data and _M.get operate on
+        values_hash = nil,
+        prev_index = nil,
+        last_err = nil,
+        last_err_time = nil,
+        key = key,
+        single_item = single_item,
+        filter = filter_fun,
+    }, mt)
+
+    if automatic then
+        if not key then
+            return nil, "missing `key` argument"
+        end
+
+        -- blocking until xds completes initial configuration
+        while true do
+            fetch_version()
+            if latest_version then
+                break
+            end
+            -- usleep() takes an integral number of microseconds; a
+            -- fractional argument such as 0.1 is converted to 0 by the FFI
+            -- and turns this loop into a busy spin. Wait 100 ms per poll.
+            C.usleep(100000)
+        end
+
+        local ok, ok2, err = pcall(sync_data, obj)
+        if not ok then
+            -- sync_data raised: ok2 carries the error message
+            err = ok2
+        end
+
+        if err then
+            log.error("failed to fetch data from xds ",
+                      err, ", ", key)
+        end
+
+        ngx_timer_at(0, _automatic_fetch, obj)
+    end
+
+    -- register the object so that fetch_created_obj(key) can find it
+    if key then
+        created_obj[key] = obj
+    end
+
+    return obj
+end
+
+
+--- Fetch one loaded item by its id.
+-- @param self object created by _M.new
+-- @param key item id; it is stringified before the lookup
+-- @return the stored item ({value=..., modifiedIndex=..., key=...}),
+-- nil when the id is unknown, or nothing when no data has been loaded
+function _M.get(self, key)
+    local index_by_id = self.values_hash
+    if not index_by_id then
+        return
+    end
+
+    local pos = index_by_id[tostring(key)]
+    if pos then
+        return self.values[pos]
+    end
+
+    return nil
+end
+
+
+--- Return the object that _M.new registered under `key`.
+-- @param key the key that was passed to _M.new
+-- @return the registered object, or nil when none was created for `key`
+function _M.fetch_created_obj(key)
+ return created_obj[key]
+end
+
+--- Per-worker initialisation of the xds config source.
+-- The xds shared library is loaded in the privileged agent process only;
+-- every process that runs this function polls the version dict once per
+-- second through fetch_version.
+-- @return true
function _M.init_worker()
if process.type() == "privileged agent" then
load_libxds(xds_lib_name)
end
+ -- refresh latest_version every second
+ ngx_timer_every(1, fetch_version)
+
return true
end
diff --git a/apisix/init.lua b/apisix/init.lua
index e60f8b245..b61d43615 100644
--- a/apisix/init.lua
+++ b/apisix/init.lua
@@ -116,12 +116,6 @@ function _M.http_init_worker()
require("apisix.debug").init_worker()
- plugin.init_worker()
- router.http_init_worker()
- require("apisix.http.service").init_worker()
- plugin_config.init_worker()
- require("apisix.consumer").init_worker()
-
if core.config.init_worker then
local ok, err = core.config.init_worker()
if not ok then
@@ -130,6 +124,12 @@ function _M.http_init_worker()
end
end
+ plugin.init_worker()
+ router.http_init_worker()
+ require("apisix.http.service").init_worker()
+ plugin_config.init_worker()
+ require("apisix.consumer").init_worker()
+
apisix_upstream.init_worker()
require("apisix.plugins.ext-plugin.init").init_worker()
diff --git a/t/APISIX.pm b/t/APISIX.pm
index fb1839b78..1bd72edf3 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -509,7 +509,8 @@ _EOC_
lua_shared_dict ext-plugin 1m;
lua_shared_dict kubernetes 1m;
lua_shared_dict tars 1m;
- lua_shared_dict xds-route-config 1m;
+ lua_shared_dict xds-config 1m;
+ lua_shared_dict xds-config-version 1m;
proxy_ssl_name \$upstream_host;
proxy_ssl_server_name on;
diff --git a/t/xds-library/config_xds.t b/t/xds-library/config_xds.t
index dabdbc141..8df413aa3 100644
--- a/t/xds-library/config_xds.t
+++ b/t/xds-library/config_xds.t
@@ -95,11 +95,27 @@ qr/can not load xDS library/
-- wait for xds library sync data
ngx.sleep(1.5)
local core = require("apisix.core")
- local value =
ngx.shared["xds-route-config"]:get("/apisix/routes/1")
+ local value = ngx.shared["xds-config"]:get("/routes/1")
local route_conf, err = core.json.decode(value)
local json_encode = require("toolkit.json").encode
- ngx.say(json_encode(route_conf))
+ ngx.say(json_encode(route_conf.uri))
}
}
--- response_body
-{"create_time":1646972532,"id":"1","priority":0,"status":1,"update_time":1647250524,"upstream":{"hash_on":"vars","nodes":[{"host":"127.0.0.1","port":80,"priority":0,"weight":1}],"pass_host":"pass","scheme":"http","type":"roundrobin"},"uri":"/hello"}
+"/hello"
+
+
+
+=== TEST 3: read conf version
+--- config
+ location /t {
+ content_by_lua_block {
+ -- wait for xds library sync data
+ ngx.sleep(1.5)
+ local core = require("apisix.core")
+ local version = ngx.shared["xds-config-version"]:get("version")
+ ngx.say(version)
+ }
+ }
+--- response_body eval
+qr/^\d{13}$/
diff --git a/t/xds-library/config_xds_2.t b/t/xds-library/config_xds_2.t
new file mode 100644
index 000000000..85f9e0de3
--- /dev/null
+++ b/t/xds-library/config_xds_2.t
@@ -0,0 +1,239 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+use Cwd qw(cwd);
+my $apisix_home = $ENV{APISIX_HOME} || cwd();
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+log_level("info");
+
+# Fill in defaults for every test block before it runs.
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ # default request when the block does not declare one
+ if (!$block->request) {
+ $block->set_value("request", "GET /t");
+ }
+
+ # when a block declares neither error_log nor no_error_log, require that
+ # no [error] / [alert] entries show up
+ if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+ $block->set_value("no_error_log", "[error]\n[alert]");
+ }
+
+ # default Lua search paths; the cpath includes t/xds-library so that
+ # shared objects placed there can be found
+ my $lua_deps_path = $block->lua_deps_path // <<_EOC_;
+ lua_package_path
"$apisix_home/?.lua;$apisix_home/?/init.lua;$apisix_home/deps/share/lua/5.1/?/init.lua;$apisix_home/deps/share/lua/5.1/?.lua;$apisix_home/apisix/?.lua;$apisix_home/t/?.lua;;";
+ lua_package_cpath
"$apisix_home/?.so;$apisix_home/t/xds-library/?.so;$apisix_home/deps/lib/lua/5.1/?.so;$apisix_home/deps/lib64/lua/5.1/?.so;;";
+_EOC_
+
+ $block->set_value("lua_deps_path", $lua_deps_path);
+
+ # default yaml config: xds as config center, admin API disabled
+ if (!$block->yaml_config) {
+ my $yaml_config = <<_EOC_;
+apisix:
+ node_listen: 1984
+ config_center: xds
+ enable_admin: false
+_EOC_
+
+ $block->set_value("yaml_config", $yaml_config);
+ }
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: proxy request using data written by xds(id = 1)
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require "resty.http"
+ local httpc = http.new()
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+ local res, err = httpc:request_uri(uri, { method = "GET"})
+
+ if not res then
+ ngx.say(err)
+ return
+ end
+ ngx.print(res.body)
+ }
+ }
+--- response_body
+hello world
+
+
+
+=== TEST 2: proxy request using data written by xds(id = 2, upstream_id = 1)
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require "resty.http"
+ local httpc = http.new()
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello1"
+ local res, err = httpc:request_uri(uri, { method = "GET"})
+
+ if not res then
+ ngx.say(err)
+ return
+ end
+ ngx.print(res.body)
+ }
+ }
+--- response_body
+hello1 world
+
+
+
+=== TEST 3: proxy request using data written by xds(id = 3, upstream_id = 2)
+--- config
+ location /t {
+ content_by_lua_block {
+ ngx.sleep(1.5)
+ local core = require("apisix.core")
+ local value = ngx.shared["xds-config"]:flush_all()
+ ngx.sleep(1.5)
+ local http = require "resty.http"
+ local httpc = http.new()
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+ local res, err = httpc:request_uri(uri, { method = "GET"})
+
+ if not res then
+ ngx.say(err)
+ return
+ end
+ ngx.print(res.body)
+ }
+ }
+--- response_body
+hello world
+
+
+
+=== TEST 4: flush all keys in xds config
+--- config
+ location /t {
+ content_by_lua_block {
+ local core = require("apisix.core")
+ ngx.shared["xds-config"]:flush_all()
+ ngx.update_time()
+ ngx.shared["xds-config-version"]:set("version", ngx.now())
+ ngx.sleep(1.5)
+
+ local http = require "resty.http"
+ local httpc = http.new()
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+ local res, err = httpc:request_uri(uri, { method = "GET"})
+
+ if not res then
+ ngx.say(err)
+ return
+ end
+ ngx.status = res.status
+ ngx.print(res.body)
+ }
+ }
+--- error_code: 404
+--- response_body
+{"error_msg":"404 Route Not Found"}
+
+
+
+=== TEST 5: bad format json
+--- config
+ location /t {
+ content_by_lua_block {
+ local data = [[{
+ upstream = {
+ type = "roundrobin"
+ nodes = {
+ ["127.0.0.1:1980"] = 1,
+ }
+ },
+ uri = "/bad_json"
+ }]]
+ ngx.shared["xds-config"]:set("/routes/3", data)
+ ngx.update_time()
+ ngx.shared["xds-config-version"]:set("version", ngx.now())
+ ngx.sleep(1.5)
+
+ local http = require "resty.http"
+ local httpc = http.new()
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port ..
"/bad_json"
+ local res, err = httpc:request_uri(uri, { method = "GET"})
+
+ if not res then
+ ngx.say(err)
+ return
+ end
+ ngx.status = res.status
+ }
+ }
+--- wait: 2
+--- error_code: 404
+--- error_log
+decode the conf of [/routes/3] failed, err: Expected object key string but
found invalid token
+
+
+
+=== TEST 6: schema check fail
+--- config
+ location /t {
+ content_by_lua_block {
+ local core = require("apisix.core")
+ local data = {
+ upstream = {
+ type = "roundrobin",
+ nodes = {
+ ["127.0.0.1:65536"] = 1,
+ }
+ }
+ }
+ local data_str = core.json.encode(data)
+ ngx.log(ngx.WARN, "data_str : ", require("inspect")(data_str))
+
+ ngx.shared["xds-config"]:set("/routes/3", data_str)
+ ngx.update_time()
+ ngx.shared["xds-config-version"]:set("version", ngx.now())
+ ngx.sleep(1.5)
+ }
+ }
+--- no_error_log
+[alert]
+--- wait: 2
+--- error_log
+failed to check the conf of [/routes/3] err:allOf 1 failed: value should match
only one schema, but matches none
+
+
+
+=== TEST 7: not table
+--- config
+ location /t {
+ content_by_lua_block {
+ local data = "/not_table"
+ ngx.shared["xds-config"]:set("/routes/3", data)
+ ngx.update_time()
+ ngx.shared["xds-config-version"]:set("version", ngx.now())
+ ngx.sleep(1.5)
+ }
+ }
+--- no_error_log
+[alert]
+--- wait: 2
+--- error_log
+invalid conf of [/routes/3], conf: nil, it should be an object
diff --git a/t/xds-library/main.go b/t/xds-library/main.go
index 1463ecf91..b682d1358 100644
--- a/t/xds-library/main.go
+++ b/t/xds-library/main.go
@@ -31,7 +31,10 @@ extern void ngx_http_lua_ffi_shdict_store(void *zone, int op,
import "C"
import (
+ "context"
"fmt"
+ "math/rand"
+ "strconv"
"time"
"unsafe"
)
@@ -39,11 +42,14 @@ import (
func main() {
}
-
//export initial
-func initial(zone unsafe.Pointer) {
- time.Sleep(time.Second)
- value := fmt.Sprintf(`{
+func initial(config_zone unsafe.Pointer, version_zone unsafe.Pointer) {
+ // Exported entry point; config_xds.lua declares it via ffi.cdef and calls
+ // it with the two shared-dict zone pointers. It only delegates to
+ // write_config.
+ write_config(config_zone, version_zone)
+}
+
+func write_config(config_zone unsafe.Pointer, version_zone unsafe.Pointer) {
+ route_key := "/routes/1"
+ route_value := fmt.Sprintf(`{
"status": 1,
"update_time": 1647250524,
"create_time": 1646972532,
@@ -53,7 +59,7 @@ func initial(zone unsafe.Pointer) {
"upstream": {
"nodes": [
{
- "port": 80,
+ "port": 1980,
"priority": 0,
"host": "127.0.0.1",
"weight": 1
@@ -66,10 +72,50 @@ func initial(zone unsafe.Pointer) {
}
}`)
- write_route(zone, "/apisix/routes/1", value)
+ upstream_key := "/upstreams/1"
+ upstream_value := fmt.Sprintf(`{
+"id": "1",
+"nodes": {
+ "127.0.0.1:1980": 1
+},
+"type": "roundrobin"
+}`)
+
+ r_u_key := "/routes/2"
+ r_u_value := fmt.Sprintf(`{
+"status": 1,
+"update_time": 1647250524,
+"create_time": 1646972532,
+"uri": "/hello1",
+"priority": 0,
+"id": "1",
+"upstream_id": "1"
+}`)
+
+ write_shdict(route_key, route_value, config_zone)
+ write_shdict(upstream_key, upstream_value, config_zone)
+ write_shdict(r_u_key, r_u_value, config_zone)
+ update_conf_version(version_zone)
+
+}
+
+// update_conf_version starts a goroutine that keeps rewriting the "version"
+// key of the given shared-dict zone. The value is the current Unix time in
+// milliseconds, formatted as a decimal string.
+func update_conf_version(zone unsafe.Pointer) {
+ // context.Background() is never cancelled, so the Done branch below does
+ // not fire and the goroutine keeps running.
+ ctx := context.Background()
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ // wait a random whole number of seconds in [0, 10) between writes
+ case <-time.After(time.Second *
time.Duration(rand.Intn(10))):
+ key := "version"
+ // nanoseconds / 1e6 = milliseconds since the Unix epoch
+ version :=
strconv.FormatInt(time.Now().UnixNano()/1e6, 10)
+ write_shdict(key, version, zone)
+ }
+ }
+ }()
}
-func write_route(zone unsafe.Pointer, key, value string) {
+func write_shdict(key string, value string, zone unsafe.Pointer) {
var keyCStr = C.CString(key)
defer C.free(unsafe.Pointer(keyCStr))
var keyLen = C.size_t(len(key))