This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 641481dde feat: stream subsystem support tars service discovery (#8826)
641481dde is described below
commit 641481dde6c1039209abf430784ee5774d503439
Author: Ashing Zheng <[email protected]>
AuthorDate: Fri Feb 17 10:05:02 2023 +0800
feat: stream subsystem support tars service discovery (#8826)
---
apisix/cli/ngx_tpl.lua | 4 +
apisix/discovery/tars/init.lua | 20 +++-
conf/config-default.yaml | 1 +
t/APISIX.pm | 5 +
t/tars/discovery/stream/tars.t | 216 +++++++++++++++++++++++++++++++++++++++++
5 files changed, 244 insertions(+), 2 deletions(-)
diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua
index 7083d4201..bc6af4bf2 100644
--- a/apisix/cli/ngx_tpl.lua
+++ b/apisix/cli/ngx_tpl.lua
@@ -141,6 +141,10 @@ stream {
lua_shared_dict etcd-cluster-health-check-stream {*
stream.lua_shared_dict["etcd-cluster-health-check-stream"] *};
lua_shared_dict worker-events-stream {*
stream.lua_shared_dict["worker-events-stream"] *};
+ {% if enabled_discoveries["tars"] then %}
+ lua_shared_dict tars-stream {* stream.lua_shared_dict["tars-stream"] *};
+ {% end %}
+
{% if enabled_stream_plugins["limit-conn"] then %}
lua_shared_dict plugin-limit-conn-stream {*
stream.lua_shared_dict["plugin-limit-conn-stream"] *};
{% end %}
diff --git a/apisix/discovery/tars/init.lua b/apisix/discovery/tars/init.lua
index f621791dd..14f658dce 100644
--- a/apisix/discovery/tars/init.lua
+++ b/apisix/discovery/tars/init.lua
@@ -22,7 +22,8 @@ local tonumber = tonumber
local local_conf = require("apisix.core.config_local").local_conf()
local core = require("apisix.core")
local mysql = require("resty.mysql")
-local process = require("ngx.process")
+local is_http = ngx.config.subsystem == "http"
+local support_process, process = pcall(require, "ngx.process")
local endpoint_dict
@@ -331,9 +332,24 @@ function _M.nodes(servant)
return get_endpoint(servant)
end
+local function get_endpoint_dict()
+ local shm = "tars"
+
+ if not is_http then
+ shm = shm .. "-stream"
+ end
+
+ return ngx.shared[shm]
+end
function _M.init_worker()
- endpoint_dict = ngx.shared.tars
+ if not support_process then
+ core.log.error("tars discovery not support in subsystem: ",
ngx.config.subsystem,
+ ", please check if your openresty version >= 1.19.9.1
or not")
+ return
+ end
+
+ endpoint_dict = get_endpoint_dict()
if not endpoint_dict then
error("failed to get lua_shared_dict: tars, please check your APISIX
version")
end
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index db5cb2211..4d8d0c208 100755
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -165,6 +165,7 @@ nginx_config: # config for render the
template to generate n
lrucache-lock-stream: 10m
plugin-limit-conn-stream: 10m
worker-events-stream: 10m
+ tars-stream: 1m
# As user can add arbitrary configurations in the snippet,
# it is user's responsibility to check the configurations
diff --git a/t/APISIX.pm b/t/APISIX.pm
index 8e9e72d11..e0ad70427 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -396,6 +396,7 @@ _EOC_
lua_shared_dict kubernetes-stream 1m;
lua_shared_dict kubernetes-first-stream 1m;
lua_shared_dict kubernetes-second-stream 1m;
+ lua_shared_dict tars-stream 1m;
upstream apisix_backend {
server 127.0.0.1:1900;
@@ -405,6 +406,8 @@ _EOC_
}
_EOC_
+ my $stream_extra_init_by_lua_start =
$block->stream_extra_init_by_lua_start // "";
+
my $stream_init_by_lua_block = $block->stream_init_by_lua_block // <<_EOC_;
if os.getenv("APISIX_ENABLE_LUACOV") == "1" then
require("luacov.runner")("t/apisix.luacov")
@@ -413,6 +416,8 @@ _EOC_
require "resty.core"
+ $stream_extra_init_by_lua_start
+
apisix = require("apisix")
local args = {
dns_resolver = $dns_addrs_tbl_str,
diff --git a/t/tars/discovery/stream/tars.t b/t/tars/discovery/stream/tars.t
new file mode 100644
index 000000000..b7c55a0f4
--- /dev/null
+++ b/t/tars/discovery/stream/tars.t
@@ -0,0 +1,216 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX;
+
+my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx';
+my $version = eval { `$nginx_binary -V 2>&1` };
+
+if ($version =~ m/\/1.19.3/) {
+ plan(skip_all => "require OpenResty version >= 1.19.9.1");
+} else {
+ plan('no_plan');
+}
+
+repeat_each(1);
+log_level('warn');
+no_root_location();
+no_shuffle();
+workers(4);
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ my $yaml_config = <<_EOC_;
+apisix:
+ node_listen: 1984
+ enable_admin: false
+deployment:
+ role: data_plane
+ role_data_plane:
+ config_provider: yaml
+discovery:
+ tars:
+ db_conf:
+ host: 127.0.0.1
+ port: 3306
+ database: db_tars
+ user: root
+ password: tars2022
+ full_fetch_interval: 3
+ incremental_fetch_interval: 1
+_EOC_
+
+ $block->set_value("yaml_config", $yaml_config);
+
+ my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
+routes: []
+#END
+_EOC_
+
+ $block->set_value("apisix_yaml", $apisix_yaml);
+
+ my $extra_init_by_lua_start = <<_EOC_;
+ -- reduce incremental_fetch_interval,full_fetch_interval
+ local schema = require("apisix.discovery.tars.schema")
+ schema.properties.incremental_fetch_interval.minimum=1
+ schema.properties.incremental_fetch_interval.default=1
+ schema.properties.full_fetch_interval.minimum = 3
+ schema.properties.full_fetch_interval.default = 3
+_EOC_
+
+ $block->set_value("extra_init_by_lua_start", $extra_init_by_lua_start);
+ $block->set_value("stream_extra_init_by_lua_start",
$extra_init_by_lua_start);
+
+ my $config = $block->config // <<_EOC_;
+
+ location /sql {
+ content_by_lua_block {
+ local mysql = require("resty.mysql")
+ local core = require("apisix.core")
+ local ipairs = ipairs
+
+ ngx.req.read_body()
+ local sql = ngx.req.get_body_data()
+ core.log.info("get sql ", sql)
+
+ local db_conf= {
+ host="127.0.0.1",
+ port=3306,
+ database="db_tars",
+ user="root",
+ password="tars2022",
+ }
+
+ local db_cli, err = mysql:new()
+ if not db_cli then
+ core.log.error("failed to instantiate mysql: ", err)
+ return
+ end
+ db_cli:set_timeout(3000)
+
+ local ok, err, errcode, sqlstate = db_cli:connect(db_conf)
+ if not ok then
+ core.log.error("failed to connect mysql: ", err, ", ",
errcode, ", ", sqlstate)
+ return
+ end
+
+ local res, err, errcode, sqlstate = db_cli:query(sql)
+ if not res then
+ ngx.say("bad result: ", err, ": ", errcode, ": ", sqlstate,
".")
+ return
+ end
+ ngx.say("DONE")
+ }
+ }
+_EOC_
+
+ $block->set_value("config", $config);
+
+ my $stream_config = $block->stream_config // <<_EOC_;
+ server {
+ listen 8125;
+ content_by_lua_block {
+ local core = require("apisix.core")
+ local d = require("apisix.discovery.tars")
+
+ ngx.sleep(2)
+
+ local sock = ngx.req.socket()
+ local request_body = sock:receive()
+
+ core.log.info("get body ", request_body)
+
+ local response_body = "{"
+ local queries = core.json.decode(request_body)
+ for _,query in ipairs(queries) do
+ local nodes = d.nodes(query)
+ if nodes==nil or #nodes==0 then
+ response_body=response_body.." "..0
+ else
+ response_body=response_body.." "..#nodes
+ end
+ end
+ ngx.say(response_body.." }")
+ }
+ }
+
+_EOC_
+
+ $block->set_value("extra_stream_config", $stream_config);
+
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: create initial server and servant
+--- timeout: 3
+--- request eval
+[
+"POST /sql
+truncate table t_server_conf",
+
+"POST /sql
+truncate table t_adapter_conf",
+
+"POST /sql
+insert into t_server_conf(application, server_name, node_name,
registry_timestamp,
+ template_name, setting_state, present_state,
server_type)
+values ('A', 'AServer', '172.16.1.1', now(), 'taf-cpp', 'active', 'active',
'tars_cpp'),
+ ('B', 'BServer', '172.16.2.1', now(), 'taf-cpp', 'active', 'active',
'tars_cpp'),
+ ('C', 'CServer', '172.16.3.1', now(), 'taf-cpp', 'active', 'active',
'tars_cpp')",
+
+"POST /sql
+insert into t_adapter_conf(application, server_name, node_name, adapter_name,
endpoint, servant)
+values ('A', 'AServer', '172.16.1.1', 'A.AServer.FirstObjAdapter',
+ 'tcp -h 172.16.1.1 -p 10001 -e 0 -t 6000', 'A.AServer.FirstObj'),
+ ('B', 'BServer', '172.16.2.1', 'B.BServer.FirstObjAdapter',
+ 'tcp -p 10001 -h 172.16.2.1 -e 0 -t 6000', 'B.BServer.FirstObj'),
+ ('C', 'CServer', '172.16.3.1', 'C.CServer.FirstObjAdapter',
+ 'tcp -e 0 -h 172.16.3.1 -t 6000 -p 10001 ', 'C.CServer.FirstObj')",
+
+]
+--- response_body eval
+[
+ "DONE\n",
+ "DONE\n",
+ "DONE\n",
+ "DONE\n",
+]
+
+
+
+=== TEST 2: get count after create servant
+--- apisix_yaml
+stream_routes:
+ -
+ id: 1
+ server_port: 1985
+ upstream_id: 1
+
+upstreams:
+ - nodes:
+ "127.0.0.1:8125": 1
+ type: roundrobin
+ id: 1
+
+#END
+--- stream_request
+["A.AServer.FirstObj","B.BServer.FirstObj", "C.CServer.FirstObj"]
+--- stream_response eval
+qr{ 1 1 1 }