This is an automated email from the ASF dual-hosted git repository.
monkeydluffy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 3578fbe88 feat: add dubbo protocols support for xrpc (#9660)
3578fbe88 is described below
commit 3578fbe8886c6944f782e1b025657c409c429c48
Author: wxbty <[email protected]>
AuthorDate: Wed Sep 13 22:06:27 2023 +0800
feat: add dubbo protocols support for xrpc (#9660)
---
.github/workflows/build.yml | 2 +-
Makefile | 3 +
apisix/stream/xrpc/protocols/dubbo/init.lua | 231 ++++++++++++++++++++++++++
apisix/stream/xrpc/protocols/dubbo/schema.lua | 32 ++++
t/xrpc/dubbo.t | 168 +++++++++++++++++++
5 files changed, 435 insertions(+), 1 deletion(-)
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 998bd913c..95e72fe57 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -136,7 +136,7 @@ jobs:
[[ ${{ steps.test_env.outputs.type }} != first ]] && sudo
./ci/init-${{ steps.test_env.outputs.type }}-test-service.sh after
echo "Linux launch services, done."
- name: Start Dubbo Backend
- if: matrix.os_name == 'linux_openresty' && steps.test_env.outputs.type
== 'plugin'
+ if: matrix.os_name == 'linux_openresty' &&
(steps.test_env.outputs.type == 'plugin' || steps.test_env.outputs.type ==
'last')
run: |
sudo apt install -y maven
cd t/lib/dubbo-backend
diff --git a/Makefile b/Makefile
index 7588fd083..f3cc375e0 100644
--- a/Makefile
+++ b/Makefile
@@ -375,6 +375,9 @@ install: runtime
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/redis
$(ENV_INSTALL) apisix/stream/xrpc/protocols/redis/*.lua
$(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/redis/
+ $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/dubbo
+ $(ENV_INSTALL) apisix/stream/xrpc/protocols/dubbo/*.lua
$(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/dubbo/
+
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/utils
$(ENV_INSTALL) apisix/utils/*.lua $(ENV_INST_LUADIR)/apisix/utils/
diff --git a/apisix/stream/xrpc/protocols/dubbo/init.lua
b/apisix/stream/xrpc/protocols/dubbo/init.lua
new file mode 100644
index 000000000..19160d6c5
--- /dev/null
+++ b/apisix/stream/xrpc/protocols/dubbo/init.lua
@@ -0,0 +1,231 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local sdk = require("apisix.stream.xrpc.sdk")
+local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
+local math_random = math.random
+local ngx = ngx
+local OK = ngx.OK
+local str_format = string.format
+local DECLINED = ngx.DECLINED
+local DONE = ngx.DONE
+local bit = require("bit")
+local ffi = require("ffi")
+local ffi_str = ffi.string
+
+
+-- dubbo protocol spec: https://cn.dubbo.apache.org/zh-cn/overview/reference/protocols/tcp/
+local header_len = 16
+local _M = {}
+
+
+--- Set up per-session state for a new downstream connection.
+-- Zeroes the request/response sequence counters, seeds `cmd_labels` with the
+-- route id and an empty label, then returns the value produced by
+-- `xrpc_socket.downstream.socket()`.
+-- @param session xRPC session table; `session.route.id` is read here
+-- @return the result of `xrpc_socket.downstream.socket()`
+function _M.init_downstream(session)
+ session.req_id_seq = 0
+ session.resp_id_seq = 0
+ session.cmd_labels = { session.route.id, "" }
+ return xrpc_socket.downstream.socket()
+end
+
+
+--- Decode the fixed-size dubbo frame header.
+-- Fields are read as: bytes 1-2 magic (rendered as 4 hex digits), byte 3 the
+-- flag byte, byte 4 the status, bytes 5-12 the request id (big-endian) and
+-- bytes 13-16 the body length (big-endian).
+-- @param header string containing the raw header bytes
+-- @return table with the decoded fields, or nil when `header` holds fewer
+--         than `header_len` bytes
+local function parse_dubbo_header(header)
+    -- a short buffer cannot hold a complete header
+    if #header < header_len then
+        return nil
+    end
+
+    local magic_hi, magic_lo, flag_byte, status_byte = header:byte(1, 4)
+
+    -- NOTE: the id is folded into a plain Lua number (a double under LuaJIT),
+    -- so ids above 2^53 are not represented exactly
+    local req_id = 0
+    for pos = 5, 12 do
+        req_id = req_id * 256 + header:byte(pos)
+    end
+
+    -- body length: 32-bit big-endian integer in bytes 13..16
+    local body_len = 0
+    for pos = 13, 16 do
+        body_len = body_len * 256 + header:byte(pos)
+    end
+
+    return {
+        magic_number = str_format("%04x", magic_hi * 256 + magic_lo),
+        message_flag = flag_byte,
+        -- top three bits of the flag byte, each normalised to 0 or 1
+        is_request = bit.band(flag_byte, 0x80) ~= 0 and 1 or 0,
+        is_two_way = bit.band(flag_byte, 0x40) ~= 0 and 1 or 0,
+        is_event = bit.band(flag_byte, 0x20) ~= 0 and 1 or 0,
+        status = status_byte,
+        request_id = req_id,
+        data_length = body_len
+    }
+end
+
+
+--- Read one complete dubbo frame (header plus body) from a socket.
+-- On success the serialization id (low 5 bits of the flag byte) and the body
+-- buffer returned by `sk:read` are stored on `ngx.ctx`; the body goes to
+-- `dubbo_req_body_data` or `dubbo_rsp_body_data` depending on `is_req`.
+-- @param sk socket object exposing `read(n)`
+-- @param is_req true when the frame is a request, false when it is a reply
+-- @return true on success, nil on failure
+-- @return error message on failure, nil on success
+-- @return always false: this reader never reports pipelined frames
+local function read_data(sk, is_req)
+    local header_data, err = sk:read(header_len)
+    if not header_data then
+        return nil, err, false
+    end
+
+    local header_str = ffi_str(header_data, header_len)
+    local header_info = parse_dubbo_header(header_str)
+    if not header_info then
+        return nil, "header insufficient", false
+    end
+
+    if header_info.magic_number ~= "dabb" then
+        local msg = str_format("unknown magic number: \"%s\"", header_info.magic_number)
+        return nil, msg, false
+    end
+
+    local body_data, body_err = sk:read(header_info.data_length)
+    if not body_data then
+        -- name the direction actually being read and keep the cause, so a
+        -- failed reply read is not reported as a request failure
+        core.log.error("failed to read dubbo ", is_req and "request" or "response",
+                       " body: ", body_err)
+        return nil, body_err, false
+    end
+
+    local ctx = ngx.ctx
+    ctx.dubbo_serialization_id = bit.band(header_info.message_flag, 0x1F)
+
+    if is_req then
+        ctx.dubbo_req_body_data = body_data
+    else
+        ctx.dubbo_rsp_body_data = body_data
+    end
+
+    return true, nil, false
+end
+
+
+--- Read one request frame from `sk`.
+-- Thin wrapper that calls `read_data` with `is_req = true` and returns its
+-- results unchanged.
+local function read_req(sk)
+ return read_data(sk, true)
+end
+
+
+--- Read one reply frame from `sk`.
+-- Thin wrapper that calls `read_data` with `is_req = false` and returns its
+-- results unchanged.
+local function read_reply(sk)
+ return read_data(sk, false)
+end
+
+
+--- Read a reply frame from the upstream socket and fetch a request context.
+-- @param session xRPC session table
+-- @param sk upstream socket to read the reply from
+-- @return the context returned by `sdk.get_req_ctx`, or nil plus an error
+--         message when the reply could not be read
+local function handle_reply(session, sk)
+ local ok, err = read_reply(sk)
+ if not ok then
+ return nil, err
+ end
+
+ -- NOTE(review): the context id is the literal 10, whereas from_downstream
+ -- keys its context by session.req_id_seq, and session.resp_id_seq is
+ -- initialised in init_downstream but never read. Confirm how replies are
+ -- meant to be matched to their requests.
+ local ctx = sdk.get_req_ctx(session, 10)
+
+ return ctx
+end
+
+
+--- Read the next request from the downstream client.
+-- Bumps `session.req_id_seq`, obtains the matching context through
+-- `sdk.get_req_ctx`, remembers it in `session._downstream_ctx`, and then reads
+-- a frame with `read_req`.
+-- @param session xRPC session table
+-- @param downstream downstream socket
+-- @return OK and the request context on success; DECLINED when the read fails
+function _M.from_downstream(session, downstream)
+ local read_pipeline = false
+ session.req_id_seq = session.req_id_seq + 1
+ local ctx = sdk.get_req_ctx(session, session.req_id_seq)
+ session._downstream_ctx = ctx
+ while true do
+ -- `pipelined` is the third value returned by read_data, which returns
+ -- false on every path in this file, so the loop currently ends after
+ -- one frame and the pipeline branches below are not taken
+ local ok, err, pipelined = read_req(downstream)
+ if not ok then
+ -- timeouts and closed connections are not logged as errors
+ if err ~= "timeout" and err ~= "closed" then
+ core.log.error("failed to read request: ", err)
+ end
+
+ -- a timeout while draining pipelined data ends the drain
+ -- instead of failing the request
+ if read_pipeline and err == "timeout" then
+ break
+ end
+
+ return DECLINED
+ end
+
+ if not pipelined then
+ break
+ end
+
+ if not read_pipeline then
+ read_pipeline = true
+ -- set minimal read timeout to read pipelined data
+ downstream:settimeouts(0, 0, 1)
+ end
+ end
+
+ if read_pipeline then
+ -- set timeout back
+ downstream:settimeouts(0, 0, 0)
+ end
+
+ return OK, ctx
+end
+
+
+--- Choose one upstream node at random and connect to it.
+-- @param session xRPC session; `session.upstream_conf.nodes` lists the candidates
+-- @param ctx request context (not used by this function)
+-- @return OK and the upstream socket on success; DECLINED when there are no
+--         nodes or `sdk.connect_upstream` yields no socket
+function _M.connect_upstream(session, ctx)
+    local upstream_conf = session.upstream_conf
+    local candidates = upstream_conf.nodes
+    local count = #candidates
+    if count == 0 then
+        core.log.error("failed to connect: no nodes")
+        return DECLINED
+    end
+
+    local picked = candidates[math_random(count)]
+    local upstream_sk = sdk.connect_upstream(picked, upstream_conf)
+    if not upstream_sk then
+        return DECLINED
+    end
+
+    core.log.debug("dubbo_connect_upstream end")
+
+    return OK, upstream_sk
+end
+
+--- Release an upstream connection.
+-- Delegates to `sdk.disconnect_upstream`, passing the upstream socket and the
+-- session's upstream configuration.
+function _M.disconnect_upstream(session, upstream)
+ sdk.disconnect_upstream(upstream, session.upstream_conf)
+end
+
+
+--- Pass the request on to the upstream connection.
+-- Calls `upstream:move(downstream)`; presumably this forwards the bytes
+-- already read from the downstream socket (confirm against the xRPC socket
+-- API).
+-- @return OK when the move succeeds, DECLINED otherwise
+function _M.to_upstream(session, ctx, downstream, upstream)
+    if upstream:move(downstream) then
+        return OK
+    end
+
+    return DECLINED
+end
+
+
+--- Read one reply from upstream and relay it to the downstream client.
+-- @param session xRPC session table
+-- @param downstream downstream socket that receives the reply
+-- @param upstream upstream socket the reply is read from
+-- @return DONE and the request context on success; DECLINED when the reply
+--         cannot be read or cannot be moved to the downstream socket
+function _M.from_upstream(session, downstream, upstream)
+    local ctx, err = handle_reply(session, upstream)
+    -- a missing context is the failure signal; the error text may be absent
+    if not ctx then
+        -- same policy as from_downstream: timeouts and closed connections
+        -- are expected, anything else is worth an error log
+        if err ~= "timeout" and err ~= "closed" then
+            core.log.error("failed to read reply: ", err)
+        end
+
+        return DECLINED
+    end
+
+    local ok, _ = downstream:move(upstream)
+    if not ok then
+        return DECLINED
+    end
+
+    return DONE, ctx
+end
+
+
+--- Log hook of the protocol module.
+-- The body is empty, so nothing is recorded; both arguments are ignored.
+function _M.log(_, _)
+end
+
+
+return _M
diff --git a/apisix/stream/xrpc/protocols/dubbo/schema.lua
b/apisix/stream/xrpc/protocols/dubbo/schema.lua
new file mode 100644
index 000000000..3a9d73325
--- /dev/null
+++ b/apisix/stream/xrpc/protocols/dubbo/schema.lua
@@ -0,0 +1,32 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+
+
+local schema = {
+ type = "object",
+}
+
+local _M = {}
+
+
+--- Validate a protocol configuration against this module's schema.
+-- The schema only requires `conf` to be an object.
+-- @param conf configuration value to validate
+-- @return whatever `core.schema.check` returns
+function _M.check_schema(conf)
+ return core.schema.check(schema, conf)
+end
+
+
+return _M
diff --git a/t/xrpc/dubbo.t b/t/xrpc/dubbo.t
new file mode 100644
index 000000000..290eadb3c
--- /dev/null
+++ b/t/xrpc/dubbo.t
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX;
+
+my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx';
+my $version = eval { `$nginx_binary -V 2>&1` };
+
+if ($version !~ m/\/apisix-nginx-module/) {
+ plan(skip_all => "apisix-nginx-module not installed");
+} else {
+ plan('no_plan');
+}
+# Fill in per-test defaults: enable the dubbo xRPC protocol, install a /t
+# location that relays the HTTP request body over a raw TCP connection, and
+# add a default no_error_log check when the block specifies neither
+# error_log nor no_error_log.
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ # only supply the xrpc config when the block has none of its own
+ if (!$block->extra_yaml_config) {
+ my $extra_yaml_config = <<_EOC_;
+xrpc:
+ protocols:
+ - name: dubbo
+_EOC_
+ $block->set_value("extra_yaml_config", $extra_yaml_config);
+ }
+
+ # default handler: send the request body to 127.0.0.1:1985 and print
+ # everything received until the read fails
+ my $config = $block->config // <<_EOC_;
+ location /t {
+ content_by_lua_block {
+ ngx.req.read_body()
+ local sock = ngx.socket.tcp()
+ sock:settimeout(1000)
+ local ok, err = sock:connect("127.0.0.1", 1985)
+ if not ok then
+ ngx.log(ngx.ERR, "failed to connect: ", err)
+ return ngx.exit(503)
+ end
+
+ local bytes, err = sock:send(ngx.req.get_body_data())
+ if not bytes then
+ ngx.log(ngx.ERR, "send stream request error: ", err)
+ return ngx.exit(503)
+ end
+ while true do
+ local data, err = sock:receiveany(4096)
+ if not data then
+ sock:close()
+ break
+ end
+ ngx.print(data)
+ end
+ }
+ }
+_EOC_
+
+ $block->set_value("config", $config);
+
+ # default expectation when the block declares no log assertions
+ if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+ $block->set_value("no_error_log", "[error]\nRPC is not finished");
+ }
+
+ $block;
+});
+
+worker_connections(1024);
+run_tests;
+
+__DATA__
+
+=== TEST 1: init
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ name = "dubbo"
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:20880"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 2: use dubbo_backend_provider server.
request=org.apache.dubbo.backend.DemoService,service_version:1.0.1#hello,response=dubbo
success & 200
+--- request eval
+"GET /t
+\xda\xbb\xc2\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xef\x05\x32\x2e\x30\x2e\x32\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64\x75\x62\x62\x6f\x2e\x62\x61\x63\x6b\x65\x6e\x64\x2e\x44\x65\x6d\x6f\x53\x65\x72\x76\x69\x63\x65\x05\x31\x2e\x30\x2e\x30\x05\x68\x65\x6c\x6c\x6f\x0f\x4c\x6a\x61\x76\x61\x2f\x75\x74\x69\x6c\x2f\x4d\x61\x70\x3b\x48\x04\x6e\x61\x6d\x65\x08\x7a\x68\x61\x6e\x67\x73\x61\x6e\x5a\x48\x04\x70\x61\x74\x68\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e
[...]
+--- response_body eval
+"\xda\xbb\x02\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x30\x94\x48\x04\x62\x6f\x64\x79\x0e\x64\x75\x62\x62\x6f\x20\x73\x75\x63\x63\x65\x73\x73\x0a\x06\x73\x74\x61\x74\x75\x73\x03\x32\x30\x30\x5a\x48\x05\x64\x75\x62\x62\x6f\x05\x32\x2e\x30\x2e\x32\x5a"
+--- stream_conf_enable
+--- log_level: debug
+--- no_error_log
+
+
+
+=== TEST 3: heart beat. request=\xe2|11..,response=\x22|00...
+--- request eval
+"GET /t
+\xda\xbb\xe2\x00\x00\x00\x00\x00\x00\x00\x00\x34\x00\x00\x00\x01\x4e"
+--- response_body eval
+"\xda\xbb\x22\x14\x00\x00\x00\x00\x00\x00\x00\x34\x00\x00\x00\x01\x4e"
+--- stream_conf_enable
+--- log_level: debug
+--- no_error_log
+
+
+
+=== TEST 4: no response. Different from test2 \x82=10000010, the second bit=0
of the third byte means no need to return
+--- request eval
+"GET /t
+\xda\xbb\x82\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xef\x05\x32\x2e\x30\x2e\x32\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64\x75\x62\x62\x6f\x2e\x62\x61\x63\x6b\x65\x6e\x64\x2e\x44\x65\x6d\x6f\x53\x65\x72\x76\x69\x63\x65\x05\x31\x2e\x30\x2e\x30\x05\x68\x65\x6c\x6c\x6f\x0f\x4c\x6a\x61\x76\x61\x2f\x75\x74\x69\x6c\x2f\x4d\x61\x70\x3b\x48\x04\x6e\x61\x6d\x65\x08\x7a\x68\x61\x6e\x67\x73\x61\x6e\x5a\x48\x04\x70\x61\x74\x68\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e
[...]
+--- response_body eval
+""
+--- stream_conf_enable
+--- log_level: debug
+--- no_error_log
+
+
+
+=== TEST 5: failed response.
request=org.apache.dubbo.backend.DemoService,service_version:1.0.1#fail,response=503
+--- request eval
+"GET /t
+\xda\xbb\xc2\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xee\x05\x32\x2e\x30\x2e\x32\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64\x75\x62\x62\x6f\x2e\x62\x61\x63\x6b\x65\x6e\x64\x2e\x44\x65\x6d\x6f\x53\x65\x72\x76\x69\x63\x65\x05\x31\x2e\x30\x2e\x30\x04\x66\x61\x69\x6c\x0f\x4c\x6a\x61\x76\x61\x2f\x75\x74\x69\x6c\x2f\x4d\x61\x70\x3b\x48\x04\x6e\x61\x6d\x65\x08\x7a\x68\x61\x6e\x67\x73\x61\x6e\x5a\x48\x04\x70\x61\x74\x68\x30\x24\x6f\x72\x67\x2e\x61\x70\x61\x63\x68\x65\x2e\x64
[...]
+--- response_body eval
+"\xda\xbb\x02\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x2d\x94\x48\x04\x62\x6f\x64\x79\x0b\x64\x75\x62\x62\x6f\x20\x66\x61\x69\x6c\x0a\x06\x73\x74\x61\x74\x75\x73\x03\x35\x30\x33\x5a\x48\x05\x64\x75\x62\x62\x6f\x05\x32\x2e\x30\x2e\x32\x5a"
+--- stream_conf_enable
+--- log_level: debug
+--- no_error_log
+
+
+
+=== TEST 6: invalid magic(dabc<>dabb) for heart beat.
+--- request eval
+"GET /t
+\xda\xbc\xe2\x00\x00\x00\x00\x00\x00\x00\x00\x34\x00\x00\x00\x01\x4e"
+--- error_log
+unknown magic number
+--- stream_conf_enable