This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 641481dde feat: stream subsystem support tars service discovery (#8826)
641481dde is described below

commit 641481dde6c1039209abf430784ee5774d503439
Author: Ashing Zheng <[email protected]>
AuthorDate: Fri Feb 17 10:05:02 2023 +0800

    feat: stream subsystem support tars service discovery (#8826)
---
 apisix/cli/ngx_tpl.lua         |   4 +
 apisix/discovery/tars/init.lua |  20 +++-
 conf/config-default.yaml       |   1 +
 t/APISIX.pm                    |   5 +
 t/tars/discovery/stream/tars.t | 216 +++++++++++++++++++++++++++++++++++++++++
 5 files changed, 244 insertions(+), 2 deletions(-)

diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua
index 7083d4201..bc6af4bf2 100644
--- a/apisix/cli/ngx_tpl.lua
+++ b/apisix/cli/ngx_tpl.lua
@@ -141,6 +141,10 @@ stream {
     lua_shared_dict etcd-cluster-health-check-stream {* 
stream.lua_shared_dict["etcd-cluster-health-check-stream"] *};
     lua_shared_dict worker-events-stream {* 
stream.lua_shared_dict["worker-events-stream"] *};
 
+    {% if enabled_discoveries["tars"] then %}
+    lua_shared_dict tars-stream {* stream.lua_shared_dict["tars-stream"] *};
+    {% end %}
+
     {% if enabled_stream_plugins["limit-conn"] then %}
     lua_shared_dict plugin-limit-conn-stream {* 
stream.lua_shared_dict["plugin-limit-conn-stream"] *};
     {% end %}
diff --git a/apisix/discovery/tars/init.lua b/apisix/discovery/tars/init.lua
index f621791dd..14f658dce 100644
--- a/apisix/discovery/tars/init.lua
+++ b/apisix/discovery/tars/init.lua
@@ -22,7 +22,8 @@ local tonumber = tonumber
 local local_conf = require("apisix.core.config_local").local_conf()
 local core = require("apisix.core")
 local mysql = require("resty.mysql")
-local process = require("ngx.process")
+local is_http = ngx.config.subsystem == "http"
+local support_process, process = pcall(require, "ngx.process")
 
 local endpoint_dict
 
@@ -331,9 +332,24 @@ function _M.nodes(servant)
     return get_endpoint(servant)
 end
 
+local function get_endpoint_dict()
+    local shm = "tars"
+
+    if not is_http then
+        shm = shm .. "-stream"
+    end
+
+    return ngx.shared[shm]
+end
 
 function _M.init_worker()
-    endpoint_dict = ngx.shared.tars
+    if not support_process then
+        core.log.error("tars discovery not support in subsystem: ", 
ngx.config.subsystem,
+                       ", please check if your openresty version >= 1.19.9.1 
or not")
+        return
+    end
+
+    endpoint_dict = get_endpoint_dict()
     if not endpoint_dict then
         error("failed to get lua_shared_dict: tars, please check your APISIX 
version")
     end
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index db5cb2211..4d8d0c208 100755
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -165,6 +165,7 @@ nginx_config:                     # config for render the 
template to generate n
       lrucache-lock-stream: 10m
       plugin-limit-conn-stream: 10m
       worker-events-stream: 10m
+      tars-stream: 1m
 
   # As user can add arbitrary configurations in the snippet,
   # it is user's responsibility to check the configurations
diff --git a/t/APISIX.pm b/t/APISIX.pm
index 8e9e72d11..e0ad70427 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -396,6 +396,7 @@ _EOC_
     lua_shared_dict kubernetes-stream 1m;
     lua_shared_dict kubernetes-first-stream 1m;
     lua_shared_dict kubernetes-second-stream 1m;
+    lua_shared_dict tars-stream 1m;
 
     upstream apisix_backend {
         server 127.0.0.1:1900;
@@ -405,6 +406,8 @@ _EOC_
     }
 _EOC_
 
+    my $stream_extra_init_by_lua_start = 
$block->stream_extra_init_by_lua_start // "";
+
     my $stream_init_by_lua_block = $block->stream_init_by_lua_block // <<_EOC_;
         if os.getenv("APISIX_ENABLE_LUACOV") == "1" then
             require("luacov.runner")("t/apisix.luacov")
@@ -413,6 +416,8 @@ _EOC_
 
         require "resty.core"
 
+        $stream_extra_init_by_lua_start
+
         apisix = require("apisix")
         local args = {
             dns_resolver = $dns_addrs_tbl_str,
diff --git a/t/tars/discovery/stream/tars.t b/t/tars/discovery/stream/tars.t
new file mode 100644
index 000000000..b7c55a0f4
--- /dev/null
+++ b/t/tars/discovery/stream/tars.t
@@ -0,0 +1,216 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX;
+
+my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx';
+my $version = eval { `$nginx_binary -V 2>&1` };
+
+if ($version =~ m/\/1.19.3/) {
+    plan(skip_all => "require OpenResty version >= 1.19.9.1");
+} else {
+    plan('no_plan');
+}
+
+repeat_each(1);
+log_level('warn');
+no_root_location();
+no_shuffle();
+workers(4);
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $yaml_config = <<_EOC_;
+apisix:
+  node_listen: 1984
+  enable_admin: false
+deployment:
+    role: data_plane
+    role_data_plane:
+        config_provider: yaml
+discovery:
+  tars:
+    db_conf:
+      host: 127.0.0.1
+      port: 3306
+      database: db_tars
+      user: root
+      password: tars2022
+    full_fetch_interval: 3
+    incremental_fetch_interval: 1
+_EOC_
+
+    $block->set_value("yaml_config", $yaml_config);
+
+    my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
+routes: []
+#END
+_EOC_
+
+    $block->set_value("apisix_yaml", $apisix_yaml);
+
+    my $extra_init_by_lua_start = <<_EOC_;
+        -- reduce incremental_fetch_interval,full_fetch_interval
+        local schema = require("apisix.discovery.tars.schema")
+        schema.properties.incremental_fetch_interval.minimum=1
+        schema.properties.incremental_fetch_interval.default=1
+        schema.properties.full_fetch_interval.minimum = 3
+        schema.properties.full_fetch_interval.default = 3
+_EOC_
+
+    $block->set_value("extra_init_by_lua_start", $extra_init_by_lua_start);
+    $block->set_value("stream_extra_init_by_lua_start", 
$extra_init_by_lua_start);
+
+    my $config = $block->config // <<_EOC_;
+
+        location /sql {
+            content_by_lua_block {
+                local mysql = require("resty.mysql")
+                local core = require("apisix.core")
+                local ipairs = ipairs
+
+                ngx.req.read_body()
+                local sql = ngx.req.get_body_data()
+                core.log.info("get sql ", sql)
+
+                local db_conf= {
+                  host="127.0.0.1",
+                  port=3306,
+                  database="db_tars",
+                  user="root",
+                  password="tars2022",
+                }
+
+                local db_cli, err = mysql:new()
+                if not db_cli then
+                  core.log.error("failed to instantiate mysql: ", err)
+                  return
+                end
+                db_cli:set_timeout(3000)
+
+                local ok, err, errcode, sqlstate = db_cli:connect(db_conf)
+                if not ok then
+                  core.log.error("failed to connect mysql: ", err, ", ", 
errcode, ", ", sqlstate)
+                  return
+                end
+
+                local res, err, errcode, sqlstate = db_cli:query(sql)
+                if not res then
+                   ngx.say("bad result: ", err, ": ", errcode, ": ", sqlstate, 
".")
+                   return
+                end
+                ngx.say("DONE")
+            }
+        }
+_EOC_
+
+    $block->set_value("config", $config);
+
+    my $stream_config = $block->stream_config // <<_EOC_;
+        server {
+            listen 8125;
+            content_by_lua_block {
+                local core = require("apisix.core")
+                local d = require("apisix.discovery.tars")
+
+                ngx.sleep(2)
+
+                local sock = ngx.req.socket()
+                local request_body = sock:receive()
+
+                core.log.info("get body ", request_body)
+
+                local response_body = "{"
+                local queries = core.json.decode(request_body)
+                for _,query in ipairs(queries) do
+                  local nodes = d.nodes(query)
+                  if nodes==nil or #nodes==0 then
+                      response_body=response_body.." "..0
+                  else
+                      response_body=response_body.." "..#nodes
+                  end
+                end
+                ngx.say(response_body.." }")
+            }
+        }
+
+_EOC_
+
+    $block->set_value("extra_stream_config", $stream_config);
+
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: create initial server and servant
+--- timeout: 3
+--- request eval
+[
+"POST /sql
+truncate table t_server_conf",
+
+"POST /sql
+truncate table t_adapter_conf",
+
+"POST /sql
+insert into t_server_conf(application, server_name, node_name, 
registry_timestamp,
+                          template_name, setting_state, present_state, 
server_type)
+values ('A', 'AServer', '172.16.1.1', now(), 'taf-cpp', 'active', 'active', 
'tars_cpp'),
+       ('B', 'BServer', '172.16.2.1', now(), 'taf-cpp', 'active', 'active', 
'tars_cpp'),
+       ('C', 'CServer', '172.16.3.1', now(), 'taf-cpp', 'active', 'active', 
'tars_cpp')",
+
+"POST /sql
+insert into t_adapter_conf(application, server_name, node_name, adapter_name, 
endpoint, servant)
+values ('A', 'AServer', '172.16.1.1', 'A.AServer.FirstObjAdapter',
+        'tcp -h 172.16.1.1 -p 10001 -e 0 -t 6000', 'A.AServer.FirstObj'),
+       ('B', 'BServer', '172.16.2.1', 'B.BServer.FirstObjAdapter',
+        'tcp -p 10001 -h 172.16.2.1 -e 0 -t 6000', 'B.BServer.FirstObj'),
+       ('C', 'CServer', '172.16.3.1', 'C.CServer.FirstObjAdapter',
+        'tcp -e 0 -h 172.16.3.1 -t 6000 -p 10001 ', 'C.CServer.FirstObj')",
+
+]
+--- response_body eval
+[
+    "DONE\n",
+    "DONE\n",
+    "DONE\n",
+    "DONE\n",
+]
+
+
+
+=== TEST 2: get count after create servant
+--- apisix_yaml
+stream_routes:
+  -
+    id: 1
+    server_port: 1985
+    upstream_id: 1
+
+upstreams:
+  - nodes:
+      "127.0.0.1:8125": 1
+    type: roundrobin
+    id: 1
+
+#END
+--- stream_request
+["A.AServer.FirstObj","B.BServer.FirstObj", "C.CServer.FirstObj"]
+--- stream_response eval
+qr{ 1 1 1 }

Reply via email to