This is an automated email from the ASF dual-hosted git repository.

zhangjintao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 808d511  feat(stream): add limit-conn (#4515)
808d511 is described below

commit 808d511dd480618f1118d4b0a58cade1fe792680
Author: 罗泽轩 <[email protected]>
AuthorDate: Tue Jul 13 13:17:52 2021 +0800

    feat(stream): add limit-conn (#4515)
    
    * feat(stream): add limit-conn
    
    Signed-off-by: spacewander <[email protected]>
    
    * break down long bracket to avoid missing the closing long bracket "]]"
    
    https://github.com/apache/apisix/pull/4515/checks?check_run_id=2962435093
    Signed-off-by: spacewander <[email protected]>
---
 Makefile                                           |   3 +
 apisix/cli/ngx_tpl.lua                             |   1 +
 apisix/plugins/limit-conn.lua                      |  86 +------
 .../{limit-conn.lua => limit-conn/init.lua}        |  47 +---
 apisix/stream/plugins/limit-conn.lua               |  59 +++++
 conf/config-default.yaml                           |   2 +
 docs/en/latest/plugins/limit-conn.md               |   2 +
 docs/zh/latest/plugins/limit-conn.md               |   2 +
 t/APISIX.pm                                        |  25 +-
 t/lib/server.lua                                   |  14 +-
 t/plugin/limit-conn2.t                             |   2 +-
 t/plugin/openid-configuration.json                 |  75 ++++++
 t/plugin/request-validation.t                      | 150 ++++++------
 t/plugin/traffic-split.t                           | 191 ++++++++-------
 t/plugin/traffic-split2.t                          | 268 ++++++++++-----------
 t/stream-plugin/limit-conn.t                       | 191 +++++++++++++++
 16 files changed, 670 insertions(+), 448 deletions(-)

diff --git a/Makefile b/Makefile
index c0ae7dd..f82cf33 100644
--- a/Makefile
+++ b/Makefile
@@ -215,6 +215,9 @@ install: default
        $(INSTALL) -d $(INST_LUADIR)/apisix/plugins/grpc-transcode
        $(INSTALL) apisix/plugins/grpc-transcode/*.lua 
$(INST_LUADIR)/apisix/plugins/grpc-transcode/
 
+       $(INSTALL) -d $(INST_LUADIR)/apisix/plugins/limit-conn
+       $(INSTALL) apisix/plugins/limit-conn/*.lua 
$(INST_LUADIR)/apisix/plugins/limit-conn/
+
        $(INSTALL) -d $(INST_LUADIR)/apisix/plugins/limit-count
        $(INSTALL) apisix/plugins/limit-count/*.lua 
$(INST_LUADIR)/apisix/plugins/limit-count/
 
diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua
index fc8b685..e68e834 100644
--- a/apisix/cli/ngx_tpl.lua
+++ b/apisix/cli/ngx_tpl.lua
@@ -67,6 +67,7 @@ stream {
     lua_socket_log_errors off;
 
     lua_shared_dict lrucache-lock-stream {* 
stream.lua_shared_dict["lrucache-lock-stream"] *};
+    lua_shared_dict plugin-limit-conn-stream {* 
stream.lua_shared_dict["plugin-limit-conn-stream"] *};
 
     resolver {% for _, dns_addr in ipairs(dns_resolver or {}) do %} 
{*dns_addr*} {% end %} {% if dns_resolver_valid then %} 
valid={*dns_resolver_valid*}{% end %};
     resolver_timeout {*resolver_timeout*};
diff --git a/apisix/plugins/limit-conn.lua b/apisix/plugins/limit-conn.lua
index 564a1b5..2f174c9 100644
--- a/apisix/plugins/limit-conn.lua
+++ b/apisix/plugins/limit-conn.lua
@@ -14,16 +14,11 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 --
-local limit_conn_new = require("resty.limit.conn").new
 local core = require("apisix.core")
-local sleep = core.sleep
-local plugin_name = "limit-conn"
-
+local limit_conn = require("apisix.plugins.limit-conn.init")
 
-local lrucache = core.lrucache.new({
-    type = "plugin",
-})
 
+local plugin_name = "limit-conn"
 local schema = {
     type = "object",
     properties = {
@@ -41,7 +36,6 @@ local schema = {
     required = {"conn", "burst", "default_conn_delay", "key"}
 }
 
-
 local _M = {
     version = 0.1,
     priority = 1003,
@@ -49,87 +43,19 @@ local _M = {
     schema = schema,
 }
 
-function _M.check_schema(conf)
-    local ok, err = core.schema.check(schema, conf)
-    if not ok then
-        return false, err
-    end
-
-    return true
-end
 
-local function create_limit_obj(conf)
-    core.log.info("create new limit-conn plugin instance")
-    return limit_conn_new("plugin-limit-conn", conf.conn, conf.burst,
-                          conf.default_conn_delay)
+function _M.check_schema(conf)
+    return core.schema.check(schema, conf)
 end
 
 
 function _M.access(conf, ctx)
-    core.log.info("ver: ", ctx.conf_version)
-    local lim, err = lrucache(conf, nil, create_limit_obj, conf)
-    if not lim then
-        core.log.error("failed to instantiate a resty.limit.conn object: ", 
err)
-        return 500
-    end
-
-    local key = (ctx.var[conf.key] or "") .. ctx.conf_type .. ctx.conf_version
-    core.log.info("limit key: ", key)
-
-    local delay, err = lim:incoming(key, true)
-    if not delay then
-        if err == "rejected" then
-            return conf.rejected_code
-        end
-
-        core.log.error("failed to limit req: ", err)
-        return 500
-    end
-
-    if lim:is_committed() then
-        if not ctx.limit_conn then
-            ctx.limit_conn = core.tablepool.fetch("plugin#limit-conn", 0, 6)
-        end
-
-        core.table.insert_tail(ctx.limit_conn, lim, key, delay)
-    end
-
-    if delay >= 0.001 then
-        sleep(delay)
-    end
+    return limit_conn.increase(conf, ctx)
 end
 
 
 function _M.log(conf, ctx)
-    local limit_conn = ctx.limit_conn
-    if not limit_conn then
-        return
-    end
-
-    for i = 1, #limit_conn, 3 do
-        local lim = limit_conn[i]
-        local key = limit_conn[i + 1]
-        local delay = limit_conn[i + 2]
-
-        local latency
-        if ctx.proxy_passed then
-            latency = ctx.var.upstream_response_time
-        else
-            latency = ctx.var.request_time - delay
-        end
-
-        core.log.debug("request latency is ", latency) -- for test
-
-        local conn, err = lim:leaving(key, latency)
-        if not conn then
-            core.log.error("failed to record the connection leaving request: ",
-                           err)
-            break
-        end
-    end
-
-    core.tablepool.release("plugin#limit-conn", limit_conn)
-    return
+    return limit_conn.decrease(conf, ctx)
 end
 
 
diff --git a/apisix/plugins/limit-conn.lua b/apisix/plugins/limit-conn/init.lua
similarity index 72%
copy from apisix/plugins/limit-conn.lua
copy to apisix/plugins/limit-conn/init.lua
index 564a1b5..d2935c2 100644
--- a/apisix/plugins/limit-conn.lua
+++ b/apisix/plugins/limit-conn/init.lua
@@ -17,55 +17,26 @@
 local limit_conn_new = require("resty.limit.conn").new
 local core = require("apisix.core")
 local sleep = core.sleep
-local plugin_name = "limit-conn"
+local shdict_name = "plugin-limit-conn"
+if ngx.config.subsystem == "stream" then
+    shdict_name = shdict_name .. "-stream"
+end
 
 
 local lrucache = core.lrucache.new({
     type = "plugin",
 })
+local _M = {}
 
-local schema = {
-    type = "object",
-    properties = {
-        conn = {type = "integer", exclusiveMinimum = 0},
-        burst = {type = "integer",  minimum = 0},
-        default_conn_delay = {type = "number", exclusiveMinimum = 0},
-        key = {type = "string",
-            enum = {"remote_addr", "server_addr", "http_x_real_ip",
-                    "http_x_forwarded_for", "consumer_name"},
-        },
-        rejected_code = {
-            type = "integer", minimum = 200, maximum = 599, default = 503
-        },
-    },
-    required = {"conn", "burst", "default_conn_delay", "key"}
-}
-
-
-local _M = {
-    version = 0.1,
-    priority = 1003,
-    name = plugin_name,
-    schema = schema,
-}
-
-function _M.check_schema(conf)
-    local ok, err = core.schema.check(schema, conf)
-    if not ok then
-        return false, err
-    end
-
-    return true
-end
 
 local function create_limit_obj(conf)
     core.log.info("create new limit-conn plugin instance")
-    return limit_conn_new("plugin-limit-conn", conf.conn, conf.burst,
+    return limit_conn_new(shdict_name, conf.conn, conf.burst,
                           conf.default_conn_delay)
 end
 
 
-function _M.access(conf, ctx)
+function _M.increase(conf, ctx)
     core.log.info("ver: ", ctx.conf_version)
     local lim, err = lrucache(conf, nil, create_limit_obj, conf)
     if not lim then
@@ -79,7 +50,7 @@ function _M.access(conf, ctx)
     local delay, err = lim:incoming(key, true)
     if not delay then
         if err == "rejected" then
-            return conf.rejected_code
+            return conf.rejected_code or 503
         end
 
         core.log.error("failed to limit req: ", err)
@@ -100,7 +71,7 @@ function _M.access(conf, ctx)
 end
 
 
-function _M.log(conf, ctx)
+function _M.decrease(conf, ctx)
     local limit_conn = ctx.limit_conn
     if not limit_conn then
         return
diff --git a/apisix/stream/plugins/limit-conn.lua b/apisix/stream/plugins/limit-conn.lua
new file mode 100644
index 0000000..6f949c3
--- /dev/null
+++ b/apisix/stream/plugins/limit-conn.lua
@@ -0,0 +1,59 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local limit_conn = require("apisix.plugins.limit-conn.init")
+
+
+local plugin_name = "limit-conn"
+local schema = {
+    type = "object",
+    properties = {
+        conn = {type = "integer", exclusiveMinimum = 0},
+        burst = {type = "integer",  minimum = 0},
+        default_conn_delay = {type = "number", exclusiveMinimum = 0},
+        key = {
+            type = "string",
+            enum = {"remote_addr", "server_addr"}
+        },
+    },
+    required = {"conn", "burst", "default_conn_delay", "key"}
+}
+
+local _M = {
+    version = 0.1,
+    priority = 1003,
+    name = plugin_name,
+    schema = schema,
+}
+
+
+function _M.check_schema(conf)
+    return core.schema.check(schema, conf)
+end
+
+
+function _M.preread(conf, ctx)
+    return limit_conn.increase(conf, ctx)
+end
+
+
+function _M.log(conf, ctx)
+    return limit_conn.decrease(conf, ctx)
+end
+
+
+return _M
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 0e26b34..ebd28c7 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -148,6 +148,7 @@ nginx_config:                     # config for render the template to generate n
   stream:
     lua_shared_dict:
       lrucache-lock-stream: 10m
+      plugin-limit-conn-stream: 10m
 
   # As user can add arbitrary configurations in the snippet,
   # it is user's responsibility to check the configurations
@@ -319,6 +320,7 @@ plugins:                          # plugin list (sorted by priority)
   - ext-plugin-post-req            # priority: -3000
 
 stream_plugins: # sorted by priority
+  - limit-conn                     # priority: 1003
   - mqtt-proxy                     # priority: 1000
   # <- recommend to use priority (0, 100) for your custom plugins
 
diff --git a/docs/en/latest/plugins/limit-conn.md b/docs/en/latest/plugins/limit-conn.md
index ed04e05..b402a26 100644
--- a/docs/en/latest/plugins/limit-conn.md
+++ b/docs/en/latest/plugins/limit-conn.md
@@ -45,6 +45,8 @@ Limiting request concurrency plugin.
 
 **Key can be customized by the user, only need to modify a line of code of the plug-in to complete. It is a security consideration that is not open in the plugin.**
 
+When used in the stream proxy, only `remote_addr` and `server_addr` can be used as key. And `rejected_code` is meaningless.
+
 ## How To Enable
 
 Here's an example, enable the limit-conn plugin on the specified route:
diff --git a/docs/zh/latest/plugins/limit-conn.md 
b/docs/zh/latest/plugins/limit-conn.md
index e307beb..ef02c93 100644
--- a/docs/zh/latest/plugins/limit-conn.md
+++ b/docs/zh/latest/plugins/limit-conn.md
@@ -35,6 +35,8 @@ title: limit-conn
 
 **注:key 是可以被用户自定义的,只需要修改插件的一行代码即可完成。并没有在插件中放开是处于安全的考虑。**
 
+在 stream 代理中使用该插件时,只有 `remote_addr` 和 `server_addr` 可以被用作 key。另外设置 `rejected_code` 毫无意义。
+
 #### 如何启用
 
 下面是一个示例,在指定的 route 上开启了 limit-conn 插件:
diff --git a/t/APISIX.pm b/t/APISIX.pm
index 418ead0..82ca1b4 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -295,11 +295,18 @@ _EOC_
     my $stream_enable = $block->stream_enable;
     my $stream_conf_enable = $block->stream_conf_enable;
     my $extra_stream_config = $block->extra_stream_config // '';
+    my $stream_upstream_code = $block->stream_upstream_code // <<_EOC_;
+            local sock = ngx.req.socket()
+            local data = sock:receive("1")
+            ngx.say("hello world")
+_EOC_
+
     my $stream_config = $block->stream_config // <<_EOC_;
     $lua_deps_path
     lua_socket_log_errors off;
 
     lua_shared_dict lrucache-lock-stream 10m;
+    lua_shared_dict plugin-limit-conn-stream 10m;
 
     upstream apisix_backend {
         server 127.0.0.1:1900;
@@ -339,9 +346,7 @@ _EOC_
         listen 1995;
 
         content_by_lua_block {
-            local sock = ngx.req.socket()
-            local data = sock:receive("1")
-            ngx.say("hello world")
+            $stream_upstream_code
         }
     }
 _EOC_
@@ -536,20 +541,6 @@ _EOC_
                 apisix.http_log_phase()
             }
         }
-
-        location = /v3/auth/authenticate {
-            content_by_lua_block {
-                ngx.log(ngx.WARN, "etcd auth failed!")
-            }
-        }
-
-        location  = /.well-known/openid-configuration {
-            content_by_lua_block {
-                ngx.say([[
-{"issuer":"https://samples.auth0.com/","authorization_endpoint":"https://samples.auth0.com/authorize","token_endpoint":"https://samples.auth0.com/oauth/token","device_authorization_endpoint":"https://samples.auth0.com/oauth/device/code","userinfo_endpoint":"https://samples.auth0.com/userinfo","mfa_challenge_endpoint":"https://samples.auth0.com/mfa/challenge","jwks_uri":"https://samples.auth0.com/.well-known/jwks.json","registration_endpoint":"https://samples.auth0.com/oidc/register","rev
 [...]
-                ]])
-            }
-        }
     }
 
     $a6_ngx_directives
diff --git a/t/lib/server.lua b/t/lib/server.lua
index e762104..09f01ed 100644
--- a/t/lib/server.lua
+++ b/t/lib/server.lua
@@ -370,7 +370,7 @@ end
 
 function _M.go()
     local action = string.sub(ngx.var.uri, 2)
-    action = string.gsub(action, "[/\\.]", "_")
+    action = string.gsub(action, "[/\\.-]", "_")
     if not action or not _M[action] then
         return ngx.exit(404)
     end
@@ -418,4 +418,16 @@ function _M.server_error()
 end
 
 
+function _M.v3_auth_authenticate()
+    ngx.log(ngx.WARN, "etcd auth failed!")
+end
+
+
+function _M._well_known_openid_configuration()
+    local t = require("lib.test_admin")
+    local openid_data = t.read_file("t/plugin/openid-configuration.json")
+    ngx.say(openid_data)
+end
+
+
 return _M
diff --git a/t/plugin/limit-conn2.t b/t/plugin/limit-conn2.t
index 914abca..565a5c7 100644
--- a/t/plugin/limit-conn2.t
+++ b/t/plugin/limit-conn2.t
@@ -16,7 +16,7 @@
 #
 BEGIN {
     if ($ENV{TEST_NGINX_CHECK_LEAK}) {
-        $SkipReason = "unavailable for the hup tests";
+        $SkipReason = "unavailable for the check leak tests";
 
     } else {
         $ENV{TEST_NGINX_USE_HUP} = 1;
diff --git a/t/plugin/openid-configuration.json b/t/plugin/openid-configuration.json
new file mode 100644
index 0000000..0788a9b
--- /dev/null
+++ b/t/plugin/openid-configuration.json
@@ -0,0 +1,75 @@
+{
+    "issuer": "https://samples.auth0.com/",
+    "authorization_endpoint": "https://samples.auth0.com/authorize",
+    "token_endpoint": "https://samples.auth0.com/oauth/token",
+    "device_authorization_endpoint": "https://samples.auth0.com/oauth/device/code",
+    "userinfo_endpoint": "https://samples.auth0.com/userinfo",
+    "mfa_challenge_endpoint": "https://samples.auth0.com/mfa/challenge",
+    "jwks_uri": "https://samples.auth0.com/.well-known/jwks.json",
+    "registration_endpoint": "https://samples.auth0.com/oidc/register",
+    "revocation_endpoint": "https://samples.auth0.com/oauth/revoke",
+    "scopes_supported": [
+        "openid",
+        "profile",
+        "offline_access",
+        "name",
+        "given_name",
+        "family_name",
+        "nickname",
+        "email",
+        "email_verified",
+        "picture",
+        "created_at",
+        "identities",
+        "phone",
+        "address"
+    ],
+    "response_types_supported": [
+        "code",
+        "token",
+        "id_token",
+        "code token",
+        "code id_token",
+        "token id_token",
+        "code token id_token"
+    ],
+    "code_challenge_methods_supported": [
+        "S256",
+        "plain"
+    ],
+    "response_modes_supported": [
+        "query",
+        "fragment",
+        "form_post"
+    ],
+    "subject_types_supported": [
+        "public"
+    ],
+    "id_token_signing_alg_values_supported": [
+        "HS256",
+        "RS256"
+    ],
+    "token_endpoint_auth_methods_supported": [
+        "client_secret_basic",
+        "client_secret_post"
+    ],
+    "claims_supported": [
+        "aud",
+        "auth_time",
+        "created_at",
+        "email",
+        "email_verified",
+        "exp",
+        "family_name",
+        "given_name",
+        "iat",
+        "identities",
+        "iss",
+        "name",
+        "nickname",
+        "phone_number",
+        "picture",
+        "sub"
+    ],
+    "request_uri_parameter_supported": false
+}
diff --git a/t/plugin/request-validation.t b/t/plugin/request-validation.t
index cc075ca..f870768 100644
--- a/t/plugin/request-validation.t
+++ b/t/plugin/request-validation.t
@@ -73,44 +73,50 @@ done
 --- config
     location /t {
         content_by_lua_block {
+            local json = require("toolkit.json")
             local t = require("lib.test_admin").test
+            local data = {
+                plugins = {
+                    ["request-validation"] = {
+                    body_schema = {
+                        type = "object",
+                        required = { "required_payload" },
+                        properties = {
+                        required_payload = {
+                            type = "string"
+                        },
+                        boolean_payload = {
+                            type = "boolean"
+                        },
+                        timeouts = {
+                            type = "integer",
+                            minimum = 1,
+                            maximum = 254,
+                            default = 3
+                        },
+                        req_headers = {
+                            type = "array",
+                            minItems = 1,
+                            items = {
+                                type = "string"
+                            }
+                        }
+                        }
+                    }
+                    }
+                },
+                upstream = {
+                    nodes = {
+                        ["127.0.0.1:1982"] = 1
+                    },
+                    type = "roundrobin"
+                },
+                uri = "/opentracing"
+            }
             local code, body = t('/apisix/admin/routes/1',
                  ngx.HTTP_PUT,
-                 [[{
-                        "plugins": {
-                            "request-validation": {
-                                "body_schema": {
-                                    "type": "object",
-                                    "required": ["required_payload"],
-                                    "properties": {
-                                        "required_payload": {"type": "string"},
-                                        "boolean_payload": {"type": "boolean"},
-                                        "timeouts": {
-                                           "type": "integer",
-                                            "minimum": 1,
-                                            "maximum": 254,
-                                            "default": 3
-                                        },
-                                        "req_headers": {
-                                            "type": "array",
-                                            "minItems": 1,
-                                            "items": {
-                                                "type": "string"
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        },]] .. [[
-                        "upstream": {
-                            "nodes": {
-                                "127.0.0.1:1982": 1
-                            },
-                            "type": "roundrobin"
-                        },
-                        "uri": "/opentracing"
-                }]]
-                )
+                 json.encode(data)
+            )
 
             if code >= 300 then
                 ngx.status = code
@@ -195,43 +201,49 @@ hello1 world
 --- config
     location /t {
         content_by_lua_block {
+            local json = require("toolkit.json")
             local t = require("lib.test_admin").test
+            local data = {
+                plugins = {
+                    ["request-validation"] = {
+                    header_schema = {
+                        type = "object",
+                        required = { "required_payload" },
+                        properties = {
+                        required_payload = {
+                            type = "string"
+                        },
+                        boolean_payload = {
+                            type = "boolean"
+                        },
+                        timeouts = {
+                            type = "integer",
+                            minimum = 1,
+                            maximum = 254,
+                            default = 3
+                        },
+                        req_headers = {
+                            type = "array",
+                            minItems = 1,
+                            items = {
+                            type = "string"
+                            }
+                        }
+                        }
+                    }
+                    }
+                },
+                upstream = {
+                    nodes = {
+                    ["127.0.0.1:1982"] = 1
+                    },
+                    type = "roundrobin"
+                },
+                uri = "/opentracing"
+            }
             local code, body = t('/apisix/admin/routes/1',
                  ngx.HTTP_PUT,
-                 [[{
-                        "plugins": {
-                            "request-validation": {
-                                "header_schema": {
-                                    "type": "object",
-                                    "required": ["required_payload"],
-                                    "properties": {
-                                        "required_payload": {"type": "string"},
-                                        "boolean_payload": {"type": "boolean"},
-                                        "timeouts": {
-                                           "type": "integer",
-                                            "minimum": 1,
-                                            "maximum": 254,
-                                            "default": 3
-                                        },
-                                        "req_headers": {
-                                            "type": "array",
-                                            "minItems": 1,
-                                            "items": {
-                                                "type": "string"
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        },]] .. [[
-                        "upstream": {
-                            "nodes": {
-                                "127.0.0.1:1982": 1
-                            },
-                            "type": "roundrobin"
-                        },
-                        "uri": "/opentracing"
-                }]]
+                 json.encode(data)
                 )
 
             if code >= 300 then
diff --git a/t/plugin/traffic-split.t b/t/plugin/traffic-split.t
index b1a932c..cff6d6d 100644
--- a/t/plugin/traffic-split.t
+++ b/t/plugin/traffic-split.t
@@ -304,40 +304,46 @@ GET /t
 --- config
     location /t {
         content_by_lua_block {
+            local json = require("toolkit.json")
             local t = require("lib.test_admin").test
-            local code, body = t('/apisix/admin/routes/1',
-                ngx.HTTP_PUT,
-                [=[{
-                    "uri": "/server_port",
-                    "plugins": {
-                        "traffic-split": {
-                            "rules": [
-                                {
-                                    "match": [
-                                        {
-                                            "vars": [["arg_name", "==", 
"jack"],["arg_age", "!","<", "16"]]
-                                        }
-                                    ],
-                                    "weighted_upstreams": [
-                                        {
-                                           "upstream": {"name": "upstream_A", 
"type": "roundrobin", "nodes": {"127.0.0.1:1981":2}, "timeout": {"connect": 15, 
"send": 15, "read": 15}},
-                                            "weight": 2
-                                        },
-                                        {
-                                            "weight": 1
-                                        }
-                                    ]
-                                }
-                            ]
-                        }
-                    },
-                    "upstream": {
-                            "type": "roundrobin",
-                            "nodes": {
-                                "127.0.0.1:1980": 1
+            local data = {
+                uri = "/server_port",
+                plugins = {
+                    ["traffic-split"] = {
+                        rules = { {
+                        match = { {
+                            vars = { { "arg_name", "==", "jack" }, { 
"arg_age", "!", "<", "16" } }
+                        } },
+                        weighted_upstreams = { {
+                            upstream = {
+                            name = "upstream_A",
+                            type = "roundrobin",
+                            nodes = {
+                                ["127.0.0.1:1981"] = 2
+                            },
+                            timeout = {
+                                connect = 15,
+                                send = 15,
+                                read = 15
                             }
+                            },
+                            weight = 2
+                        }, {
+                            weight = 1
+                        } }
+                        } }
                     }
-                }]=]
+                },
+                upstream = {
+                    type = "roundrobin",
+                    nodes = {
+                        ["127.0.0.1:1980"] = 1
+                    }
+                }
+            }
+            local code, body = t('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                json.encode(data)
             )
             if code >= 300 then
                 ngx.status = code
@@ -608,41 +614,38 @@ GET /t
 --- config
     location /t {
         content_by_lua_block {
+            local json = require("toolkit.json")
             local t = require("lib.test_admin").test
+            local data = {
+              uri = "/server_port",
+              plugins = {
+                ["traffic-split"] = {
+                  rules = { {
+                    weighted_upstreams = { {
+                      upstream = {
+                        name = "upstream_A",
+                        type = "roundrobin",
+                        nodes = {
+                          ["foo.com:80"] = 0
+                        }
+                      },
+                      weight = 2
+                    }, {
+                      weight = 1
+                    } }
+                  } }
+                }
+              },
+              upstream = {
+                type = "roundrobin",
+                nodes = {
+                  ["127.0.0.1:1980"] = 1
+                }
+              }
+            }
             local code, body = t('/apisix/admin/routes/1',
                 ngx.HTTP_PUT,
-                [[{
-                    "uri": "/server_port",
-                    "plugins": {
-                        "traffic-split": {
-                            "rules": [
-                                {
-                                    "weighted_upstreams": [
-                                        {
-                                            "upstream": {
-                                                "name": "upstream_A",
-                                                "type": "roundrobin",
-                                                "nodes": {
-                                                    "foo.com:80": 0
-                                                }
-                                            },
-                                            "weight": 2
-                                        },
-                                        {
-                                            "weight": 1
-                                        }
-                                    ]
-                                }
-                            ]
-                        }
-                    },
-                    "upstream": {
-                            "type": "roundrobin",
-                            "nodes": {
-                                "127.0.0.1:1980": 1
-                            }
-                    }
-                }]]
+                json.encode(data)
             )
             if code >= 300 then
                 ngx.status = code
@@ -750,42 +753,38 @@ GET /t
 --- config
     location /t {
         content_by_lua_block {
+            local json = require("toolkit.json")
             local t = require("lib.test_admin").test
+            local data = {
+              uri = "/server_port",
+              plugins = {
+                ["traffic-split"] = {
+                  rules = { {
+                    match = { {
+                      vars = { { "http_release", "==", "blue" } }
+                    } },
+                    weighted_upstreams = { {
+                      upstream = {
+                        name = "upstream_A",
+                        type = "roundrobin",
+                        nodes = {
+                          ["127.0.0.1:1981"] = 1
+                        }
+                      }
+                    } }
+                  } }
+                }
+              },
+              upstream = {
+                type = "roundrobin",
+                nodes = {
+                  ["127.0.0.1:1980"] = 1
+                }
+              }
+            }
             local code, body = t('/apisix/admin/routes/1',
                 ngx.HTTP_PUT,
-                [=[{
-                    "uri": "/server_port",
-                    "plugins": {
-                        "traffic-split": {
-                            "rules": [
-                                {
-                                    "match": [
-                                        {
-                                            "vars": [["http_release","==","blue"]]
-                                        }
-                                    ],
-                                    "weighted_upstreams": [
-                                        {
-                                            "upstream": {
-                                                "name": "upstream_A",
-                                                "type": "roundrobin",
-                                                "nodes": {
-                                                    "127.0.0.1:1981":1
-                                                }
-                                            }
-                                        }
-                                    ]
-                                }
-                            ]
-                        }
-                    },
-                    "upstream": {
-                            "type": "roundrobin",
-                            "nodes": {
-                                "127.0.0.1:1980": 1
-                            }
-                    }
-                }]=]
+                json.encode(data)
             )
             if code >= 300 then
                 ngx.status = code
diff --git a/t/plugin/traffic-split2.t b/t/plugin/traffic-split2.t
index eb0f926..58c98b5 100644
--- a/t/plugin/traffic-split2.t
+++ b/t/plugin/traffic-split2.t
@@ -179,43 +179,39 @@ x-real-ip: 127.0.0.1
 --- config
     location /t {
         content_by_lua_block {
+            local json = require("toolkit.json")
             local t = require("lib.test_admin").test
+            local data = {
+              uri = "/uri",
+              plugins = {
+                ["traffic-split"] = {
+                  rules = { {
+                    match = { {
+                      vars = { { "arg_name", "==", "jack" } }
+                    } },
+                    weighted_upstreams = { {
+                      upstream = {
+                        type = "roundrobin",
+                        pass_host = "rewrite",
+                        upstream_host = "test.com",
+                        nodes = {
+                          ["127.0.0.1:1981"] = 1
+                        }
+                      }
+                    } }
+                  } }
+                }
+              },
+              upstream = {
+                type = "roundrobin",
+                nodes = {
+                  ["127.0.0.1:1980"] = 1
+                }
+              }
+            }
             local code, body = t('/apisix/admin/routes/1',
                 ngx.HTTP_PATCH,
-                [=[{
-                    "uri": "/uri",
-                    "plugins": {
-                        "traffic-split": {
-                            "rules": [
-                                {
-                                    "match": [
-                                        {
-                                            "vars": [["arg_name", "==", "jack"]]
-                                        }
-                                    ],
-                                    "weighted_upstreams": [
-                                        {
-                                            "upstream": {
-                                                "type": "roundrobin",
-                                                "pass_host": "rewrite",
-                                                "upstream_host": "test.com",
-                                                "nodes": {
-                                                    "127.0.0.1:1981":1
-                                                }
-                                            }
-                                        }
-                                    ]
-                                }
-                            ]
-                        }
-                    },
-                    "upstream": {
-                            "type": "roundrobin",
-                            "nodes": {
-                                "127.0.0.1:1980": 1
-                            }
-                    }
-                }]=]
+                json.encode(data)
             )
             if code >= 300 then
                 ngx.status = code
@@ -415,62 +411,53 @@ chash_key: "hello"
 --- config
     location /t {
         content_by_lua_block {
+            local json = require("toolkit.json")
             local t = require("lib.test_admin").test
-            local code, body = t('/apisix/admin/routes/1',
-                ngx.HTTP_PATCH,
-                [=[{
-                    "uri": "/server_port",
-                    "plugins": {
-                        "traffic-split": {
-                            "rules": [
-                                {
-                                    "match": [
-                                        {
-                                            "vars": [["arg_id","==","1"]]
-                                        }
-                                    ],
-                                    "weighted_upstreams": [
-                                        {
-                                            "upstream": {
-                                                "name": "upstream_A",
-                                                "type": "roundrobin",
-                                                "nodes": {
-                                                    "127.0.0.1:1981":1
-                                                }
-                                            },
-                                            "weight": 1
-                                        }
-                                    ]
-                                },
-                                {
-                                    "match": [
-                                        {
-                                            "vars": [["arg_id","==","2"]]
-                                        }
-                                    ],
-                                    "weighted_upstreams": [
-                                        {
-                                            "upstream": {
-                                                "name": "upstream_B",
-                                                "type": "roundrobin",
-                                                "nodes": {
-                                                    "127.0.0.1:1982":1
-                                                }
-                                            },
-                                            "weight": 1
-                                        }
-                                    ]
+            local data = {
+                uri = "/server_port",
+                plugins = {
+                    ["traffic-split"] = {
+                    rules = { {
+                        match = { {
+                            vars = { { "arg_id", "==", "1" } }
+                        } },
+                        weighted_upstreams = { {
+                            upstream = {
+                                name = "upstream_A",
+                                type = "roundrobin",
+                                nodes = {
+                                    ["127.0.0.1:1981"] = 1
                                 }
-                            ]
-                        }
-                    },
-                    "upstream": {
-                            "type": "roundrobin",
-                            "nodes": {
-                                "127.0.0.1:1980": 1
-                            }
+                            },
+                            weight = 1
+                        } }
+                    }, {
+                        match = { {
+                            vars = { { "arg_id", "==", "2" } }
+                        } },
+                        weighted_upstreams = { {
+                            upstream = {
+                                name = "upstream_B",
+                                type = "roundrobin",
+                                nodes = {
+                                    ["127.0.0.1:1982"] = 1
+                                }
+                            },
+                            weight = 1
+                        } }
+                    } }
                     }
-                }]=]
+                },
+                upstream = {
+                    type = "roundrobin",
+                    nodes = {
+                        ["127.0.0.1:1980"] = 1
+                    }
+                }
+            }
+            local code, body = t('/apisix/admin/routes/1',
+                ngx.HTTP_PATCH,
+                json.encode(data)
             )
             if code >= 300 then
                 ngx.status = code
@@ -514,68 +501,57 @@ qr/1980, 1981, 1982, 1980, 1981, 1982, 1980, 1981, 1982/
 --- config
     location /t {
         content_by_lua_block {
+            local json = require("toolkit.json")
             local t = require("lib.test_admin").test
-            local code, body = t('/apisix/admin/routes/1',
-                ngx.HTTP_PATCH,
-                [=[{
-                    "uri": "/server_port",
-                    "plugins": {
-                        "traffic-split": {
-                            "rules": [
-                                {
-                                    "match": [
-                                        {
-                                            "vars": [["arg_id","==","1"]]
-                                        }
-                                    ],
-                                    "weighted_upstreams": [
-                                        {
-                                            "upstream": {
-                                                "name": "upstream_A",
-                                                "type": "roundrobin",
-                                                "nodes": {
-                                                    "127.0.0.1:1981":1
-                                                }
-                                            },
-                                            "weight": 1
-                                        },
-                                        {
-                                            "weight": 1
-                                        }
-                                    ]
-                                },
-                                {
-                                    "match": [
-                                        {
-                                            "vars": [["arg_id","==","2"]]
-                                        }
-                                    ],
-                                    "weighted_upstreams": [
-                                        {
-                                            "upstream": {
-                                                "name": "upstream_B",
-                                                "type": "roundrobin",
-                                                "nodes": {
-                                                    "127.0.0.1:1982":1
-                                                }
-                                            },
-                                            "weight": 1
-                                        },
-                                        {
-                                            "weight": 1
-                                        }
-                                    ]
+            local data = {
+                uri = "/server_port",
+                plugins = {
+                    ["traffic-split"] = {
+                        rules = { {
+                            match = { {
+                            vars = { { "arg_id", "==", "1" } }
+                            } },
+                            weighted_upstreams = { {
+                            upstream = {
+                                name = "upstream_A",
+                                type = "roundrobin",
+                                nodes = {
+                                    ["127.0.0.1:1981"] = 1
                                 }
-                            ]
-                        }
-                    },
-                    "upstream": {
-                            "type": "roundrobin",
-                            "nodes": {
-                                "127.0.0.1:1980": 1
-                            }
+                            },
+                            weight = 1
+                            }, {
+                            weight = 1
+                            } }
+                        }, {
+                            match = { {
+                            vars = { { "arg_id", "==", "2" } }
+                            } },
+                            weighted_upstreams = { {
+                            upstream = {
+                                name = "upstream_B",
+                                type = "roundrobin",
+                                nodes = {
+                                    ["127.0.0.1:1982"] = 1
+                                }
+                            },
+                            weight = 1
+                            }, {
+                            weight = 1
+                            } }
+                        } }
                     }
-                }]=]
+                },
+                upstream = {
+                    type = "roundrobin",
+                    nodes = {
+                        ["127.0.0.1:1980"] = 1
+                    }
+                }
+            }
+            local code, body = t('/apisix/admin/routes/1',
+                ngx.HTTP_PATCH,
+                json.encode(data)
             )
             if code >= 300 then
                 ngx.status = code
diff --git a/t/stream-plugin/limit-conn.t b/t/stream-plugin/limit-conn.t
new file mode 100644
index 0000000..95e0b82
--- /dev/null
+++ b/t/stream-plugin/limit-conn.t
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_shuffle();
+no_root_location();
+
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!$block->error_log && !$block->no_error_log) {
+        $block->set_value("no_error_log", "[error]\n[alert]");
+    }
+
+    my $config = $block->config // <<_EOC_;
+    location /hit {
+        content_by_lua_block {
+            local sock = ngx.socket.tcp()
+            local ok, err = sock:connect("127.0.0.1", 1985)
+            if not ok then
+                ngx.log(ngx.ERR, "failed to connect: ", err)
+                return ngx.exit(503)
+            end
+
+            local bytes, err = sock:send("mmm")
+            if not bytes then
+                ngx.log(ngx.ERR, "send stream request error: ", err)
+                return ngx.exit(503)
+            end
+            local data, err = sock:receive("*a")
+            if not data then
+                sock:close()
+                return ngx.exit(503)
+            end
+            ngx.print(data)
+        }
+    }
+
+    location /test_concurrency {
+        content_by_lua_block {
+            local reqs = {}
+            for i = 1, 5 do
+                reqs[i] = { "/hit" }
+            end
+            local resps = { ngx.location.capture_multi(reqs) }
+            for i, resp in ipairs(resps) do
+                ngx.say(resp.status)
+            end
+        }
+    }
+_EOC_
+
+    $block->set_value("config", $config);
+
+    my $stream_upstream_code = $block->stream_upstream_code // <<_EOC_;
+            local sock = ngx.req.socket()
+            local data = sock:receive("1")
+            ngx.sleep(0.2)
+            ngx.say("hello world")
+_EOC_
+    $block->set_value("stream_upstream_code", $stream_upstream_code);
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: set route
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/upstreams/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "nodes": {
+                        "127.0.0.1:1995": 1
+                    },
+                    "type": "roundrobin"
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+
+            local code, body = t('/apisix/admin/stream_routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "plugins": {
+                        "limit-conn": {
+                            "conn": 100,
+                            "burst": 50,
+                            "default_conn_delay": 0.1,
+                            "key": "remote_addr"
+                        }
+                    },
+                    "upstream_id": "1"
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 2: not exceeding the burst
+--- request
+GET /test_concurrency
+--- response_body
+200
+200
+200
+200
+200
+--- stream_enable
+
+
+
+=== TEST 3: update route
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "plugins": {
+                        "limit-conn": {
+                            "conn": 2,
+                            "burst": 1,
+                            "default_conn_delay": 0.1,
+                            "key": "remote_addr"
+                        }
+                    },
+                    "upstream_id": "1"
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 4: exceeding the burst
+--- request
+GET /test_concurrency
+--- response_body
+200
+200
+200
+503
+503
+--- error_log
+Connection reset by peer
+--- stream_enable

Reply via email to