This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c54cf5e6 feat(xrpc): register variable rpc_time (#7040)
3c54cf5e6 is described below

commit 3c54cf5e6514bebc73bdec3226ed816a2c78c894
Author: tzssangglass <tzssanggl...@gmail.com>
AuthorDate: Mon May 16 16:55:44 2022 +0800

    feat(xrpc): register variable rpc_time (#7040)
---
 apisix/core/ctx.lua                                |   6 +-
 apisix/stream/xrpc/runner.lua                      |   8 +-
 docs/en/latest/apisix-variable.md                  |   1 +
 docs/en/latest/xrpc.md                             |  41 +++++
 .../apisix/stream/xrpc/protocols/pingpong/init.lua |   4 +
 t/xrpc/pingpong2.t                                 |  20 +--
 t/xrpc/pingpong3.t                                 | 193 +++++++++++++++++++++
 7 files changed, 259 insertions(+), 14 deletions(-)

diff --git a/apisix/core/ctx.lua b/apisix/core/ctx.lua
index df6e430ce..5bf3daa57 100644
--- a/apisix/core/ctx.lua
+++ b/apisix/core/ctx.lua
@@ -278,11 +278,12 @@ do
             else
                 local getter = apisix_var_names[key]
                 if getter then
+                    local ctx = t._ctx
                     if getter == true then
-                        val = ngx.ctx.api_ctx and ngx.ctx.api_ctx[key]
+                        val = ctx and ctx[key]
                     else
                         -- the getter is registered by ctx.register_var
-                        val = getter(ngx.ctx.api_ctx)
+                        val = getter(ctx)
                     end
 
                 else
@@ -341,6 +342,7 @@ function _M.set_vars_meta(ctx)
     end
 
     var._request = get_request()
+    var._ctx = ctx
     setmetatable(var, mt)
     ctx.var = var
 end
diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua
index 51dbd271e..5f1b97d1a 100644
--- a/apisix/stream/xrpc/runner.lua
+++ b/apisix/stream/xrpc/runner.lua
@@ -27,6 +27,11 @@ local pcall = pcall
 local ipairs = ipairs
 local tostring = tostring
 
+
+core.ctx.register_var("rpc_time", function(ctx)
+    return ctx._rpc_end_time - ctx._rpc_start_time
+end)
+
 local logger_expr_cache = core.lrucache.new({
     ttl = 300, count = 1024
 })
@@ -72,7 +77,7 @@ local function filter_logger(ctx, logger)
         core.log.error("failed to validate the 'filter' expression: ", err)
         return false
     end
-    return filter_expr:eval(ctx)
+    return filter_expr:eval(ctx.var)
 end
 
 
@@ -93,7 +98,6 @@ end
 
 local function finialize_req(protocol, session, ctx)
     ctx._rpc_end_time = ngx_now()
-
     local loggers = session.route.protocol.logger
     if loggers and #loggers > 0 then
         for _, logger in ipairs(loggers) do
diff --git a/docs/en/latest/apisix-variable.md 
b/docs/en/latest/apisix-variable.md
index d6254b820..c9281731b 100644
--- a/docs/en/latest/apisix-variable.md
+++ b/docs/en/latest/apisix-variable.md
@@ -39,5 +39,6 @@ List in alphabetical order:
 | route_name       | core    | name of `route`        |   |
 | service_id       | core    | id of `service`        |   |
 | service_name     | core    | name of `service`      |   |
+| rpc_time         | xRPC    | time spent at the rpc request level |   |
 
 You can also [register your own 
variable](./plugin-develop.md#register-custom-variable).
diff --git a/docs/en/latest/xrpc.md b/docs/en/latest/xrpc.md
index 1fb2a5159..9ad486e83 100644
--- a/docs/en/latest/xrpc.md
+++ b/docs/en/latest/xrpc.md
@@ -131,6 +131,47 @@ One specifies the `superior_id`, whose corresponding value 
is the ID of another
 
 For example, for the Dubbo RPC protocol, the subordinate route is matched 
based on the service_name and other parameters configured in the route and the 
actual service_name brought in the request. If the match is successful, the 
configuration above the subordinate route is used, otherwise the configuration 
of the superior route is still used. In the above example, if the match for 
route 2 is successful, it will be forwarded to upstream 2; otherwise, it will 
still be forwarded to upstream 1.
 
+### Log Reporting
+
+xRPC supports logging-related functions. You can use this feature to filter 
requests that require attention, such as high latency, excessive transfer 
content, etc.
+
+Each logger item configuration parameter will contain
+
+- name: the Logger plugin name,
+- filter: the prerequisites for the execution of the logger plugin(e.g., 
request processing time exceeding a given value),
+- conf: the configuration of the logger plugin itself.
+
+ The following configuration is an example:
+
+```json
+{
+    ...
+    "protocol": {
+        "name": "redis",
+        "logger": {
+            {
+                "name": "syslog",
+                "filter": [
+                    ["rpc_time", ">=", 0.01]
+                ],
+                "conf": {
+                    "host": "127.0.0.1",
+                    "port": 8125,
+                }
+            }
+        }
+    }
+}
+```
+
+This configuration means that when the `rpc_time` is greater than 0.01 
seconds, xPRC reports the request log to the log server via the `syslog` 
plugin. `conf` is the configuration of the logging server required by the 
`syslog` plugin.
+
+Unlike standard TCP proxies, which only execute a logger when the connection 
is closed, xRPC's executed logger at the end of each 'request'.
+
+The protocol itself defines the granularity of the specific request, and the 
xRPC extension code implements the request's granularity.
+
+For example, in the Redis protocol, the execution of a command is considered a 
request.
+
 ## How to write your own protocol
 
 Assuming that your protocol is named `my_proto`, you need to create a 
directory that can be introduced by `require 
"apisix.stream.xrpc.protocols.my_proto"`.
diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua 
b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
index 013725832..3ea0c7eaf 100644
--- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
+++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
@@ -29,6 +29,10 @@ local DONE = ngx.DONE
 local str_byte = string.byte
 
 
+core.ctx.register_var("rpc_len", function(ctx)
+    return ctx.len
+end)
+
 local _M = {}
 local router_version
 local router
diff --git a/t/xrpc/pingpong2.t b/t/xrpc/pingpong2.t
index 93c57bd27..cdcf367c2 100644
--- a/t/xrpc/pingpong2.t
+++ b/t/xrpc/pingpong2.t
@@ -220,7 +220,7 @@ failed to validate the 'filter' expression: rule too short
                             {
                                 name = "syslog",
                                 filter = {
-                                    {"len", ">", 10}
+                                    {"rpc_len", ">", 10}
                                 },
                                 conf = {}
                             }
@@ -272,7 +272,7 @@ log filter: syslog filter result: true
                             {
                                 name = "syslog",
                                 filter = {
-                                    {"len", "<", 10}
+                                    {"rpc_len", "<", 10}
                                 },
                                 conf = {}
                             }
@@ -324,8 +324,8 @@ log filter: syslog filter result: false
                             {
                                 name = "syslog",
                                 filter = {
-                                    {"len", ">", 12},
-                                    {"len", "<", 14}
+                                    {"rpc_len", ">", 12},
+                                    {"rpc_len", "<", 14}
                                 },
                                 conf = {}
                             }
@@ -377,8 +377,8 @@ log filter: syslog filter result: true
                             {
                                 name = "syslog",
                                 filter = {
-                                    {"len", "<", 10},
-                                    {"len", ">", 12}
+                                    {"rpc_len", "<", 10},
+                                    {"rpc_len", ">", 12}
                                 },
                                 conf = {}
                             }
@@ -516,7 +516,7 @@ qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/
                             {
                                 name = "syslog",
                                 filter = {
-                                    {"len", ">", 10}
+                                    {"rpc_len", ">", 10}
                                 },
                                 conf = {
                                     host = "127.0.0.1",
@@ -576,7 +576,7 @@ qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/
                             {
                                 name = "syslog",
                                 filter = {
-                                    {"len", ">", 10}
+                                    {"rpc_len", ">", 10}
                                 },
                                 conf = {
                                     host = "127.0.0.1",
@@ -650,7 +650,7 @@ unlock with key xrpc-pingpong-logger#table
                             {
                                 name = "syslog",
                                 filter = {
-                                    {"len", ">", 10}
+                                    {"rpc_len", ">", 10}
                                 },
                                 conf = {
                                     host = "127.0.0.1",
@@ -698,7 +698,7 @@ unlock with key xrpc-pingpong-logger#table
                             {
                                 name = "syslog",
                                 filter = {
-                                    {"len", ">", 10}
+                                    {"rpc_len", ">", 10}
                                 },
                                 conf = {
                                     host = "127.0.0.1",
diff --git a/t/xrpc/pingpong3.t b/t/xrpc/pingpong3.t
new file mode 100644
index 000000000..c6d98810d
--- /dev/null
+++ b/t/xrpc/pingpong3.t
@@ -0,0 +1,193 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX;
+
+my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx';
+my $version = eval { `$nginx_binary -V 2>&1` };
+
+if ($version !~ m/\/apisix-nginx-module/) {
+    plan(skip_all => "apisix-nginx-module not installed");
+} else {
+    plan('no_plan');
+}
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!$block->extra_yaml_config) {
+        my $extra_yaml_config = <<_EOC_;
+xrpc:
+  protocols:
+    - name: pingpong
+_EOC_
+        $block->set_value("extra_yaml_config", $extra_yaml_config);
+    }
+
+    my $config = $block->config // <<_EOC_;
+    location /t {
+        content_by_lua_block {
+            ngx.req.read_body()
+            local sock = ngx.socket.tcp()
+            sock:settimeout(1000)
+            local ok, err = sock:connect("127.0.0.1", 1985)
+            if not ok then
+                ngx.log(ngx.ERR, "failed to connect: ", err)
+                return ngx.exit(503)
+            end
+
+            local bytes, err = sock:send(ngx.req.get_body_data())
+            if not bytes then
+                ngx.log(ngx.ERR, "send stream request error: ", err)
+                return ngx.exit(503)
+            end
+            while true do
+                local data, err = sock:receiveany(4096)
+                if not data then
+                    sock:close()
+                    break
+                end
+                ngx.print(data)
+            end
+        }
+    }
+_EOC_
+
+    $block->set_value("config", $config);
+
+    my $stream_upstream_code = $block->stream_upstream_code // <<_EOC_;
+            local sock = ngx.req.socket(true)
+            sock:settimeout(10)
+            while true do
+                local data = sock:receiveany(4096)
+                if not data then
+                    return
+                end
+                sock:send(data)
+            end
+_EOC_
+
+    $block->set_value("stream_upstream_code", $stream_upstream_code);
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]\nRPC is not finished");
+    }
+
+    if (!defined $block->extra_stream_config) {
+        my $stream_config = <<_EOC_;
+    server {
+        listen 8125 udp;
+        content_by_lua_block {
+            require("lib.mock_layer4").dogstatsd()
+        }
+    }
+_EOC_
+        $block->set_value("extra_stream_config", $stream_config);
+    }
+
+    $block;
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: set custom log format
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/plugin_metadata/syslog',
+                ngx.HTTP_PUT,
+                [[{
+                    "log_format": {
+                        "rpc_time": "$rpc_time"
+                    }
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 2: use vae rpc_time
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "pingpong",
+                        logger = {
+                            {
+                                name = "syslog",
+                                filter = {
+                                    {"rpc_time", ">=", 0}
+                                },
+                                conf = {
+                                    host = "127.0.0.1",
+                                    port = 8125,
+                                    sock_type = "udp",
+                                    batch_max_size = 1,
+                                    flush_limit = 1
+                                }
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1995"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 3: verify the data received by the log server
+--- request eval
+"POST /t
+" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- stream_conf_enable
+--- wait: 0.5
+--- error_log eval
+qr/message received:.*\"rpc_time\\"\:(0.\d+|0)\}\"/

Reply via email to