This is an automated email from the ASF dual-hosted git repository.

monkeydluffy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3578fbe88 feat: add dubbo protocols support for xrpc  (#9660)
3578fbe88 is described below

commit 3578fbe8886c6944f782e1b025657c409c429c48
Author: wxbty <[email protected]>
AuthorDate: Wed Sep 13 22:06:27 2023 +0800

    feat: add dubbo protocols support for xrpc  (#9660)
---
 .github/workflows/build.yml                   |   2 +-
 Makefile                                      |   3 +
 apisix/stream/xrpc/protocols/dubbo/init.lua   | 231 ++++++++++++++++++++++++++
 apisix/stream/xrpc/protocols/dubbo/schema.lua |  32 ++++
 t/xrpc/dubbo.t                                | 168 +++++++++++++++++++
 5 files changed, 435 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 998bd913c..95e72fe57 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -136,7 +136,7 @@ jobs:
           [[ ${{ steps.test_env.outputs.type }} != first ]] && sudo 
./ci/init-${{ steps.test_env.outputs.type }}-test-service.sh after
           echo "Linux launch services, done."
       - name: Start Dubbo Backend
-        if: matrix.os_name == 'linux_openresty' && steps.test_env.outputs.type 
== 'plugin'
+        if: matrix.os_name == 'linux_openresty' && 
(steps.test_env.outputs.type == 'plugin' || steps.test_env.outputs.type == 
'last')
         run: |
           sudo apt install -y maven
           cd t/lib/dubbo-backend
diff --git a/Makefile b/Makefile
index 7588fd083..f3cc375e0 100644
--- a/Makefile
+++ b/Makefile
@@ -375,6 +375,9 @@ install: runtime
        $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/redis
        $(ENV_INSTALL) apisix/stream/xrpc/protocols/redis/*.lua 
$(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/redis/
 
+       $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/dubbo
+       $(ENV_INSTALL) apisix/stream/xrpc/protocols/dubbo/*.lua 
$(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/dubbo/
+
        $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/utils
        $(ENV_INSTALL) apisix/utils/*.lua $(ENV_INST_LUADIR)/apisix/utils/
 
diff --git a/apisix/stream/xrpc/protocols/dubbo/init.lua 
b/apisix/stream/xrpc/protocols/dubbo/init.lua
new file mode 100644
index 000000000..19160d6c5
--- /dev/null
+++ b/apisix/stream/xrpc/protocols/dubbo/init.lua
@@ -0,0 +1,231 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local sdk = require("apisix.stream.xrpc.sdk")
+local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
+local math_random = math.random
+local ngx = ngx
+local OK = ngx.OK
+local str_format = string.format
+local DECLINED = ngx.DECLINED
+local DONE = ngx.DONE
+local bit = require("bit")
+local ffi = require("ffi")
+local ffi_str = ffi.string
+
+
+-- dubbo protocol spec: https://cn.dubbo.apache.org/zh-cn/overview/reference/protocols/tcp/
+local header_len = 16
+local _M = {}
+
+
+-- Reset the per-session bookkeeping (request/response counters and command
+-- labels) and return whatever xrpc_socket.downstream.socket() yields.
+function _M.init_downstream(session)
+    -- both sequence counters start from zero for a new session
+    session.req_id_seq, session.resp_id_seq = 0, 0
+    session.cmd_labels = { session.route.id, "" }
+
+    return xrpc_socket.downstream.socket()
+end
+
+
+-- Decode the fixed-size Dubbo frame header.
+--
+-- Layout as read below: bytes 1-2 magic, byte 3 flag byte, byte 4 status,
+-- bytes 5-12 request id (big-endian), bytes 13-16 body length (big-endian).
+-- Returns nil when `header` holds fewer than `header_len` bytes, otherwise a
+-- table with the decoded fields.
+local function parse_dubbo_header(header)
+    if #header < header_len then
+        return nil
+    end
+
+    local magic_hi, magic_lo, flag, status = header:byte(1, 4)
+
+    -- NOTE(review): eight bytes are folded into a Lua number, so ids above
+    -- 2^53 cannot be represented exactly; nothing in this file reads the id.
+    local request_id = 0
+    for pos = 5, 12 do
+        request_id = request_id * 256 + header:byte(pos)
+    end
+
+    local data_length = 0
+    for pos = 13, 16 do
+        data_length = data_length * 256 + header:byte(pos)
+    end
+
+    return {
+        magic_number = str_format("%04x", magic_hi * 256 + magic_lo),
+        message_flag = flag,
+        -- the three highest bits of the flag byte, each reported as 0 or 1
+        is_request = bit.band(bit.rshift(flag, 7), 0x01),
+        is_two_way = bit.band(bit.rshift(flag, 6), 0x01),
+        is_event = bit.band(bit.rshift(flag, 5), 0x01),
+        status = status,
+        request_id = request_id,
+        data_length = data_length
+    }
+end
+
+
+-- Read one Dubbo frame (fixed-size header, then the body) from `sk`.
+--
+-- The serialization id (low five bits of the flag byte) and the raw body are
+-- stored in ngx.ctx: the body goes to dubbo_req_body_data when `is_req` is
+-- true and to dubbo_rsp_body_data otherwise.
+--
+-- Returns three values: true on success or nil on failure, an error message
+-- on failure, and a third value that is always false (from_downstream reads
+-- it as its `pipelined` flag).
+local function read_data(sk, is_req)
+    local header_data, err = sk:read(header_len)
+    if not header_data then
+        return nil, err, false
+    end
+
+    local header_info = parse_dubbo_header(ffi_str(header_data, header_len))
+    if not header_info then
+        return nil, "header insufficient", false
+    end
+
+    if header_info.magic_number ~= "dabb" then
+        return nil, str_format("unknown magic number: \"%s\"", header_info.magic_number), false
+    end
+
+    local body_data
+    body_data, err = sk:read(header_info.data_length)
+    if not body_data then
+        -- name the direction that actually failed instead of always "request"
+        core.log.error("failed to read dubbo ", is_req and "request" or "response", " body")
+        return nil, err, false
+    end
+
+    local ctx = ngx.ctx
+    ctx.dubbo_serialization_id = bit.band(header_info.message_flag, 0x1F)
+
+    if is_req then
+        ctx.dubbo_req_body_data = body_data
+    else
+        ctx.dubbo_rsp_body_data = body_data
+    end
+
+    return true, nil, false
+end
+
+
+local function read_req(sk) -- read one frame from sk and keep its body as the request body
+    return read_data(sk, true)
+end
+
+
+local function read_reply(sk) -- read one frame from sk and keep its body as the response body
+    return read_data(sk, false)
+end
+
+
+local function handle_reply(session, sk) -- read one reply frame, then fetch a request ctx
+    local ok, err = read_reply(sk)
+    if not ok then
+        return nil, err -- hand the read error back to the caller
+    end
+
+    local ctx = sdk.get_req_ctx(session, 10) -- NOTE(review): fixed id 10, not a per-reply id
+
+    return ctx -- TODO confirm: from_downstream keys its ctx by session.req_id_seq instead
+end
+
+
+-- Read the next request from the client.
+--
+-- A fresh request id is taken from session.req_id_seq and its context is
+-- remembered in session._downstream_ctx before the frame is read. Returns
+-- OK plus that context on success and DECLINED when the read fails.
+function _M.from_downstream(session, downstream)
+    local seq = session.req_id_seq + 1
+    session.req_id_seq = seq
+
+    local ctx = sdk.get_req_ctx(session, seq)
+    session._downstream_ctx = ctx
+
+    local in_pipeline = false
+    while true do
+        local ok, err, pipelined = read_req(downstream)
+
+        if ok then
+            if not pipelined then
+                break
+            end
+
+            if not in_pipeline then
+                in_pipeline = true
+                -- set minimal read timeout to read pipelined data
+                downstream:settimeouts(0, 0, 1)
+            end
+        else
+            local timed_out = err == "timeout"
+            if not timed_out and err ~= "closed" then
+                core.log.error("failed to read request: ", err)
+            end
+
+            -- a timeout while draining pipelined data only ends the batch
+            if in_pipeline and timed_out then
+                break
+            end
+
+            return DECLINED
+        end
+    end
+
+    if in_pipeline then
+        -- set timeout back
+        downstream:settimeouts(0, 0, 0)
+    end
+
+    return OK, ctx
+end
+
+
+-- Choose one configured upstream node at random and connect to it.
+-- Returns OK plus the upstream socket, or DECLINED when no node is configured
+-- or sdk.connect_upstream does not return a socket.
+function _M.connect_upstream(session, ctx)
+    local upstream_conf = session.upstream_conf
+    local candidates = upstream_conf.nodes
+    local total = #candidates
+    if total == 0 then
+        core.log.error("failed to connect: no nodes")
+        return DECLINED
+    end
+
+    local sk = sdk.connect_upstream(candidates[math_random(total)], upstream_conf)
+    if sk then
+        core.log.debug("dubbo_connect_upstream end")
+        return OK, sk
+    end
+
+    return DECLINED
+end
+
+function _M.disconnect_upstream(session, upstream) -- delegate to sdk with the session's conf
+    sdk.disconnect_upstream(upstream, session.upstream_conf)
+end
+
+
+-- Pass the request on by calling upstream:move(downstream).
+-- Returns OK when the move succeeds and DECLINED otherwise.
+function _M.to_upstream(session, ctx, downstream, upstream)
+    if upstream:move(downstream) then
+        return OK
+    end
+
+    return DECLINED
+end
+
+
+-- Read one reply from the upstream and pass it on to the client by calling
+-- downstream:move(upstream).
+--
+-- Returns DONE plus the request context on success. Returns DECLINED when the
+-- reply cannot be read or the move fails.
+function _M.from_upstream(session, downstream, upstream)
+    -- handle_reply yields nil (plus an error) on failure; test the context
+    -- itself so that DONE is never returned together with a nil context
+    local ctx = handle_reply(session, upstream)
+    if not ctx then
+        return DECLINED
+    end
+
+    if not downstream:move(upstream) then
+        return DECLINED
+    end
+
+    return DONE, ctx
+end
+
+
+function _M.log(_, _) -- no-op: the body is empty and both arguments are ignored
+end
+
+
+return _M
diff --git a/apisix/stream/xrpc/protocols/dubbo/schema.lua 
b/apisix/stream/xrpc/protocols/dubbo/schema.lua
new file mode 100644
index 000000000..3a9d73325
--- /dev/null
+++ b/apisix/stream/xrpc/protocols/dubbo/schema.lua
@@ -0,0 +1,32 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+
+
+local schema = { -- only requires the configuration to be an object
+    type = "object",
+}
+
+local _M = {}
+
+
+function _M.check_schema(conf) -- returns the result of core.schema.check(schema, conf)
+    return core.schema.check(schema, conf)
+end
+
+
+return _M
diff --git a/t/xrpc/dubbo.t b/t/xrpc/dubbo.t
new file mode 100644
index 000000000..290eadb3c
--- /dev/null
+++ b/t/xrpc/dubbo.t
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX;
+
+my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx'; # falls back to 'nginx' when unset
+my $version = eval { `$nginx_binary -V 2>&1` }; # capture the output of `-V`
+
+if ($version !~ m/\/apisix-nginx-module/) { # skip all tests without apisix-nginx-module
+    plan(skip_all => "apisix-nginx-module not installed");
+} else {
+    plan('no_plan');
+}
+add_block_preprocessor(sub { # fill in defaults for every test block
+    my ($block) = @_;
+
+    if (!$block->extra_yaml_config) { # default: enable the dubbo xrpc protocol
+        my $extra_yaml_config = <<_EOC_;
+xrpc:
+  protocols:
+    - name: dubbo
+_EOC_
+        $block->set_value("extra_yaml_config", $extra_yaml_config);
+    }
+
+    my $config = $block->config // <<_EOC_;
+    location /t {
+        content_by_lua_block {
+            ngx.req.read_body()
+            local sock = ngx.socket.tcp()
+            sock:settimeout(1000)
+            local ok, err = sock:connect("127.0.0.1", 1985)
+            if not ok then
+                ngx.log(ngx.ERR, "failed to connect: ", err)
+                return ngx.exit(503)
+            end
+
+            local bytes, err = sock:send(ngx.req.get_body_data())
+            if not bytes then
+                ngx.log(ngx.ERR, "send stream request error: ", err)
+                return ngx.exit(503)
+            end
+            while true do
+                local data, err = sock:receiveany(4096)
+                if not data then
+                    sock:close()
+                    break
+                end
+                ngx.print(data)
+            end
+        }
+    }
+_EOC_
+
+    $block->set_value("config", $config); # the default /t relays the request body to port 1985
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) { # no log check given
+        $block->set_value("no_error_log", "[error]\nRPC is not finished");
+    }
+
+    $block; # return the block
+});
+
+worker_connections(1024);
+run_tests;
+
+__DATA__
+
+=== TEST 1: init
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "dubbo"
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:20880"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 2: use dubbo_backend_provider server. 
request=org.apache.dubbo.backend.DemoService,service_version:1.0.1#hello,response=dubbo
 success & 200
+--- request eval
+"GET /t
+\xda\xbb\xc2\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xef\x05\x32\x2e\x30\x2e\x32\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64\x75\x62\x62\x6f\x2e\x62\x61\x63\x6b\x65\x6e\x64\x2e\x44\x65\x6d\x6f\x53\x65\x72\x76\x69\x63\x65\x05\x31\x2e\x30\x2e\x30\x05\x68\x65\x6c\x6c\x6f\x0f\x4c\x6a\x61\x76\x61\x2f\x75\x74\x69\x6c\x2f\x4d\x61\x70\x3b\x48\x04\x6e\x61\x6d\x65\x08\x7a\x68\x61\x6e\x67\x73\x61\x6e\x5a\x48\x04\x70\x61\x74\x68\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e
 [...]
+--- response_body eval
+"\xda\xbb\x02\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x30\x94\x48\x04\x62\x6f\x64\x79\x0e\x64\x75\x62\x62\x6f\x20\x73\x75\x63\x63\x65\x73\x73\x0a\x06\x73\x74\x61\x74\x75\x73\x03\x32\x30\x30\x5a\x48\x05\x64\x75\x62\x62\x6f\x05\x32\x2e\x30\x2e\x32\x5a"
+--- stream_conf_enable
+--- log_level: debug
+--- no_error_log
+
+
+
+=== TEST 3: heart beat. request=\xe2|11..,response=\x22|00...
+--- request eval
+"GET /t
+\xda\xbb\xe2\x00\x00\x00\x00\x00\x00\x00\x00\x34\x00\x00\x00\x01\x4e"
+--- response_body eval
+"\xda\xbb\x22\x14\x00\x00\x00\x00\x00\x00\x00\x34\x00\x00\x00\x01\x4e"
+--- stream_conf_enable
+--- log_level: debug
+--- no_error_log
+
+
+
+=== TEST 4: no response. Different from test2 \x82=10000010, the second bit=0 
of the third byte means no need to return
+--- request eval
+"GET /t
+\xda\xbb\x82\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xef\x05\x32\x2e\x30\x2e\x32\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64\x75\x62\x62\x6f\x2e\x62\x61\x63\x6b\x65\x6e\x64\x2e\x44\x65\x6d\x6f\x53\x65\x72\x76\x69\x63\x65\x05\x31\x2e\x30\x2e\x30\x05\x68\x65\x6c\x6c\x6f\x0f\x4c\x6a\x61\x76\x61\x2f\x75\x74\x69\x6c\x2f\x4d\x61\x70\x3b\x48\x04\x6e\x61\x6d\x65\x08\x7a\x68\x61\x6e\x67\x73\x61\x6e\x5a\x48\x04\x70\x61\x74\x68\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e
 [...]
+--- response_body eval
+""
+--- stream_conf_enable
+--- log_level: debug
+--- no_error_log
+
+
+
+=== TEST 5: failed response. 
request=org.apache.dubbo.backend.DemoService,service_version:1.0.1#fail,response=503
+--- request eval
+"GET /t
+\xda\xbb\xc2\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xee\x05\x32\x2e\x30\x2e\x32\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64\x75\x62\x62\x6f\x2e\x62\x61\x63\x6b\x65\x6e\x64\x2e\x44\x65\x6d\x6f\x53\x65\x72\x76\x69\x63\x65\x05\x31\x2e\x30\x2e\x30\x04\x66\x61\x69\x6c\x0f\x4c\x6a\x61\x76\x61\x2f\x75\x74\x69\x6c\x2f\x4d\x61\x70\x3b\x48\x04\x6e\x61\x6d\x65\x08\x7a\x68\x61\x6e\x67\x73\x61\x6e\x5a\x48\x04\x70\x61\x74\x68\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64
 [...]
+--- response_body eval
+"\xda\xbb\x02\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x2d\x94\x48\x04\x62\x6f\x64\x79\x0b\x64\x75\x62\x62\x6f\x20\x66\x61\x69\x6c\x0a\x06\x73\x74\x61\x74\x75\x73\x03\x35\x30\x33\x5a\x48\x05\x64\x75\x62\x62\x6f\x05\x32\x2e\x30\x2e\x32\x5a"
+--- stream_conf_enable
+--- log_level: debug
+--- no_error_log
+
+
+
+=== TEST 6: invalid magic(dabc<>dabb) for heart beat.
+--- request eval
+"GET /t
+\xda\xbc\xe2\x00\x00\x00\x00\x00\x00\x00\x00\x34\x00\x00\x00\x01\x4e"
+--- error_log
+unknown magic number
+--- stream_conf_enable

Reply via email to