This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 4a0e2903f feat(xRPC): support dynamic upstream (#6901)
4a0e2903f is described below
commit 4a0e2903f3e1c9abcca5fd5db67804683c339777
Author: 罗泽轩 <[email protected]>
AuthorDate: Fri Apr 22 15:49:40 2022 +0800
feat(xRPC): support dynamic upstream (#6901)
Signed-off-by: spacewander <[email protected]>
---
apisix/schema_def.lua | 1 +
apisix/stream/router/ip_port.lua | 6 +
apisix/stream/xrpc/protocols/redis/init.lua | 13 +-
apisix/stream/xrpc/runner.lua | 70 ++++--
apisix/stream/xrpc/sdk.lua | 68 ++++++
.../apisix/stream/xrpc/protocols/pingpong/init.lua | 56 ++++-
.../stream/xrpc/protocols/pingpong/schema.lua | 3 +
t/xrpc/pingpong.t | 253 ++++++++++++++++++++-
8 files changed, 434 insertions(+), 36 deletions(-)
diff --git a/apisix/schema_def.lua b/apisix/schema_def.lua
index ec78a7256..3f72111d8 100644
--- a/apisix/schema_def.lua
+++ b/apisix/schema_def.lua
@@ -802,6 +802,7 @@ local xrpc_protocol_schema = {
name = {
type = "string",
},
+ superior_id = id_schema,
conf = {
description = "protocol-specific configuration",
type = "object",
diff --git a/apisix/stream/router/ip_port.lua b/apisix/stream/router/ip_port.lua
index f762d348d..977bcb2d3 100644
--- a/apisix/stream/router/ip_port.lua
+++ b/apisix/stream/router/ip_port.lua
@@ -77,6 +77,12 @@ do
end
local route = item.value
+ if route.protocol and route.protocol.superior_id then
+ -- subordinate route won't be matched in the entry
+ -- TODO: check the subordinate relationship in the Admin API
+ goto CONTINUE
+ end
+
if item.value.remote_addr then
item.value.remote_addr_matcher =
core_ip.create_ip_matcher({item.value.remote_addr})
end
diff --git a/apisix/stream/xrpc/protocols/redis/init.lua b/apisix/stream/xrpc/protocols/redis/init.lua
index 113489135..cf60f53c1 100644
--- a/apisix/stream/xrpc/protocols/redis/init.lua
+++ b/apisix/stream/xrpc/protocols/redis/init.lua
@@ -73,15 +73,13 @@ local function read_len(sk)
end
-local function read_req(sk)
+local function read_req(sk, ctx)
local narg, err = read_len(sk)
if not narg then
return nil, err
end
- local ctx = {
- cmd_line = core.table.new(narg, 0)
- }
+ ctx.cmd_line = core.table.new(narg, 0)
for i = 1, narg do
local n, err = read_len(sk)
@@ -116,7 +114,7 @@ local function read_req(sk)
end
ctx.cmd = ctx.cmd_line[1]
- return ctx
+ return true
end
@@ -195,8 +193,9 @@ end
function _M.from_downstream(session, downstream)
- local ctx, err = read_req(downstream)
- if not ctx then
+ local ctx = sdk.get_req_ctx(session, 0)
+ local ok, err = read_req(downstream, ctx)
+ if not ok then
if err ~= "timeout" and err ~= "closed" then
core.log.error("failed to read request: ", err)
end
diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua
index 2898f4852..8de3176ff 100644
--- a/apisix/stream/xrpc/runner.lua
+++ b/apisix/stream/xrpc/runner.lua
@@ -28,7 +28,9 @@ local _M = {}
local function open_session(conn_ctx)
conn_ctx.xrpc_session = {
- upstream_conf = conn_ctx.matched_upstream,
+ _upstream_conf = conn_ctx.matched_upstream,
+ -- fields start with '_' should not be accessed by the protocol implementation
+ _route = conn_ctx.matched_route.value,
_ctxs = {},
}
return conn_ctx.xrpc_session
@@ -44,8 +46,18 @@ local function close_session(session, protocol)
protocol.disconnect_upstream(session, up, upstream_ctx.broken)
end
+ local upstream_ctxs = session._upstream_ctxs
+ if upstream_ctxs then
+ for _, upstream_ctx in pairs(upstream_ctxs) do
+ upstream_ctx.closed = true
+
+ local up = upstream_ctx.upstream
+ protocol.disconnect_upstream(session, up, upstream_ctx.broken)
+ end
+ end
+
for id in pairs(session._ctxs) do
- core.log.info("RPC is not finished, id: ", id)
+ core.log.notice("RPC is not finished, id: ", id)
end
end
@@ -67,8 +79,24 @@ end
local function open_upstream(protocol, session, ctx)
- if session._upstream_ctx then
- return OK, session._upstream_ctx
+ local key = session._upstream_key
+ session._upstream_key = nil
+
+ if key then
+ if not session._upstream_ctxs then
+ session._upstream_ctxs = {}
+ end
+
+ local up_ctx = session._upstream_ctxs[key]
+ if up_ctx then
+ return OK, up_ctx
+ end
+ else
+ if session._upstream_ctx then
+ return OK, session._upstream_ctx
+ end
+
+ session.upstream_conf = session._upstream_conf
end
local state, upstream = protocol.connect_upstream(session, session)
@@ -76,12 +104,18 @@ local function open_upstream(protocol, session, ctx)
return state, nil
end
- session._upstream_ctx = {
+ local up_ctx = {
upstream = upstream,
broken = false,
closed = false,
}
- return OK, session._upstream_ctx
+ if key then
+ session._upstream_ctxs[key] = up_ctx
+ else
+ session._upstream_ctx = up_ctx
+ end
+
+ return OK, up_ctx
end
@@ -135,18 +169,28 @@ function _M.run(protocol, conn_ctx)
-- need to do some auth/routing jobs before reaching upstream
local status, up_ctx = open_upstream(protocol, session, ctx)
if status ~= OK then
+ if ctx ~= nil then
+ finish_req(protocol, session, ctx)
+ end
+
break
end
status = protocol.to_upstream(session, ctx, downstream,
up_ctx.upstream)
- if status == DECLINED then
- up_ctx.broken = true
- break
- end
+ if status ~= OK then
+ if ctx ~= nil then
+ finish_req(protocol, session, ctx)
+ end
- if status == DONE then
- -- for Unary request we can directly reply here
- goto continue
+ if status == DECLINED then
+ up_ctx.broken = true
+ break
+ end
+
+ if status == DONE then
+ -- for Unary request we can directly reply here
+ goto continue
+ end
end
if not up_ctx.coroutine then
diff --git a/apisix/stream/xrpc/sdk.lua b/apisix/stream/xrpc/sdk.lua
index f480d10e2..f77a4c4e0 100644
--- a/apisix/stream/xrpc/sdk.lua
+++ b/apisix/stream/xrpc/sdk.lua
@@ -19,9 +19,13 @@
--
-- @module xrpc.sdk
local core = require("apisix.core")
+local config_util = require("apisix.core.config_util")
+local router = require("apisix.stream.router.ip_port")
local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
local ngx_now = ngx.now
+local tab_insert = table.insert
local error = error
+local tostring = tostring
local _M = {}
@@ -92,6 +96,7 @@ function _M.get_req_ctx(session, id)
end
local ctx = core.tablepool.fetch("xrpc_ctxs", 4, 4)
+ -- fields start with '_' should not be accessed by the protocol implementation
ctx._id = id
session._ctxs[id] = ctx
@@ -100,4 +105,67 @@ function _M.get_req_ctx(session, id)
end
+---
+-- Returns the new router if the stream routes are changed
+--
+-- @function xrpc.sdk.get_router
+-- @tparam table xrpc session
+-- @tparam string the current router version, should come from the last call
+-- @treturn boolean whether there is a change
+-- @treturn table the new router under the specific protocol
+-- @treturn string the new router version
+function _M.get_router(session, version)
+ local protocol_name = session._route.protocol.name
+ local id = session._route.id
+
+ local items, conf_version = router.routes()
+ if version == conf_version then
+ return false
+ end
+
+ local proto_router = {}
+ for _, item in config_util.iterate_values(items) do
+ if item.value == nil then
+ goto CONTINUE
+ end
+
+ local route = item.value
+ if route.protocol.name ~= protocol_name then
+ goto CONTINUE
+ end
+
+ if tostring(route.protocol.superior_id) ~= id then
+ goto CONTINUE
+ end
+
+ tab_insert(proto_router, route)
+
+ ::CONTINUE::
+ end
+
+ return true, proto_router, conf_version
+end
+
+
+---
+-- Set the session's current upstream according to the route's configuration
+--
+-- @function xrpc.sdk.set_upstream
+-- @tparam table xrpc session
+-- @tparam table the route configuration
+function _M.set_upstream(session, conf)
+ local up
+ if conf.upstream then
+ up = conf.upstream
+ -- TODO: support upstream_id
+ end
+
+ local key = tostring(conf)
+ core.log.info("set upstream to: ", key)
+
+ session._upstream_key = key
+ session.upstream_conf = up
+end
+
+
return _M
diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
index f78211c14..a45e9b8c7 100644
--- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
+++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
@@ -21,6 +21,7 @@ local bit = require("bit")
local lshift = bit.lshift
local ffi = require("ffi")
local ffi_str = ffi.string
+local ipairs = ipairs
local math_random = math.random
local OK = ngx.OK
local DECLINED = ngx.DECLINED
@@ -28,16 +29,19 @@ local DONE = ngx.DONE
local str_byte = string.byte
+local _M = {}
+local router_version
+local router
-- pingpong protocol is designed to use in the test of xRPC.
-- It contains two part: a fixed-length header & a body.
-- Header format:
-- "pp" (magic number) + 1 bytes req type + 2 bytes stream id + 1 reserved bytes
--- + 4 bytes body length
-local _M = {}
+-- + 4 bytes body length + optional 4 bytes service name
local HDR_LEN = 10
local TYPE_HEARTBEAT = 1
local TYPE_UNARY = 2
local TYPE_STREAM = 3
+local TYPE_UNARY_DYN_UP = 4
function _M.init_worker()
@@ -48,7 +52,7 @@ end
function _M.init_downstream(session)
-- create the downstream
local sk = xrpc_socket.downstream.socket()
- sk:settimeout(10) -- the short timeout is just for test
+ sk:settimeout(1000) -- the short timeout is just for test
return sk
end
@@ -104,15 +108,54 @@ function _M.from_downstream(session, downstream)
local body_len = to_int32(p, 6)
core.log.info("read body len: ", body_len)
+ if typ == TYPE_UNARY_DYN_UP then
+ local p = read_data(downstream, 4, false)
+ if p == nil then
+ return DECLINED
+ end
+
+ local len = 4
+ for i = 0, 3 do
+ if p[i] == 0 then
+ len = i
+ break
+ end
+ end
+ local service = ffi_str(p, len)
+ core.log.info("get service [", service, "]")
+ ctx.service = service
+
+ local changed, raw_router, version = sdk.get_router(session, router_version)
+ if changed then
+ router_version = version
+ router = {}
+
+ for _, r in ipairs(raw_router) do
+ local conf = r.protocol.conf
+ if conf and conf.service then
+ router[conf.service] = r
+ end
+ end
+ end
+
+ local conf = router[ctx.service]
+ if conf then
+ sdk.set_upstream(session, conf)
+ end
+ end
+
local p = read_data(downstream, body_len, true)
if p == nil then
return DECLINED
end
- ctx.is_unary = typ == TYPE_UNARY
+ ctx.is_unary = typ == TYPE_UNARY or typ == TYPE_UNARY_DYN_UP
ctx.is_stream = typ == TYPE_STREAM
ctx.id = stream_id
ctx.len = HDR_LEN + body_len
+ if typ == TYPE_UNARY_DYN_UP then
+ ctx.len = ctx.len + 4
+ end
return OK, ctx
end
@@ -131,7 +174,10 @@ function _M.connect_upstream(session, ctx)
end
local node = nodes[math_random(#nodes)]
local sk = xrpc_socket.upstream.socket()
- sk:settimeout(10) -- the short timeout is just for test
+ sk:settimeout(1000) -- the short timeout is just for test
+
+ core.log.info("connect to ", node.host, ":", node.port)
+
local ok, err = sk:connect(node.host, node.port)
if not ok then
core.log.error("failed to connect: ", err)
diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/schema.lua b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/schema.lua
index 0e2f7256a..28bdaef2e 100644
--- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/schema.lua
+++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/schema.lua
@@ -20,6 +20,9 @@ local core = require("apisix.core")
local schema = {
type = "object",
properties = {
+ service = {
+ type = "string"
+ },
faults = {
type = "array",
minItems = 1,
diff --git a/t/xrpc/pingpong.t b/t/xrpc/pingpong.t
index 78cfcdb78..2a296a83d 100644
--- a/t/xrpc/pingpong.t
+++ b/t/xrpc/pingpong.t
@@ -33,6 +33,7 @@ add_block_preprocessor(sub {
xrpc:
protocols:
- name: pingpong
+ - name: redis
_EOC_
$block->set_value("extra_yaml_config", $extra_yaml_config);
}
@@ -83,7 +84,7 @@ _EOC_
$block->set_value("stream_upstream_code", $stream_upstream_code);
if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
- $block->set_value("no_error_log", "[error]");
+ $block->set_value("no_error_log", "[error]\nRPC is not finished");
}
$block;
@@ -98,7 +99,6 @@ __DATA__
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
- local etcd = require("apisix.core.etcd")
local code, body = t('/apisix/admin/stream_routes/1',
ngx.HTTP_PUT,
{
@@ -174,7 +174,6 @@ pp\x01\x00\x00\x00\x00\x00\x00\x00"
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
- local etcd = require("apisix.core.etcd")
local code, body = t('/apisix/admin/stream_routes/1',
ngx.HTTP_PUT,
{
@@ -218,7 +217,6 @@ failed to connect: connection refused
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
- local etcd = require("apisix.core.etcd")
local code, body = t('/apisix/admin/stream_routes/1',
ngx.HTTP_PUT,
{
@@ -254,7 +252,7 @@ passed
--- stream_conf_enable
--- stream_upstream_code
local sock = ngx.req.socket(true)
- sock:settimeout(10)
+ sock:settimeout(1100)
while true do
local data = sock:receiveany(4096)
if not data then
@@ -264,6 +262,7 @@ passed
end
--- error_log
failed to read: timeout
+--- wait: 1.1
@@ -290,9 +289,6 @@ failed to read: timeout
--- response_body eval
"pp\x03\x00\x02\x00\x00\x00\x00\x04ABCD" .
"pp\x03\x00\x01\x00\x00\x00\x00\x03ABC"
---- no_error_log
-RPC is not finished
-[error]
@@ -348,9 +344,6 @@ RPC is not finished
assert(sock:send(data1))
--- response_body eval
"pp\x03\x00\x01\x00\x00\x00\x00\x03ABC"
---- no_error_log
-RPC is not finished
-[error]
@@ -374,3 +367,241 @@ RPC is not finished
"pp\x03\x00\x03\x00\x00\x00\x00\x03ABC" .
"pp\x03\x00\x02\x00\x00\x00\x00\x02AB" .
"pp\x03\x00\x01\x00\x00\x00\x00\x01A"
+
+
+
+=== TEST 14: superior & subordinate
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ name = "pingpong"
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.3:1995"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ local code, body = t('/apisix/admin/stream_routes/2',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ superior_id = 1,
+ conf = {
+ service = "a"
+ },
+ name = "pingpong"
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1995"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ local code, body = t('/apisix/admin/stream_routes/3',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ superior_id = 1,
+ conf = {
+ service = "b"
+ },
+ name = "pingpong"
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.2:1995"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ -- routes below should not be used to matched
+ local code, body = t('/apisix/admin/stream_routes/4',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ superior_id = 10000,
+ conf = {
+ service = "b"
+ },
+ name = "pingpong"
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.2:1979"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ local code, body = t('/apisix/admin/stream_routes/5',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ name = "redis"
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:6379"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 15: hit
+--- request eval
+"POST /t
+" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x03a\x00\x00\x00ABC" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x04b\x00\x00\x00ABCD" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x03a\x00\x00\x00ABC"
+--- response_body eval
+"pp\x04\x00\x00\x00\x00\x00\x00\x03a\x00\x00\x00ABC" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x04b\x00\x00\x00ABCD" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x03a\x00\x00\x00ABC"
+--- grep_error_log eval
+qr/connect to \S+ while prereading client data/
+--- grep_error_log_out
+connect to 127.0.0.1:1995 while prereading client data
+connect to 127.0.0.2:1995 while prereading client data
+--- stream_conf_enable
+
+
+
+=== TEST 16: hit (fallback to superior if not found)
+--- request eval
+"POST /t
+" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x03abcdABC" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x04a\x00\x00\x00ABCD" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x03abcdABC"
+--- response_body eval
+"pp\x04\x00\x00\x00\x00\x00\x00\x03abcdABC" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x04a\x00\x00\x00ABCD" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x03abcdABC"
+--- grep_error_log eval
+qr/connect to \S+ while prereading client data/
+--- grep_error_log_out
+connect to 127.0.0.3:1995 while prereading client data
+connect to 127.0.0.1:1995 while prereading client data
+--- stream_conf_enable
+
+
+
+=== TEST 17: cache router by version
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+
+ local sock = ngx.socket.tcp()
+ sock:settimeout(1000)
+ local ok, err = sock:connect("127.0.0.1", 1985)
+ if not ok then
+ ngx.log(ngx.ERR, "failed to connect: ", err)
+ return ngx.exit(503)
+ end
+
+ assert(sock:send("pp\x04\x00\x00\x00\x00\x00\x00\x03a\x00\x00\x00ABC"))
+
+ ngx.sleep(0.1)
+
+ local code, body = t('/apisix/admin/stream_routes/2',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ superior_id = 1,
+ conf = {
+ service = "c"
+ },
+ name = "pingpong"
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.4:1995"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ ngx.sleep(0.1)
+
+ local s = "pp\x04\x00\x00\x00\x00\x00\x00\x04a\x00\x00\x00ABCD"
+ assert(sock:send(s .. "pp\x04\x00\x00\x00\x00\x00\x00\x03c\x00\x00\x00ABC"))
+
+ while true do
+ local data, err = sock:receiveany(4096)
+ if not data then
+ sock:close()
+ break
+ end
+ ngx.print(data)
+ end
+ }
+ }
+--- request
+GET /t
+--- response_body eval
+"pp\x04\x00\x00\x00\x00\x00\x00\x03a\x00\x00\x00ABC" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x04a\x00\x00\x00ABCD" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x03c\x00\x00\x00ABC"
+--- grep_error_log eval
+qr/connect to \S+ while prereading client data/
+--- grep_error_log_out
+connect to 127.0.0.1:1995 while prereading client data
+connect to 127.0.0.3:1995 while prereading client data
+connect to 127.0.0.4:1995 while prereading client data
+--- stream_conf_enable