This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a0e2903f feat(xRPC): support dynamic upstream (#6901)
4a0e2903f is described below

commit 4a0e2903f3e1c9abcca5fd5db67804683c339777
Author: 罗泽轩 <[email protected]>
AuthorDate: Fri Apr 22 15:49:40 2022 +0800

    feat(xRPC): support dynamic upstream (#6901)
    
    Signed-off-by: spacewander <[email protected]>
---
 apisix/schema_def.lua                              |   1 +
 apisix/stream/router/ip_port.lua                   |   6 +
 apisix/stream/xrpc/protocols/redis/init.lua        |  13 +-
 apisix/stream/xrpc/runner.lua                      |  70 ++++--
 apisix/stream/xrpc/sdk.lua                         |  68 ++++++
 .../apisix/stream/xrpc/protocols/pingpong/init.lua |  56 ++++-
 .../stream/xrpc/protocols/pingpong/schema.lua      |   3 +
 t/xrpc/pingpong.t                                  | 253 ++++++++++++++++++++-
 8 files changed, 434 insertions(+), 36 deletions(-)

diff --git a/apisix/schema_def.lua b/apisix/schema_def.lua
index ec78a7256..3f72111d8 100644
--- a/apisix/schema_def.lua
+++ b/apisix/schema_def.lua
@@ -802,6 +802,7 @@ local xrpc_protocol_schema = {
         name = {
             type = "string",
         },
+        superior_id = id_schema,
         conf = {
             description = "protocol-specific configuration",
             type = "object",
diff --git a/apisix/stream/router/ip_port.lua b/apisix/stream/router/ip_port.lua
index f762d348d..977bcb2d3 100644
--- a/apisix/stream/router/ip_port.lua
+++ b/apisix/stream/router/ip_port.lua
@@ -77,6 +77,12 @@ do
             end
 
             local route = item.value
+            if route.protocol and route.protocol.superior_id then
+                -- subordinate route won't be matched in the entry
+                -- TODO: check the subordinate relationship in the Admin API
+                goto CONTINUE
+            end
+
             if item.value.remote_addr then
                 item.value.remote_addr_matcher = 
core_ip.create_ip_matcher({item.value.remote_addr})
             end
diff --git a/apisix/stream/xrpc/protocols/redis/init.lua b/apisix/stream/xrpc/protocols/redis/init.lua
index 113489135..cf60f53c1 100644
--- a/apisix/stream/xrpc/protocols/redis/init.lua
+++ b/apisix/stream/xrpc/protocols/redis/init.lua
@@ -73,15 +73,13 @@ local function read_len(sk)
 end
 
 
-local function read_req(sk)
+local function read_req(sk, ctx)
     local narg, err = read_len(sk)
     if not narg then
         return nil, err
     end
 
-    local ctx = {
-        cmd_line = core.table.new(narg, 0)
-    }
+    ctx.cmd_line = core.table.new(narg, 0)
 
     for i = 1, narg do
         local n, err = read_len(sk)
@@ -116,7 +114,7 @@ local function read_req(sk)
     end
 
     ctx.cmd = ctx.cmd_line[1]
-    return ctx
+    return true
 end
 
 
@@ -195,8 +193,9 @@ end
 
 
 function _M.from_downstream(session, downstream)
-    local ctx, err = read_req(downstream)
-    if not ctx then
+    local ctx = sdk.get_req_ctx(session, 0)
+    local ok, err = read_req(downstream, ctx)
+    if not ok then
         if err ~= "timeout" and err ~= "closed" then
             core.log.error("failed to read request: ", err)
         end
diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua
index 2898f4852..8de3176ff 100644
--- a/apisix/stream/xrpc/runner.lua
+++ b/apisix/stream/xrpc/runner.lua
@@ -28,7 +28,9 @@ local _M = {}
 
 local function open_session(conn_ctx)
     conn_ctx.xrpc_session = {
-        upstream_conf = conn_ctx.matched_upstream,
+        _upstream_conf = conn_ctx.matched_upstream,
+        -- fields start with '_' should not be accessed by the protocol implementation
+        _route = conn_ctx.matched_route.value,
         _ctxs = {},
     }
     return conn_ctx.xrpc_session
@@ -44,8 +46,18 @@ local function close_session(session, protocol)
         protocol.disconnect_upstream(session, up, upstream_ctx.broken)
     end
 
+    local upstream_ctxs = session._upstream_ctxs
+    if upstream_ctxs then
+        for _, upstream_ctx in pairs(upstream_ctxs) do
+            upstream_ctx.closed = true
+
+            local up = upstream_ctx.upstream
+            protocol.disconnect_upstream(session, up, upstream_ctx.broken)
+        end
+    end
+
     for id in pairs(session._ctxs) do
-        core.log.info("RPC is not finished, id: ", id)
+        core.log.notice("RPC is not finished, id: ", id)
     end
 end
 
@@ -67,8 +79,24 @@ end
 
 
 local function open_upstream(protocol, session, ctx)
-    if session._upstream_ctx then
-        return OK, session._upstream_ctx
+    local key = session._upstream_key
+    session._upstream_key = nil
+
+    if key then
+        if not session._upstream_ctxs then
+            session._upstream_ctxs = {}
+        end
+
+        local up_ctx = session._upstream_ctxs[key]
+        if up_ctx then
+            return OK, up_ctx
+        end
+    else
+        if session._upstream_ctx then
+            return OK, session._upstream_ctx
+        end
+
+        session.upstream_conf = session._upstream_conf
     end
 
     local state, upstream = protocol.connect_upstream(session, session)
@@ -76,12 +104,18 @@ local function open_upstream(protocol, session, ctx)
         return state, nil
     end
 
-    session._upstream_ctx = {
+    local up_ctx = {
         upstream = upstream,
         broken = false,
         closed = false,
     }
-    return OK, session._upstream_ctx
+    if key then
+        session._upstream_ctxs[key] = up_ctx
+    else
+        session._upstream_ctx = up_ctx
+    end
+
+    return OK, up_ctx
 end
 
 
@@ -135,18 +169,28 @@ function _M.run(protocol, conn_ctx)
         -- need to do some auth/routing jobs before reaching upstream
         local status, up_ctx = open_upstream(protocol, session, ctx)
         if status ~= OK then
+            if ctx ~= nil then
+                finish_req(protocol, session, ctx)
+            end
+
             break
         end
 
         status = protocol.to_upstream(session, ctx, downstream, 
up_ctx.upstream)
-        if status == DECLINED then
-            up_ctx.broken = true
-            break
-        end
+        if status ~= OK then
+            if ctx ~= nil then
+                finish_req(protocol, session, ctx)
+            end
 
-        if status == DONE then
-            -- for Unary request we can directly reply here
-            goto continue
+            if status == DECLINED then
+                up_ctx.broken = true
+                break
+            end
+
+            if status == DONE then
+                -- for Unary request we can directly reply here
+                goto continue
+            end
         end
 
         if not up_ctx.coroutine then
diff --git a/apisix/stream/xrpc/sdk.lua b/apisix/stream/xrpc/sdk.lua
index f480d10e2..f77a4c4e0 100644
--- a/apisix/stream/xrpc/sdk.lua
+++ b/apisix/stream/xrpc/sdk.lua
@@ -19,9 +19,13 @@
 --
 -- @module xrpc.sdk
 local core = require("apisix.core")
+local config_util = require("apisix.core.config_util")
+local router = require("apisix.stream.router.ip_port")
 local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
 local ngx_now = ngx.now
+local tab_insert = table.insert
 local error = error
+local tostring = tostring
 
 
 local _M = {}
@@ -92,6 +96,7 @@ function _M.get_req_ctx(session, id)
     end
 
     local ctx = core.tablepool.fetch("xrpc_ctxs", 4, 4)
+    -- fields start with '_' should not be accessed by the protocol implementation
     ctx._id = id
     session._ctxs[id] = ctx
 
@@ -100,4 +105,67 @@ function _M.get_req_ctx(session, id)
 end
 
 
+---
+-- Returns the new router if the stream routes are changed
+--
+-- @function xrpc.sdk.get_router
+-- @tparam table xrpc session
+-- @tparam string the current router version, should come from the last call
+-- @treturn boolean whether there is a change
+-- @treturn table the new router under the specific protocol
+-- @treturn string the new router version
+function _M.get_router(session, version)
+    local protocol_name = session._route.protocol.name
+    local id = session._route.id
+
+    local items, conf_version = router.routes()
+    if version == conf_version then
+        return false
+    end
+
+    local proto_router = {}
+    for _, item in config_util.iterate_values(items) do
+        if item.value == nil then
+            goto CONTINUE
+        end
+
+        local route = item.value
+        if route.protocol.name ~= protocol_name then
+            goto CONTINUE
+        end
+
+        if tostring(route.protocol.superior_id) ~= id then
+            goto CONTINUE
+        end
+
+        tab_insert(proto_router, route)
+
+        ::CONTINUE::
+    end
+
+    return true, proto_router, conf_version
+end
+
+
+---
+-- Set the session's current upstream according to the route's configuration
+--
+-- @function xrpc.sdk.set_upstream
+-- @tparam table xrpc session
+-- @tparam table the route configuration
+function _M.set_upstream(session, conf)
+    local up
+    if conf.upstream then
+        up = conf.upstream
+        -- TODO: support upstream_id
+    end
+
+    local key = tostring(conf)
+    core.log.info("set upstream to: ", key)
+
+    session._upstream_key = key
+    session.upstream_conf = up
+end
+
+
 return _M
diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
index f78211c14..a45e9b8c7 100644
--- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
+++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
@@ -21,6 +21,7 @@ local bit = require("bit")
 local lshift = bit.lshift
 local ffi = require("ffi")
 local ffi_str = ffi.string
+local ipairs = ipairs
 local math_random = math.random
 local OK = ngx.OK
 local DECLINED = ngx.DECLINED
@@ -28,16 +29,19 @@ local DONE = ngx.DONE
 local str_byte = string.byte
 
 
+local _M = {}
+local router_version
+local router
 -- pingpong protocol is designed to use in the test of xRPC.
 -- It contains two part: a fixed-length header & a body.
 -- Header format:
 -- "pp" (magic number) + 1 bytes req type + 2 bytes stream id + 1 reserved bytes
--- + 4 bytes body length
-local _M = {}
+-- + 4 bytes body length + optional 4 bytes service name
 local HDR_LEN = 10
 local TYPE_HEARTBEAT = 1
 local TYPE_UNARY = 2
 local TYPE_STREAM = 3
+local TYPE_UNARY_DYN_UP = 4
 
 
 function _M.init_worker()
@@ -48,7 +52,7 @@ end
 function _M.init_downstream(session)
     -- create the downstream
     local sk = xrpc_socket.downstream.socket()
-    sk:settimeout(10) -- the short timeout is just for test
+    sk:settimeout(1000) -- the short timeout is just for test
     return sk
 end
 
@@ -104,15 +108,54 @@ function _M.from_downstream(session, downstream)
     local body_len = to_int32(p, 6)
     core.log.info("read body len: ", body_len)
 
+    if typ == TYPE_UNARY_DYN_UP then
+        local p = read_data(downstream, 4, false)
+        if p == nil then
+            return DECLINED
+        end
+
+        local len = 4
+        for i = 0, 3 do
+            if p[i] == 0 then
+                len = i
+                break
+            end
+        end
+        local service = ffi_str(p, len)
+        core.log.info("get service [", service, "]")
+        ctx.service = service
+
+        local changed, raw_router, version = sdk.get_router(session, 
router_version)
+        if changed then
+            router_version = version
+            router = {}
+
+            for _, r in ipairs(raw_router) do
+                local conf = r.protocol.conf
+                if conf and conf.service then
+                    router[conf.service] = r
+                end
+            end
+        end
+
+        local conf = router[ctx.service]
+        if conf then
+            sdk.set_upstream(session, conf)
+        end
+    end
+
     local p = read_data(downstream, body_len, true)
     if p == nil then
         return DECLINED
     end
 
-    ctx.is_unary = typ == TYPE_UNARY
+    ctx.is_unary = typ == TYPE_UNARY or typ == TYPE_UNARY_DYN_UP
     ctx.is_stream = typ == TYPE_STREAM
     ctx.id = stream_id
     ctx.len = HDR_LEN + body_len
+    if typ == TYPE_UNARY_DYN_UP then
+        ctx.len = ctx.len + 4
+    end
     return OK, ctx
 end
 
@@ -131,7 +174,10 @@ function _M.connect_upstream(session, ctx)
     end
     local node = nodes[math_random(#nodes)]
     local sk = xrpc_socket.upstream.socket()
-    sk:settimeout(10) -- the short timeout is just for test
+    sk:settimeout(1000) -- the short timeout is just for test
+
+    core.log.info("connect to ", node.host, ":", node.port)
+
     local ok, err = sk:connect(node.host, node.port)
     if not ok then
         core.log.error("failed to connect: ", err)
diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/schema.lua b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/schema.lua
index 0e2f7256a..28bdaef2e 100644
--- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/schema.lua
+++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/schema.lua
@@ -20,6 +20,9 @@ local core = require("apisix.core")
 local schema = {
     type = "object",
     properties = {
+        service = {
+            type = "string"
+        },
         faults = {
             type = "array",
             minItems = 1,
diff --git a/t/xrpc/pingpong.t b/t/xrpc/pingpong.t
index 78cfcdb78..2a296a83d 100644
--- a/t/xrpc/pingpong.t
+++ b/t/xrpc/pingpong.t
@@ -33,6 +33,7 @@ add_block_preprocessor(sub {
 xrpc:
   protocols:
     - name: pingpong
+    - name: redis
 _EOC_
         $block->set_value("extra_yaml_config", $extra_yaml_config);
     }
@@ -83,7 +84,7 @@ _EOC_
     $block->set_value("stream_upstream_code", $stream_upstream_code);
 
     if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
-        $block->set_value("no_error_log", "[error]");
+        $block->set_value("no_error_log", "[error]\nRPC is not finished");
     }
 
     $block;
@@ -98,7 +99,6 @@ __DATA__
     location /t {
         content_by_lua_block {
             local t = require("lib.test_admin").test
-            local etcd = require("apisix.core.etcd")
             local code, body = t('/apisix/admin/stream_routes/1',
                 ngx.HTTP_PUT,
                 {
@@ -174,7 +174,6 @@ pp\x01\x00\x00\x00\x00\x00\x00\x00"
     location /t {
         content_by_lua_block {
             local t = require("lib.test_admin").test
-            local etcd = require("apisix.core.etcd")
             local code, body = t('/apisix/admin/stream_routes/1',
                 ngx.HTTP_PUT,
                 {
@@ -218,7 +217,6 @@ failed to connect: connection refused
     location /t {
         content_by_lua_block {
             local t = require("lib.test_admin").test
-            local etcd = require("apisix.core.etcd")
             local code, body = t('/apisix/admin/stream_routes/1',
                 ngx.HTTP_PUT,
                 {
@@ -254,7 +252,7 @@ passed
 --- stream_conf_enable
 --- stream_upstream_code
     local sock = ngx.req.socket(true)
-    sock:settimeout(10)
+    sock:settimeout(1100)
     while true do
         local data = sock:receiveany(4096)
         if not data then
@@ -264,6 +262,7 @@ passed
     end
 --- error_log
 failed to read: timeout
+--- wait: 1.1
 
 
 
@@ -290,9 +289,6 @@ failed to read: timeout
 --- response_body eval
 "pp\x03\x00\x02\x00\x00\x00\x00\x04ABCD" .
 "pp\x03\x00\x01\x00\x00\x00\x00\x03ABC"
---- no_error_log
-RPC is not finished
-[error]
 
 
 
@@ -348,9 +344,6 @@ RPC is not finished
     assert(sock:send(data1))
 --- response_body eval
 "pp\x03\x00\x01\x00\x00\x00\x00\x03ABC"
---- no_error_log
-RPC is not finished
-[error]
 
 
 
@@ -374,3 +367,241 @@ RPC is not finished
 "pp\x03\x00\x03\x00\x00\x00\x00\x03ABC" .
 "pp\x03\x00\x02\x00\x00\x00\x00\x02AB" .
 "pp\x03\x00\x01\x00\x00\x00\x00\x01A"
+
+
+
+=== TEST 14: superior & subordinate
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "pingpong"
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.3:1995"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+
+            local code, body = t('/apisix/admin/stream_routes/2',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        superior_id = 1,
+                        conf = {
+                            service = "a"
+                        },
+                        name = "pingpong"
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1995"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+
+            local code, body = t('/apisix/admin/stream_routes/3',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        superior_id = 1,
+                        conf = {
+                            service = "b"
+                        },
+                        name = "pingpong"
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.2:1995"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+
+            -- routes below should not be used to matched
+            local code, body = t('/apisix/admin/stream_routes/4',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        superior_id = 10000,
+                        conf = {
+                            service = "b"
+                        },
+                        name = "pingpong"
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.2:1979"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+
+            local code, body = t('/apisix/admin/stream_routes/5',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "redis"
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:6379"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 15: hit
+--- request eval
+"POST /t
+" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x03a\x00\x00\x00ABC" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x04b\x00\x00\x00ABCD" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x03a\x00\x00\x00ABC"
+--- response_body eval
+"pp\x04\x00\x00\x00\x00\x00\x00\x03a\x00\x00\x00ABC" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x04b\x00\x00\x00ABCD" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x03a\x00\x00\x00ABC"
+--- grep_error_log eval
+qr/connect to \S+ while prereading client data/
+--- grep_error_log_out
+connect to 127.0.0.1:1995 while prereading client data
+connect to 127.0.0.2:1995 while prereading client data
+--- stream_conf_enable
+
+
+
+=== TEST 16: hit (fallback to superior if not found)
+--- request eval
+"POST /t
+" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x03abcdABC" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x04a\x00\x00\x00ABCD" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x03abcdABC"
+--- response_body eval
+"pp\x04\x00\x00\x00\x00\x00\x00\x03abcdABC" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x04a\x00\x00\x00ABCD" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x03abcdABC"
+--- grep_error_log eval
+qr/connect to \S+ while prereading client data/
+--- grep_error_log_out
+connect to 127.0.0.3:1995 while prereading client data
+connect to 127.0.0.1:1995 while prereading client data
+--- stream_conf_enable
+
+
+
+=== TEST 17: cache router by version
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+
+            local sock = ngx.socket.tcp()
+            sock:settimeout(1000)
+            local ok, err = sock:connect("127.0.0.1", 1985)
+            if not ok then
+                ngx.log(ngx.ERR, "failed to connect: ", err)
+                return ngx.exit(503)
+            end
+
+            
assert(sock:send("pp\x04\x00\x00\x00\x00\x00\x00\x03a\x00\x00\x00ABC"))
+
+            ngx.sleep(0.1)
+
+            local code, body = t('/apisix/admin/stream_routes/2',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        superior_id = 1,
+                        conf = {
+                            service = "c"
+                        },
+                        name = "pingpong"
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.4:1995"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+
+            ngx.sleep(0.1)
+
+            local s = "pp\x04\x00\x00\x00\x00\x00\x00\x04a\x00\x00\x00ABCD"
+            assert(sock:send(s .. 
"pp\x04\x00\x00\x00\x00\x00\x00\x03c\x00\x00\x00ABC"))
+
+            while true do
+                local data, err = sock:receiveany(4096)
+                if not data then
+                    sock:close()
+                    break
+                end
+                ngx.print(data)
+            end
+        }
+    }
+--- request
+GET /t
+--- response_body eval
+"pp\x04\x00\x00\x00\x00\x00\x00\x03a\x00\x00\x00ABC" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x04a\x00\x00\x00ABCD" .
+"pp\x04\x00\x00\x00\x00\x00\x00\x03c\x00\x00\x00ABC"
+--- grep_error_log eval
+qr/connect to \S+ while prereading client data/
+--- grep_error_log_out
+connect to 127.0.0.1:1995 while prereading client data
+connect to 127.0.0.3:1995 while prereading client data
+connect to 127.0.0.4:1995 while prereading client data
+--- stream_conf_enable

Reply via email to